# -*- coding: utf-8 -*-

import numpy as np
from loguru import logger

from app.api.errors.iot import MissingIOTDataError
from app.models.domain.devices import ACATAHInstructionsRequest
from app.schemas.equipment import AHU
from app.schemas.season import Season
from app.schemas.space import SpaceATAH
from app.utils.date import get_time_str


class ATAHController:
    """Compute the next switch, fan-frequency and air-temperature setpoints of an AHU.

    The controller reads the current AHU state, the spaces attached to it and
    the active season, and derives the next set of instructions via ``run``.
    """

    def __init__(self, ahu: AHU, spaces: list[SpaceATAH], season: Season) -> None:
        self._ahu = ahu
        self._spaces = spaces
        self._season = season

    @staticmethod
    def _clamp(value: float, lower: float, upper: float) -> float:
        """Limit *value* to ``[lower, upper]``.

        numpy is used on purpose: a ``None`` operand raises ``TypeError``,
        which ``run`` translates into ``MissingIOTDataError``.
        """
        return np.max([np.min([upper, value]), lower])

    def get_switch_set(self) -> str:
        """Return ``"on"`` when any space has a truthy temperature target, else ``"off"``."""
        return "on" if any(sp.temperature_target for sp in self._spaces) else "off"

    @staticmethod
    def get_return_air_temp_set(virtual_target: float) -> float:
        """Return the return-air temperature setpoint, which is the virtual target itself."""
        return virtual_target

    def get_supply_air_temp_set(self, next_switch_set: str, virtual_realtime: float) -> float:
        """Compute the next supply-air temperature setpoint.

        Args:
            next_switch_set: The switch state about to be applied ("on"/"off").
            virtual_realtime: Virtual realtime temperature of the served spaces.

        Returns:
            A season-dependent start-up value when the unit is about to be
            switched on, otherwise the current setpoint moved by at most 1.0
            and clamped to the seasonal range (cooling 18.0-23.0, heating
            22.0-28.0). In the transition season the current setpoint is
            returned unchanged.
        """
        if next_switch_set == "on" and not self._ahu.running_status:
            # Unit is about to start: use a fixed initial value per season.
            if self._season == Season.cooling:
                return 20.0
            if self._season == Season.transition:
                return 22.0
            return 28.0

        # NOTE(review): the original file lost its indentation; the adjustment
        # below is reconstructed as belonging to the "already running" path.
        next_supply_set = self._ahu.supply_air_temperature_set
        diff = self._ahu.return_air_temperature - virtual_realtime
        at_upper = self._ahu.freq >= self._ahu.fan_freq_upper_limit_set
        at_lower = self._ahu.freq <= self._ahu.fan_freq_lower_limit_set

        if self._season == Season.cooling:
            # Only move the setpoint once the fan has no headroom left.
            if diff > 1.0 and at_upper:
                next_supply_set -= 1.0
            if diff < -1.0 and at_lower:
                next_supply_set += 1.0
            next_supply_set = self._clamp(next_supply_set, 18.0, 23.0)
            logger.debug(next_supply_set)
        elif self._season == Season.heating:
            if diff > 1.0 and at_lower:
                next_supply_set -= 1.0
            # Fixed: was ``diff < 1.0``, which raised the setpoint inside the
            # +/-1.0 dead band; now mirrors the cooling branch.
            if diff < -1.0 and at_upper:
                next_supply_set += 1.0
            next_supply_set = self._clamp(next_supply_set, 22.0, 28.0)

        return next_supply_set

    def get_freq_set(self, next_switch_set: str, virtual_target: float) -> float:
        """Compute the next fan-frequency setpoint.

        Args:
            next_switch_set: The switch state about to be applied ("on"/"off").
            virtual_target: Virtual target temperature of the served spaces.

        Returns:
            40.0 when the unit is about to be switched on; otherwise the
            current frequency setpoint plus a season-dependent step, clamped
            to the fan's lower/upper frequency limits.
        """
        if next_switch_set == "on" and not self._ahu.running_status:
            return 40.0

        ahu = self._ahu
        diff = ahu.return_air_temperature - virtual_target

        if self._season == Season.heating:
            if diff > 1.0:
                adjust = -2.0
            elif diff < -1.0:
                adjust = 2.0
            else:
                adjust = 0.0
        elif self._season == Season.cooling:
            if diff > 1.0:
                adjust = 2.0
            # Fixed: was ``diff < 1.0``, which left the dead band below
            # reachable only at exactly diff == 1.0.
            elif diff < -1.0:
                adjust = -2.0
            else:
                adjust = 0.0
        else:
            if diff > 1.0:
                if ahu.supply_air_temperature > ahu.return_air_temperature:
                    adjust = -2.0
                elif ahu.supply_air_temperature < ahu.return_air_temperature - 1.0:
                    adjust = 2.0
                else:
                    adjust = 1.0
            elif diff < -1.0:
                # NOTE(review): this compares the supply *setpoint*, while the
                # sibling branches compare the measured supply temperature.
                # Kept as in the original - confirm whether it is intended.
                if ahu.supply_air_temperature_set < ahu.return_air_temperature:
                    adjust = -2.0
                elif ahu.supply_air_temperature > ahu.return_air_temperature + 1:
                    adjust = 2.0
                else:
                    adjust = 1.0
            else:
                adjust = 0.0

        # NOTE(review): clamping is reconstructed as applying only to the
        # adjusted value, not to the 40.0 start-up value - confirm.
        return self._clamp(
            ahu.freq_set + adjust,
            ahu.fan_freq_lower_limit_set,
            ahu.fan_freq_upper_limit_set,
        )

    def get_valid_spaces(self) -> list[SpaceATAH]:
        """Return the spaces having both a truthy realtime temperature and a truthy target."""
        return [
            sp for sp in self._spaces
            if sp.realtime_temperature and sp.temperature_target
        ]

    def build_virtual_temperature(self) -> tuple[float, float]:
        """Build the virtual (realtime, target) temperature pair for the AHU.

        Returns:
            ``(nan, nan)`` when no space is valid. If the most recently
            updated space has an ``ahu_temporary_update_time`` greater than
            ``get_time_str(60 * 60 * 2, flag="ago")`` (presumably "two hours
            ago" - confirm against ``app.utils.date``), that space's values
            are used. Otherwise the average over the valid spaces weighted by
            ``ahu_default_weight`` is returned, falling back to a plain
            average when the weights sum to zero.
        """
        valid_spaces = self.get_valid_spaces()
        if not valid_spaces:
            # np.nan rather than np.NAN: the upper-case alias was removed in NumPy 2.0.
            return np.nan, np.nan

        latest = sorted(valid_spaces, key=lambda x: x.ahu_temporary_update_time)[-1]
        if latest.ahu_temporary_update_time > get_time_str(60 * 60 * 2, flag="ago"):
            return latest.realtime_temperature, latest.temperature_target

        virtual_realtime, virtual_target = 0.0, 0.0
        total_weight = 0.0
        for sp in valid_spaces:
            weight = sp.ahu_default_weight
            virtual_realtime += sp.realtime_temperature * weight
            virtual_target += sp.temperature_target * weight
            total_weight += weight

        if total_weight == 0:
            # No usable weights: fall back to the unweighted mean.
            for sp in valid_spaces:
                virtual_realtime += sp.realtime_temperature
                virtual_target += sp.temperature_target
            valid_spaces_length = len(valid_spaces)
            virtual_realtime /= valid_spaces_length
            virtual_target /= valid_spaces_length
        else:
            virtual_realtime /= total_weight
            virtual_target /= total_weight

        return virtual_realtime, virtual_target

    def run(self) -> tuple[str, float, float, float]:
        """Compute all setpoints for the next control step.

        Returns:
            ``(switch_set, return_air_temp_set, freq_set, supply_air_temp_set)``.
            Exactly one of ``return_air_temp_set`` / ``freq_set`` is ``nan``:
            the frequency is computed when the AHU has no truthy
            ``return_air_temperature_set``, otherwise the return-air
            setpoint is.

        Raises:
            MissingIOTDataError: When a ``TypeError`` occurs during the
                computation (e.g. arithmetic on a missing ``None`` reading).
        """
        try:
            virtual_realtime, virtual_target = self.build_virtual_temperature()
            new_switch_set = self.get_switch_set()
            if not self._ahu.return_air_temperature_set:
                new_freq_set = self.get_freq_set(new_switch_set, virtual_target)
                new_return_air_temp_set = np.nan
            else:
                new_return_air_temp_set = self.get_return_air_temp_set(virtual_target)
                new_freq_set = np.nan
            new_supply_air_temp_set = self.get_supply_air_temp_set(new_switch_set, virtual_realtime)
        except TypeError as err:
            # Keep the original cause attached for easier debugging.
            raise MissingIOTDataError from err

        return new_switch_set, new_return_air_temp_set, new_freq_set, new_supply_air_temp_set


async def build_acatah_instructions(params: ACATAHInstructionsRequest):
    """Build the instruction dict for an AHU from the request payload.

    Each request space is converted to ``SpaceATAH``; when both its target
    and realtime temperature are truthy, ``diff`` is set to
    ``temperature_target - realtime_temperature``. The controller is then run
    and every non-NaN setpoint is added to the result next to ``switch_set``.

    Raises:
        MissingIOTDataError: Propagated from ``ATAHController.run``.
    """
    space_params = []
    for sp in params.spaces:
        temp_sp = SpaceATAH(**sp.dict())
        if temp_sp.temperature_target and temp_sp.realtime_temperature:
            temp_sp.diff = temp_sp.temperature_target - temp_sp.realtime_temperature
        space_params.append(temp_sp)

    ahu = AHU(**params.dict())
    ahu_controller = ATAHController(ahu, space_params, params.season)
    switch_set, return_air_temp_set, freq_set, supply_air_temp_set = ahu_controller.run()

    instructions = {"switch_set": switch_set}
    optional_setpoints = (
        ("return_air_temp_set", return_air_temp_set),
        ("freq_set", freq_set),
        ("supply_air_temp_set", supply_air_temp_set),
    )
    for key, value in optional_setpoints:
        # NaN marks "no instruction for this setpoint".
        if not np.isnan(value):
            instructions[key] = value

    return instructions