123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- # -*- coding: utf-8 -*-
- import numpy as np
- from app.api.errors.iot import MissingIOTDataError
- from app.models.domain.devices import ACATFUFreqSetRequest
- from app.schemas.season import Season
- from app.schemas.space import SpaceATFU
- from app.utils.helpers import is_off_to_on
class ACATFUFanFreqController:
    """Rule-based controller that picks the next fan frequency set-point.

    Logic written by Wuxu.

    The controller is seeded with the current frequency, the fresh-air
    temperature and the season.  ``get_next_set`` combines those with the
    per-space readings (temperature, CO2, HCHO) and returns a set-point
    clamped to ``[20.0, 50.0]``.

    NOTE(review): units are not stated in this module (presumably Hz, degrees
    Celsius and ppm) -- confirm against the device documentation.
    """

    def __init__(
        self,
        freq: float,
        fresh_air_temperature: float,
        season: Season,
    ) -> None:
        """Store the current frequency, fresh-air temperature and season."""
        self._freq = freq
        self._fresh_air_temperature = fresh_air_temperature
        self._season = season

    def get_next_set(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
        """Return the next frequency set-point, clamped to [20.0, 50.0].

        The transition season uses ``get_transition_logic``; every other
        season uses ``get_cooling_logic``.  The HCHO boost is added before
        clamping.

        Args:
            spaces_params: Per-space readings served by this unit.
            on_flag: Selects the start-up branch of the season logic when
                true, the incremental branch otherwise.

        Raises:
            MissingIOTDataError: If the computation hits a ``TypeError``,
                e.g. when the stored frequency or fresh-air temperature is
                ``None``.
        """
        try:
            if self._season == Season.transition:
                next_freq_set = self.get_transition_logic(spaces_params, on_flag)
            else:
                next_freq_set = self.get_cooling_logic(spaces_params, on_flag)

            next_freq_set = self.hcho_logic(spaces_params, next_freq_set)
            next_freq_set = max(20.0, next_freq_set)
            next_freq_set = min(50.0, next_freq_set)
        except TypeError as err:
            # Keep the original TypeError attached as the explicit cause.
            raise MissingIOTDataError from err

        return next_freq_set

    def get_transition_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
        """Return the transition-season set-point before HCHO boost and clamping.

        With ``on_flag`` set, the result is an absolute value looked up from
        the fresh-air temperature band and the average space temperature,
        floored at 25.0 when the average CO2 exceeds 750.0.  Otherwise the
        result is mostly an offset relative to the current frequency.

        When no space has a valid temperature the average is NaN, every
        ``<=`` comparison is false and the last branch of each band is used.
        """
        temp_avg, co2_avg = self.get_avg(spaces_params)
        fresh_air = self._fresh_air_temperature

        if on_flag:
            if fresh_air <= 16.0:
                freq_set = 20.0
            elif fresh_air <= 18.0:
                if temp_avg <= 23.0:
                    freq_set = 20.0
                else:
                    freq_set = 25.0
            elif fresh_air <= 21.0:
                if temp_avg <= 23.0:
                    freq_set = 25.0
                elif temp_avg <= 25.5:
                    freq_set = 30.0
                else:
                    freq_set = 35.0
            elif fresh_air <= 24.0:
                if temp_avg <= 23.0:
                    freq_set = 30.0
                elif temp_avg <= 25.5:
                    freq_set = 35.0
                else:
                    freq_set = 40.0
            else:
                if temp_avg <= 23.0:
                    freq_set = 35.0
                elif temp_avg <= 25.5:
                    freq_set = 30.0
                else:
                    freq_set = 25.0

            # High average CO2 imposes a minimum on the start-up value.
            if co2_avg > 750.0:
                freq_set = max(freq_set, 25.0)
        else:
            if fresh_air <= 18.0:
                if temp_avg <= 23.0:
                    freq_set = 20.0
                elif temp_avg <= 25.5:
                    freq_set = self._freq - 2.0
                else:
                    freq_set = self._freq + 3.0
            elif fresh_air <= 23.0:
                if temp_avg <= 23.0:
                    freq_set = self._freq - 2.0
                elif temp_avg <= 25.5:
                    freq_set = self._freq
                else:
                    freq_set = self._freq + 2.0
            elif fresh_air <= 25.0:
                if temp_avg <= 23.0:
                    freq_set = self._freq + 2.0
                elif temp_avg <= 25.5:
                    freq_set = self._freq
                else:
                    freq_set = self._freq + 1.0
            else:
                if temp_avg <= 23.0:
                    freq_set = self._freq + 5.0
                elif temp_avg <= 25.5:
                    freq_set = self._freq - 2.0
                else:
                    freq_set = 20.0

        return freq_set

    def get_cooling_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
        """Return the cooling-season set-point before HCHO boost and clamping.

        With ``on_flag`` set the result is a fixed 30.0.  Otherwise the
        current frequency is adjusted according to the share of spaces whose
        CO2 is above 900.0 (see ``get_high_co2_ratio``).
        """
        _, co2_avg = self.get_avg(spaces_params)

        if on_flag:
            freq_set = 30.0
        else:
            high_co2_ratio = self.get_high_co2_ratio(spaces_params)
            if high_co2_ratio == 0.0:
                if co2_avg < 600.0:
                    freq_set = self._freq - 2.0
                else:
                    freq_set = self._freq
                # Never drop below 25.0 while no space reports high CO2.
                freq_set = max(25.0, freq_set)
            elif high_co2_ratio < 0.05:
                freq_set = self._freq + 2.0
                freq_set = min(35.0, freq_set)
            elif high_co2_ratio < 0.1:
                freq_set = self._freq + 3.0
                freq_set = min(40.0, freq_set)
            elif high_co2_ratio < 0.2:
                freq_set = self._freq + 5.0
            else:
                freq_set = 50.0

        return freq_set

    def get_heating_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
        """Return the heating-season set-point; identical to the cooling logic."""
        return self.get_cooling_logic(spaces_params, on_flag)

    @staticmethod
    def get_avg(spaces_params: list[SpaceATFU]) -> tuple[float, float]:
        """Return ``(average temperature, average CO2)`` over valid readings.

        A temperature is valid when it is truthy and strictly inside
        (0, 40); a CO2 value is valid when it is truthy and strictly inside
        (0, 5000).  An average with no valid readings is NaN.
        """
        temperatures = [
            space.realtime_temperature
            for space in spaces_params
            if space.realtime_temperature and 0.0 < space.realtime_temperature < 40.0
        ]
        co2_values = [
            space.realtime_co2
            for space in spaces_params
            if space.realtime_co2 and 0.0 < space.realtime_co2 < 5000.0
        ]

        # ``np.nan`` is the canonical spelling; the ``np.NAN`` alias was
        # removed in NumPy 2.0.
        temp_avg = np.mean(temperatures) if temperatures else np.nan
        co2_avg = np.mean(co2_values) if co2_values else np.nan

        return temp_avg, co2_avg

    @staticmethod
    def get_high_co2_ratio(spaces_params: list[SpaceATFU]) -> float:
        """Return the share of valid CO2 readings that are above 900.0.

        Only readings that are truthy and strictly inside (0, 5000) count.
        Returns 0.0 when there is no valid reading.
        """
        valid_co2_count, high_co2_count = 0, 0
        for space in spaces_params:
            co2 = space.realtime_co2
            if co2 and 0.0 < co2 < 5000.0:
                valid_co2_count += 1
                if co2 > 900.0:
                    high_co2_count += 1

        if valid_co2_count == 0:
            return 0.0
        return high_co2_count / valid_co2_count

    @staticmethod
    def hcho_logic(spaces_params: list[SpaceATFU], next_freq_set: float) -> float:
        """Add a boost to ``next_freq_set`` based on the worst HCHO reading.

        Any space with HCHO >= 0.1 adds 5.0; otherwise any space with
        HCHO >= 0.08 adds 3.0; otherwise nothing is added.  Spaces without a
        reading are ignored.  The result does not depend on the order of
        ``spaces_params``.
        """
        boost = 0.0
        for space in spaces_params:
            if not space.hcho:
                continue
            if space.hcho >= 0.1:
                # Highest possible boost; no need to look further.
                boost = 5.0
                break
            if space.hcho >= 0.08:
                # Keep the boost even if a later space reads low.
                boost = max(boost, 3.0)

        return next_freq_set + boost
async def build_acatfu_freq_set(params: ACATFUFreqSetRequest) -> float:
    """Compute the next fan frequency set-point for a request.

    Builds an ``ACATFUFanFreqController`` from the request's frequency,
    fresh-air temperature and season, derives the on-flag from the running
    status list via ``is_off_to_on``, converts each request space to a
    ``SpaceATFU`` and returns the controller's next set-point.
    """
    fan_controller = ACATFUFanFreqController(
        params.freq,
        params.fresh_air_temperature,
        params.season,
    )
    just_switched_on = is_off_to_on(params.running_status_list)
    space_models = [SpaceATFU(**raw_space.dict()) for raw_space in params.spaces]
    return fan_controller.get_next_set(space_models, just_switched_on)
|