common.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # -*- coding: utf-8 -*-
  2. import numpy as np
  3. from loguru import logger
  4. from app.api.errors.iot import MissingIOTDataError
  5. from app.models.domain.devices import ACATAHInstructionsRequest
  6. from app.schemas.equipment import AHU
  7. from app.schemas.season import Season
  8. from app.schemas.space import SpaceATAH
  9. from app.utils.date import get_time_str
  10. class ATAHController:
  11. def __init__(self, ahu: AHU, spaces: list[SpaceATAH], season: Season) -> None:
  12. self._ahu = ahu
  13. self._spaces = spaces
  14. self._season = season
  15. def get_switch_set(self) -> str:
  16. switch_set = "off"
  17. for sp in self._spaces:
  18. if sp.temperature_target:
  19. switch_set = "on"
  20. return switch_set
  21. @staticmethod
  22. def get_return_air_temp_set(virtual_target: float) -> float:
  23. return virtual_target
  24. def get_supply_air_temp_set(self, next_switch_set: str, virtual_realtime: float) -> float:
  25. if next_switch_set == "on" and not self._ahu.running_status:
  26. if self._season == Season.cooling:
  27. next_supply_set = 20.0
  28. elif self._season == Season.transition:
  29. next_supply_set = 22.0
  30. else:
  31. next_supply_set = 28.0
  32. else:
  33. next_supply_set = self._ahu.supply_air_temperature_set
  34. diff = self._ahu.return_air_temperature - virtual_realtime
  35. if self._season == Season.cooling:
  36. if diff > 1.0 and self._ahu.freq >= self._ahu.fan_freq_upper_limit_set:
  37. next_supply_set -= 1.0
  38. if diff < -1.0 and self._ahu.freq <= self._ahu.fan_freq_lower_limit_set:
  39. next_supply_set += 1.0
  40. next_supply_set = np.max([np.min([23.0, next_supply_set]), 18.0])
  41. logger.debug(next_supply_set)
  42. elif self._season == Season.heating:
  43. if diff > 1.0 and self._ahu.freq <= self._ahu.fan_freq_lower_limit_set:
  44. next_supply_set -= 1.0
  45. if diff < 1.0 and self._ahu.freq >= self._ahu.fan_freq_upper_limit_set:
  46. next_supply_set += 1.0
  47. next_supply_set = np.max([np.min([28.0, next_supply_set]), 22.0])
  48. return next_supply_set
  49. def get_freq_set(self, next_switch_set: str, virtual_target: float) -> float:
  50. if next_switch_set == "on" and not self._ahu.running_status:
  51. next_freq_set = 40.0
  52. else:
  53. diff = self._ahu.return_air_temperature - virtual_target
  54. if self._season == Season.heating:
  55. if diff > 1.0:
  56. adjust = -2.0
  57. elif diff < -1.0:
  58. adjust = 2.0
  59. else:
  60. adjust = 0.0
  61. elif self._season == Season.cooling:
  62. if diff > 1.0:
  63. adjust = 2.0
  64. elif diff < 1.0:
  65. adjust = -2.0
  66. else:
  67. adjust = 0.0
  68. else:
  69. if diff > 1.0:
  70. if self._ahu.supply_air_temperature > self._ahu.return_air_temperature:
  71. adjust = -2.0
  72. elif self._ahu.supply_air_temperature < self._ahu.return_air_temperature - 1.0:
  73. adjust = 2.0
  74. else:
  75. adjust = 1.0
  76. elif diff < -1.0:
  77. if self._ahu.supply_air_temperature_set < self._ahu.return_air_temperature:
  78. adjust = -2.0
  79. elif self._ahu.supply_air_temperature > self._ahu.return_air_temperature + 1:
  80. adjust = 2.0
  81. else:
  82. adjust = 1.0
  83. else:
  84. adjust = 0.0
  85. next_freq_set = self._ahu.freq_set + adjust
  86. next_freq_set = np.max(
  87. [np.min([self._ahu.fan_freq_upper_limit_set, next_freq_set]), self._ahu.fan_freq_lower_limit_set])
  88. return next_freq_set
  89. def get_valid_spaces(self) -> list[SpaceATAH]:
  90. valid_spaces = list()
  91. for sp in self._spaces:
  92. if sp.realtime_temperature and sp.temperature_target:
  93. valid_spaces.append(sp)
  94. return valid_spaces
  95. def build_virtual_temperature(self) -> tuple[float, float]:
  96. valid_spaces = self.get_valid_spaces()
  97. if not valid_spaces:
  98. virtual_realtime, virtual_target = np.NAN, np.NAN
  99. else:
  100. sorted_spaces = sorted(valid_spaces, key=lambda x: x.ahu_temporary_update_time)
  101. if sorted_spaces[-1].ahu_temporary_update_time > get_time_str(60 * 60 * 2, flag="ago"):
  102. virtual_realtime = sorted_spaces[-1].realtime_temperature
  103. virtual_target = sorted_spaces[-1].temperature_target
  104. else:
  105. virtual_realtime, virtual_target = 0.0, 0.0
  106. total_weight = 0.0
  107. for sp in valid_spaces:
  108. weight = sp.ahu_default_weight
  109. virtual_realtime += sp.realtime_temperature * weight
  110. virtual_target += sp.temperature_target * weight
  111. total_weight += weight
  112. if total_weight == 0:
  113. for sp in valid_spaces:
  114. virtual_realtime += sp.realtime_temperature
  115. virtual_target += sp.temperature_target
  116. valid_spaces_length = len(valid_spaces)
  117. virtual_realtime /= valid_spaces_length
  118. virtual_target /= valid_spaces_length
  119. else:
  120. virtual_realtime /= total_weight
  121. virtual_target /= total_weight
  122. return virtual_realtime, virtual_target
  123. def run(self) -> tuple[str, float, float, float]:
  124. try:
  125. virtual_realtime, virtual_target = self.build_virtual_temperature()
  126. new_switch_set = self.get_switch_set()
  127. if not self._ahu.return_air_temperature_set:
  128. new_freq_set = self.get_freq_set(new_switch_set, virtual_target)
  129. new_return_air_temp_set = np.NAN
  130. else:
  131. new_return_air_temp_set = self.get_return_air_temp_set(virtual_target)
  132. new_freq_set = np.NAN
  133. new_supply_air_temp_set = self.get_supply_air_temp_set(new_switch_set, virtual_realtime)
  134. except TypeError:
  135. raise MissingIOTDataError
  136. return new_switch_set, new_return_air_temp_set, new_freq_set, new_supply_air_temp_set
  137. async def build_acatah_instructions(params: ACATAHInstructionsRequest):
  138. space_params = []
  139. for sp in params.spaces:
  140. temp_sp = SpaceATAH(**sp.dict())
  141. if temp_sp.temperature_target and temp_sp.realtime_temperature:
  142. temp_sp.diff = temp_sp.temperature_target - temp_sp.realtime_temperature
  143. space_params.append(temp_sp)
  144. ahu = AHU(**params.dict())
  145. ahu_controller = ATAHController(ahu, space_params, params.season)
  146. switch_set, return_air_temp_set, freq_set, supply_air_temp_set = ahu_controller.run()
  147. instructions = {"switch_set": switch_set}
  148. if not np.isnan(return_air_temp_set):
  149. instructions.update({"return_air_temp_set": return_air_temp_set})
  150. if not np.isnan(freq_set):
  151. instructions.update({"freq_set": freq_set})
  152. if not np.isnan(supply_air_temp_set):
  153. instructions.update({"supply_air_temp_set": supply_air_temp_set})
  154. return instructions