# -*- coding: utf-8 -*- import os from enum import Enum import arrow import httpx import numpy as np import pandas as pd from httpx import AsyncClient, URL from app.core.config import settings from app.services.service import Service from app.utils.date import get_time_str, TIME_FMT from app.utils.math import round_half_up class Season(str, Enum): cooling = "Cooling" heating = "Warm" transition = "Transition" class SpaceInfoService(Service): def __init__( self, client: AsyncClient, project_id: str, space_id: str, server_settings=settings, ) -> None: super(SpaceInfoService, self).__init__(client) self._project_id = project_id self._space_id = space_id self._base_url = os.getenv("TRANSFER_HOST", server_settings.TRANSFER_HOST) self._now_time = get_time_str() def _common_parameters(self) -> dict: return {"projectId": self._project_id, "spaceId": self._space_id} async def is_temporary(self) -> bool: url = URL(f"{self._base_url}/environment/temp/target") params = self._common_parameters() params.update({"time": self._now_time}) raw_info = await self._get(url, params) flag = False if raw_info.get("flag") == 1: flag = True return flag async def get_custom_target(self) -> dict[str, pd.DataFrame]: url = URL(f"{self._base_url}/environment/normalAndPreDayTarget") params = self._common_parameters() params.update( {"date": arrow.get(self._now_time, TIME_FMT).date().strftime("%Y%m%d")} ) raw_info = await self._get(url, params) try: pre_target_df = pd.DataFrame(raw_info.get("preTargets")) pre_target_df.set_index("time", inplace=True) except (KeyError, TypeError): pre_target_df = pd.DataFrame() try: normal_target_df = pd.DataFrame(raw_info.get("normalTargets")) normal_target_df.set_index("time", inplace=True) except (KeyError, TypeError): normal_target_df = pd.DataFrame() return {"pre_targets": pre_target_df, "normal_targets": normal_target_df} async def get_current_temperature_target(self) -> float: targets = await self.get_custom_target() if len(targets.get("pre_targets")) > 0: current_targets = targets.get("pre_targets").append( targets.get("normal_targets") ) else: current_targets = targets.get("normal_targets") temp = ( arrow.get(self._now_time, TIME_FMT).shift(minutes=15).timestamp() // (15 * 60) * (15 * 60) ) next_quarter_minutes = arrow.get(temp).time().strftime("%H%M%S") try: current_lower_target = current_targets["temperatureMin"].loc[ next_quarter_minutes ] current_upper_target = current_targets["temperatureMax"].loc[ next_quarter_minutes ] except KeyError: current_lower_target, current_upper_target = np.NAN, np.NAN return round_half_up((current_lower_target + current_upper_target) / 2, 2) async def env_database_set(self, form: str, value: float) -> None: url = URL(f"{self._base_url}/environment/hispoint/set") params = self._common_parameters() time_str = arrow.get( arrow.get(self._now_time, TIME_FMT).timestamp() // 900 * 900 ).strftime("%Y%m%d%H%M%S") params.update({"time": time_str, "type": form, "value": value}) await self._get(url, params) async def env_database_get(self) -> dict[str, pd.DataFrame]: url = URL(f"{self._base_url}/environment/hispoint/get") params = self._common_parameters() params.update( {"date": arrow.get(self._now_time, TIME_FMT).date().strftime("%Y%m%d")} ) raw_info = await self._get(url, params) result = {} if raw_info.get("result") == "success": for k, v in raw_info.items(): if k != "result": if len(v) > 0: temp = {} data = np.array(v) temp.update({"timestamp": data[:, 0]}) temp.update({"value": data[:, 1].astype(np.float)}) result.update({k: pd.DataFrame(temp)}) else: result.update({k: pd.DataFrame()}) return result def set_custom_target( self, form: str, target_value: dict[str, list[float]], flag: str = "1" ) -> None: url = URL(f"{self._base_url}/environment/target/setting") params = { "projectId": self._project_id, "spaceId": self._space_id, "timepoint": self._now_time, "type": form, "flag": flag, } httpx.post(url, params=params, json=target_value) # await self._post(url, params=params, payload=target_value) async def set_temporary_custom(self) -> None: url = URL(f"{self._base_url}/environment/setServiceFlag") params = self._common_parameters() params.update({"time": self._now_time}) await self._get(url, params) async def get_season(self) -> Season: url = URL(f"{self._base_url}/environment/getSeasonType") params = { "projectId": self._project_id, "date": self._now_time, } raw_info = await self._get(url, params) return Season(raw_info.get("data")) class Duoduo(Service): def __init__(self, client: AsyncClient, project_id: str, server_settings=settings): super(Duoduo, self).__init__(client) self._project_id = project_id self._base_url = URL(os.getenv("CUSTOM_HOST", server_settings.CUSTOM_HOST)) self._now_time = get_time_str() async def is_customized(self, space_id: str) -> bool: url = URL(f"{self._base_url}/custom/timetarget") time_str = arrow.get( arrow.get(self._now_time, TIME_FMT).shift(minutes=15).timestamp() // 900 * 900 ).strftime("%Y%m%d%H%M%S") params = { "projectId": self._project_id, "objectId": space_id, "timepoint": time_str, } raw_info = await self._get(url, params) flag = False if raw_info.get("data"): flag = True return flag