# -*- coding: utf-8 -*- import os from enum import Enum import arrow import numpy as np from httpx import AsyncClient, URL from loguru import logger from app.core.config import settings from app.services.service import Service from app.utils.date import get_time_str, TIME_FMT from app.utils.math import round_half_up class InfoCode(str, Enum): temperature = "Tdb" co2 = "CO2" pm2d5 = "PM2d5" humidity = "RH" supply_air_flow = "SupplyAirFlow" supply_air_flow_set = "SupplyAirFlowSet" supply_air_temperature = "SupplyAirTemp" supply_air_temperature_set = "SupplyAirTempSet" fan_speed = "FanGear" fan_speed_set = "FanGearSet" fan_freq = "FanFreq" fan_freq_set = "FanFreqSet" supply_static_press = "SupplyStaticPress" supply_static_press_set = "SupplyStaticPressSet" running_status = "RunStatus" cloud_status = "InCloudStatus" equip_switch_set = "EquipSwitchSet" return_air_temperature = "ReturnAirTemp" chill_water_valve_opening_set = "ChillWaterValveOpeningSet" hot_water_valve_opening_set = "HotWaterValveOpeningSet" water_valve_switch_set = "WaterValveSwitchSet" in_cloud_set = "InCloudSet" work_mode_set = "WorkModeSet" supply_temperature = "SupplyTemp" water_out_temperature = "WaterOutTemp" water_in_temperature = "WaterInTemp" valve_opening = "ValveOpening" class DataPlatformService(Service): def __init__(self, client: AsyncClient, project_id: str, server_settings=settings): super(DataPlatformService, self).__init__(client) self._project_id = project_id self._base_url = os.getenv("PLATFORM_HOST", server_settings.PLATFORM_HOST) self._now_time = get_time_str() self._secret = server_settings.PLATFORM_SECRET def _common_parameters(self) -> dict: return {"projectId": self._project_id, "secret": self._secret} async def get_realtime_data(self, code: InfoCode, object_id: str) -> float: url = URL(f"{self._base_url}/hisdata/query_by_obj") params = self._common_parameters() start_time = get_time_str(60 * 60, flag="ago") payload = { "criteria": { "id": object_id, "code": code.value, "receivetime": { "$gte": start_time, "$lte": self._now_time, }, } } raw_info = await self._post(url, params, payload) try: latest_data = raw_info.get("Content")[-1].get("data") latest_time = raw_info.get("Content")[-1].get("receivetime") if arrow.get(latest_time, TIME_FMT).shift(minutes=15) < arrow.get( self._now_time, TIME_FMT ): logger.info( f"delayed data - {object_id}: ({latest_time}, {latest_data})" ) value = round_half_up(latest_data, 2) except (IndexError, KeyError, TypeError): value = np.NAN return value async def get_realtime_temperature(self, space_id: str) -> float: return await self.get_realtime_data(InfoCode.temperature, space_id)