123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- # -*- coding: utf-8 -*-
- import numpy as np
- from loguru import logger
- from app.api.errors.iot import MissingIOTDataError
- from app.models.domain.devices import ACATAHInstructionsRequest
- from app.schemas.equipment import AHU
- from app.schemas.season import Season
- from app.schemas.space import SpaceATAH
- from app.utils.date import get_time_str
class ATAHController:
    """Derive the next switch, fan-frequency and air-temperature set-points of one AHU.

    The controller reads the current AHU state, the spaces attached to the AHU
    and the active season; ``run`` returns the four resulting set-points.
    """

    # Half-width of the band around zero inside which a temperature difference
    # triggers no set-point adjustment.
    _DEAD_BAND = 1.0
    # Fan frequency commanded when the unit is about to be switched on.
    _STARTUP_FREQ = 40.0
    # Look-back window, in seconds, handed to ``get_time_str`` when deciding
    # whether the latest space update is recent enough to be used on its own.
    _RECENT_UPDATE_WINDOW = 60 * 60 * 2

    def __init__(self, ahu: "AHU", spaces: "list[SpaceATAH]", season: "Season") -> None:
        self._ahu = ahu
        self._spaces = spaces
        self._season = season

    @staticmethod
    def _clamp(value, lower, upper):
        """Limit *value* to ``[lower, upper]``.

        Uses NumPy reductions so that NaN propagates and a ``None`` operand
        raises ``TypeError`` (which ``run`` translates into a domain error).
        """
        return np.max([np.min([upper, value]), lower])

    def get_switch_set(self) -> str:
        """Return ``"on"`` when at least one space has a truthy temperature target, else ``"off"``."""
        return "on" if any(sp.temperature_target for sp in self._spaces) else "off"

    @staticmethod
    def get_return_air_temp_set(virtual_target: float) -> float:
        """Return the return-air temperature set-point, which is the virtual target itself."""
        return virtual_target

    def get_supply_air_temp_set(self, next_switch_set: str, virtual_realtime: float) -> float:
        """Compute the next supply-air temperature set-point.

        Args:
            next_switch_set: Switch state about to be commanded (``"on"``/``"off"``).
            virtual_realtime: Aggregated real-time temperature of the spaces.

        Returns:
            A season-specific start-up value when the unit is being switched
            on, otherwise the current set-point nudged by at most 1.0 and
            clamped to the season's range (left untouched in the transition
            season).
        """
        if next_switch_set == "on" and not self._ahu.running_status:
            # The unit is being started: use a fixed value per season.
            if self._season == Season.cooling:
                return 20.0
            if self._season == Season.transition:
                return 22.0
            return 28.0

        next_supply_set = self._ahu.supply_air_temperature_set
        diff = self._ahu.return_air_temperature - virtual_realtime
        band = self._DEAD_BAND
        if self._season == Season.cooling:
            # Only move the set-point once the fan has run out of authority.
            if diff > band and self._ahu.freq >= self._ahu.fan_freq_upper_limit_set:
                next_supply_set -= 1.0
            if diff < -band and self._ahu.freq <= self._ahu.fan_freq_lower_limit_set:
                next_supply_set += 1.0
            next_supply_set = self._clamp(next_supply_set, 18.0, 23.0)
            logger.debug(next_supply_set)
        elif self._season == Season.heating:
            if diff > band and self._ahu.freq <= self._ahu.fan_freq_lower_limit_set:
                next_supply_set -= 1.0
            # The threshold used to be ``diff < 1.0``, which left no dead band;
            # it now mirrors the cooling branch.
            if diff < -band and self._ahu.freq >= self._ahu.fan_freq_upper_limit_set:
                next_supply_set += 1.0
            next_supply_set = self._clamp(next_supply_set, 22.0, 28.0)
        return next_supply_set

    def get_freq_set(self, next_switch_set: str, virtual_target: float) -> float:
        """Compute the next fan-frequency set-point.

        Args:
            next_switch_set: Switch state about to be commanded (``"on"``/``"off"``).
            virtual_target: Aggregated temperature target of the spaces.

        Returns:
            The start-up frequency when the unit is being switched on,
            otherwise the current frequency set-point plus a season-dependent
            adjustment, clamped to the AHU's configured fan-frequency limits.
        """
        if next_switch_set == "on" and not self._ahu.running_status:
            return self._STARTUP_FREQ

        ahu = self._ahu
        band = self._DEAD_BAND
        diff = ahu.return_air_temperature - virtual_target
        if self._season == Season.heating:
            if diff > band:
                adjust = -2.0
            elif diff < -band:
                adjust = 2.0
            else:
                adjust = 0.0
        elif self._season == Season.cooling:
            if diff > band:
                adjust = 2.0
            # The threshold used to be ``diff < 1.0``, which made the
            # zero-adjust branch reachable only at exactly 1.0; it now mirrors
            # the heating branch.
            elif diff < -band:
                adjust = -2.0
            else:
                adjust = 0.0
        else:
            # Any other season: the direction depends on whether the supply
            # air is warmer or colder than the return air.
            if diff > band:
                if ahu.supply_air_temperature > ahu.return_air_temperature:
                    adjust = -2.0
                elif ahu.supply_air_temperature < ahu.return_air_temperature - 1.0:
                    adjust = 2.0
                else:
                    adjust = 1.0
            elif diff < -band:
                # NOTE(review): this comparison reads the supply-air *set-point*
                # while the sibling comparisons read the measured supply-air
                # temperature - confirm this is intended.
                if ahu.supply_air_temperature_set < ahu.return_air_temperature:
                    adjust = -2.0
                elif ahu.supply_air_temperature > ahu.return_air_temperature + 1:
                    adjust = 2.0
                else:
                    adjust = 1.0
            else:
                adjust = 0.0

        return self._clamp(
            ahu.freq_set + adjust,
            ahu.fan_freq_lower_limit_set,
            ahu.fan_freq_upper_limit_set,
        )

    def get_valid_spaces(self) -> "list[SpaceATAH]":
        """Return the spaces whose real-time temperature and target are both truthy."""
        return [
            sp for sp in self._spaces
            if sp.realtime_temperature and sp.temperature_target
        ]

    def build_virtual_temperature(self) -> tuple[float, float]:
        """Aggregate the valid spaces into one (real-time, target) temperature pair.

        Returns:
            ``(nan, nan)`` when no space is valid. When the most recently
            updated space is newer than ``get_time_str(<window>, flag="ago")``,
            that space's own values. Otherwise the average weighted by
            ``ahu_default_weight``, falling back to the plain mean when the
            weights sum to zero.
        """
        valid_spaces = self.get_valid_spaces()
        if not valid_spaces:
            # ``np.NAN`` was removed in NumPy 2.0; ``np.nan`` works everywhere.
            return np.nan, np.nan

        # Stable sort + last element keeps the original tie-breaking.
        latest = sorted(valid_spaces, key=lambda sp: sp.ahu_temporary_update_time)[-1]
        # presumably both sides are comparable timestamp strings - verify
        # against ``get_time_str``.
        if latest.ahu_temporary_update_time > get_time_str(self._RECENT_UPDATE_WINDOW, flag="ago"):
            return latest.realtime_temperature, latest.temperature_target

        weighted_realtime = 0.0
        weighted_target = 0.0
        total_weight = 0.0
        for sp in valid_spaces:
            weight = sp.ahu_default_weight
            weighted_realtime += sp.realtime_temperature * weight
            weighted_target += sp.temperature_target * weight
            total_weight += weight

        if total_weight == 0:
            # Start from fresh sums: the weighted accumulators are only zero
            # when every single weight is zero.
            count = len(valid_spaces)
            mean_realtime = sum((sp.realtime_temperature for sp in valid_spaces), 0.0) / count
            mean_target = sum((sp.temperature_target for sp in valid_spaces), 0.0) / count
            return mean_realtime, mean_target

        return weighted_realtime / total_weight, weighted_target / total_weight

    def run(self) -> tuple[str, float, float, float]:
        """Compute all set-points for the AHU.

        Returns:
            ``(switch_set, return_air_temp_set, freq_set, supply_air_temp_set)``.
            Exactly one of ``return_air_temp_set`` / ``freq_set`` is computed;
            the other is NaN, depending on whether the AHU reports a truthy
            ``return_air_temperature_set``.

        Raises:
            MissingIOTDataError: If a ``TypeError`` occurs during the
                computation (for example arithmetic on a ``None`` reading).
        """
        try:
            virtual_realtime, virtual_target = self.build_virtual_temperature()
            new_switch_set = self.get_switch_set()
            if not self._ahu.return_air_temperature_set:
                new_freq_set = self.get_freq_set(new_switch_set, virtual_target)
                new_return_air_temp_set = np.nan
            else:
                new_return_air_temp_set = self.get_return_air_temp_set(virtual_target)
                new_freq_set = np.nan
            new_supply_air_temp_set = self.get_supply_air_temp_set(new_switch_set, virtual_realtime)
        except TypeError as err:
            # Keep the original cause attached for easier debugging.
            raise MissingIOTDataError from err

        return new_switch_set, new_return_air_temp_set, new_freq_set, new_supply_air_temp_set
async def build_acatah_instructions(params: ACATAHInstructionsRequest):
    """Build the instruction dictionary for one AHU request.

    Each requested space is converted to a ``SpaceATAH`` (with ``diff`` filled
    in when both its target and real-time temperature are truthy), the
    controller is run, and every set-point that is not NaN is reported.

    Args:
        params: The incoming request; provides ``spaces``, ``season`` and the
            fields used to construct the ``AHU``.

    Returns:
        A dict that always contains ``"switch_set"`` and, when available,
        ``"return_air_temp_set"``, ``"freq_set"`` and ``"supply_air_temp_set"``.
    """
    converted_spaces = []
    for raw_space in params.spaces:
        space = SpaceATAH(**raw_space.dict())
        if space.temperature_target and space.realtime_temperature:
            space.diff = space.temperature_target - space.realtime_temperature
        converted_spaces.append(space)

    controller = ATAHController(AHU(**params.dict()), converted_spaces, params.season)
    switch_set, return_air_temp_set, freq_set, supply_air_temp_set = controller.run()

    instructions = {"switch_set": switch_set}
    # NaN marks a set-point the controller did not compute; leave it out.
    optional_set_points = (
        ("return_air_temp_set", return_air_temp_set),
        ("freq_set", freq_set),
        ("supply_air_temp_set", supply_air_temp_set),
    )
    for key, value in optional_set_points:
        if not np.isnan(value):
            instructions[key] = value

    return instructions
|