# -*- coding: utf-8 -*- import numpy as np from app.api.errors.iot import MissingIOTDataError from app.models.domain.devices import ACATFUFreqSetRequest from app.schemas.season import Season from app.schemas.space import SpaceATFU from app.utils.helpers import is_off_to_on class ACATFUFanFreqController: """ Logic writen by Wuxu. """ def __init__( self, freq: float, fresh_air_temperature: float, season: Season, ) -> None: self._freq = freq self._fresh_air_temperature = fresh_air_temperature self._season = season def get_next_set(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float: try: if self._season == Season.transition: next_freq_set = self.get_transition_logic(spaces_params, on_flag) else: next_freq_set = self.get_cooling_logic(spaces_params, on_flag) next_freq_set = self.hcho_logic(spaces_params, next_freq_set) next_freq_set = max(20.0, next_freq_set) next_freq_set = min(50.0, next_freq_set) except TypeError: raise MissingIOTDataError return next_freq_set def get_transition_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float: temp_avg, co2_avg = self.get_avg(spaces_params) if on_flag: if self._fresh_air_temperature <= 16.0: freq_set = 20.0 elif self._fresh_air_temperature <= 18.0: if temp_avg <= 23.0: freq_set = 20.0 else: freq_set = 25.0 elif self._fresh_air_temperature <= 21.0: if temp_avg <= 23.0: freq_set = 25.0 elif temp_avg <= 25.5: freq_set = 30.0 else: freq_set = 35.0 elif self._fresh_air_temperature <= 24.0: if temp_avg <= 23.0: freq_set = 30.0 elif temp_avg <= 25.5: freq_set = 35.0 else: freq_set = 40.0 else: if temp_avg <= 23.0: freq_set = 35.0 elif temp_avg <= 25.5: freq_set = 30.0 else: freq_set = 25.0 if co2_avg > 750.0: freq_set = max(freq_set, 25.0) else: if self._fresh_air_temperature <= 18.0: if temp_avg <= 23.0: freq_set = 20.0 elif temp_avg <= 25.5: freq_set = self._freq - 2.0 else: freq_set = self._freq + 3.0 elif self._fresh_air_temperature <= 23.0: if temp_avg <= 23.0: freq_set = self._freq - 2.0 elif temp_avg <= 25.5: freq_set = self._freq else: freq_set = self._freq + 2.0 elif self._fresh_air_temperature <= 25.0: if temp_avg <= 23.0: freq_set = self._freq + 2.0 elif temp_avg <= 25.5: freq_set = self._freq else: freq_set = self._freq + 1.0 else: if temp_avg <= 23.0: freq_set = self._freq + 5.0 elif temp_avg <= 25.5: freq_set = self._freq - 2.0 else: freq_set = 20.0 return freq_set def get_cooling_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float: _, co2_avg = self.get_avg(spaces_params) if on_flag: freq_set = 30.0 else: high_co2_ratio = self.get_high_co2_ratio(spaces_params) if high_co2_ratio == 0.0: if co2_avg < 600.0: freq_set = self._freq - 2.0 else: freq_set = self._freq freq_set = max(25.0, freq_set) elif high_co2_ratio < 0.05: freq_set = self._freq + 2.0 freq_set = min(35.0, freq_set) elif high_co2_ratio < 0.1: freq_set = self._freq + 3.0 freq_set = min(40.0, freq_set) elif high_co2_ratio < 0.2: freq_set = self._freq + 5.0 else: freq_set = 50.0 return freq_set def get_heating_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float: # The same with cooling logic. freq_set = self.get_cooling_logic(spaces_params, on_flag) return freq_set @staticmethod def get_avg(spaces_params: list[SpaceATFU]) -> tuple[float, float]: valid_temperature_list, valid_co2_list = list(), list() for space in spaces_params: if space.realtime_temperature: if 0.0 < space.realtime_temperature < 40.0: valid_temperature_list.append(space.realtime_temperature) if space.realtime_co2: if 0.0 < space.realtime_co2 < 5000.0: valid_co2_list.append(space.realtime_co2) if valid_temperature_list: temp_avg = np.mean(valid_temperature_list) else: temp_avg = np.NAN if valid_co2_list: co2_avg = np.mean(valid_co2_list) else: co2_avg = np.NAN return temp_avg, co2_avg @staticmethod def get_high_co2_ratio(spaces_params: list[SpaceATFU]) -> float: valid_co2_count, high_co2_count = 0, 0 for space in spaces_params: if space.realtime_co2: if 0.0 < space.realtime_co2 < 5000.0: valid_co2_count += 1 if space.realtime_co2 > 900.0: high_co2_count += 1 if valid_co2_count == 0: ratio = 0.0 else: ratio = high_co2_count / valid_co2_count return ratio @staticmethod def hcho_logic(spaces_params: list[SpaceATFU], next_freq_set: float) -> float: diff = 0.0 for space in spaces_params: if space.hcho: if space.hcho >= 0.1: diff = 5 break elif space.hcho >= 0.08: diff = 3 else: diff = 0 return next_freq_set + diff async def build_acatfu_freq_set(params: ACATFUFreqSetRequest) -> float: controller = ACATFUFanFreqController(params.freq, params.fresh_air_temperature, params.season) on_flag = is_off_to_on(params.running_status_list) spaces = [SpaceATFU(**sp.dict()) for sp in params.spaces] freq_set = controller.get_next_set(spaces, on_flag) return freq_set