freq_set.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. # -*- coding: utf-8 -*-
  2. import numpy as np
  3. from app.api.errors.iot import MissingIOTDataError
  4. from app.models.domain.devices import ACATFUFreqSetRequest
  5. from app.schemas.season import Season
  6. from app.schemas.space import SpaceATFU
  7. from app.utils.helpers import is_off_to_on
  8. class ACATFUFanFreqController:
  9. """
  10. Logic writen by Wuxu.
  11. """
  12. def __init__(
  13. self,
  14. freq: float,
  15. fresh_air_temperature: float,
  16. season: Season,
  17. ) -> None:
  18. self._freq = freq
  19. self._fresh_air_temperature = fresh_air_temperature
  20. self._season = season
  21. def get_next_set(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
  22. try:
  23. if self._season == Season.transition:
  24. next_freq_set = self.get_transition_logic(spaces_params, on_flag)
  25. else:
  26. next_freq_set = self.get_cooling_logic(spaces_params, on_flag)
  27. next_freq_set = self.hcho_logic(spaces_params, next_freq_set)
  28. next_freq_set = max(20.0, next_freq_set)
  29. next_freq_set = min(50.0, next_freq_set)
  30. except TypeError:
  31. raise MissingIOTDataError
  32. return next_freq_set
  33. def get_transition_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
  34. temp_avg, co2_avg = self.get_avg(spaces_params)
  35. if on_flag:
  36. if self._fresh_air_temperature <= 16.0:
  37. freq_set = 20.0
  38. elif self._fresh_air_temperature <= 18.0:
  39. if temp_avg <= 23.0:
  40. freq_set = 20.0
  41. else:
  42. freq_set = 25.0
  43. elif self._fresh_air_temperature <= 21.0:
  44. if temp_avg <= 23.0:
  45. freq_set = 25.0
  46. elif temp_avg <= 25.5:
  47. freq_set = 30.0
  48. else:
  49. freq_set = 35.0
  50. elif self._fresh_air_temperature <= 24.0:
  51. if temp_avg <= 23.0:
  52. freq_set = 30.0
  53. elif temp_avg <= 25.5:
  54. freq_set = 35.0
  55. else:
  56. freq_set = 40.0
  57. else:
  58. if temp_avg <= 23.0:
  59. freq_set = 35.0
  60. elif temp_avg <= 25.5:
  61. freq_set = 30.0
  62. else:
  63. freq_set = 25.0
  64. if co2_avg > 750.0:
  65. freq_set = max(freq_set, 25.0)
  66. else:
  67. if self._fresh_air_temperature <= 18.0:
  68. if temp_avg <= 23.0:
  69. freq_set = 20.0
  70. elif temp_avg <= 25.5:
  71. freq_set = self._freq - 2.0
  72. else:
  73. freq_set = self._freq + 3.0
  74. elif self._fresh_air_temperature <= 23.0:
  75. if temp_avg <= 23.0:
  76. freq_set = self._freq - 2.0
  77. elif temp_avg <= 25.5:
  78. freq_set = self._freq
  79. else:
  80. freq_set = self._freq + 2.0
  81. elif self._fresh_air_temperature <= 25.0:
  82. if temp_avg <= 23.0:
  83. freq_set = self._freq + 2.0
  84. elif temp_avg <= 25.5:
  85. freq_set = self._freq
  86. else:
  87. freq_set = self._freq + 1.0
  88. else:
  89. if temp_avg <= 23.0:
  90. freq_set = self._freq + 5.0
  91. elif temp_avg <= 25.5:
  92. freq_set = self._freq - 2.0
  93. else:
  94. freq_set = 20.0
  95. return freq_set
  96. def get_cooling_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
  97. _, co2_avg = self.get_avg(spaces_params)
  98. if on_flag:
  99. freq_set = 30.0
  100. else:
  101. high_co2_ratio = self.get_high_co2_ratio(spaces_params)
  102. if high_co2_ratio == 0.0:
  103. if co2_avg < 600.0:
  104. freq_set = self._freq - 2.0
  105. else:
  106. freq_set = self._freq
  107. freq_set = max(25.0, freq_set)
  108. elif high_co2_ratio < 0.05:
  109. freq_set = self._freq + 2.0
  110. freq_set = min(35.0, freq_set)
  111. elif high_co2_ratio < 0.1:
  112. freq_set = self._freq + 3.0
  113. freq_set = min(40.0, freq_set)
  114. elif high_co2_ratio < 0.2:
  115. freq_set = self._freq + 5.0
  116. else:
  117. freq_set = 50.0
  118. return freq_set
  119. def get_heating_logic(self, spaces_params: list[SpaceATFU], on_flag: bool) -> float:
  120. # The same with cooling logic.
  121. freq_set = self.get_cooling_logic(spaces_params, on_flag)
  122. return freq_set
  123. @staticmethod
  124. def get_avg(spaces_params: list[SpaceATFU]) -> tuple[float, float]:
  125. valid_temperature_list, valid_co2_list = list(), list()
  126. for space in spaces_params:
  127. if space.realtime_temperature:
  128. if 0.0 < space.realtime_temperature < 40.0:
  129. valid_temperature_list.append(space.realtime_temperature)
  130. if space.realtime_co2:
  131. if 0.0 < space.realtime_co2 < 5000.0:
  132. valid_co2_list.append(space.realtime_co2)
  133. if valid_temperature_list:
  134. temp_avg = np.mean(valid_temperature_list)
  135. else:
  136. temp_avg = np.NAN
  137. if valid_co2_list:
  138. co2_avg = np.mean(valid_co2_list)
  139. else:
  140. co2_avg = np.NAN
  141. return temp_avg, co2_avg
  142. @staticmethod
  143. def get_high_co2_ratio(spaces_params: list[SpaceATFU]) -> float:
  144. valid_co2_count, high_co2_count = 0, 0
  145. for space in spaces_params:
  146. if space.realtime_co2:
  147. if 0.0 < space.realtime_co2 < 5000.0:
  148. valid_co2_count += 1
  149. if space.realtime_co2 > 900.0:
  150. high_co2_count += 1
  151. if valid_co2_count == 0:
  152. ratio = 0.0
  153. else:
  154. ratio = high_co2_count / valid_co2_count
  155. return ratio
  156. @staticmethod
  157. def hcho_logic(spaces_params: list[SpaceATFU], next_freq_set: float) -> float:
  158. diff = 0.0
  159. for space in spaces_params:
  160. if space.hcho:
  161. if space.hcho >= 0.1:
  162. diff = 5
  163. break
  164. elif space.hcho >= 0.08:
  165. diff = 3
  166. else:
  167. diff = 0
  168. return next_freq_set + diff
  169. async def build_acatfu_freq_set(params: ACATFUFreqSetRequest) -> float:
  170. controller = ACATFUFanFreqController(params.freq, params.fresh_air_temperature, params.season)
  171. on_flag = is_off_to_on(params.running_status_list)
  172. spaces = [SpaceATFU(**sp.dict()) for sp in params.spaces]
  173. freq_set = controller.get_next_set(spaces, on_flag)
  174. return freq_set