thermal_comfort.py 10 KB


  1. from abc import ABC
  2. from app.models.domain.diagnosis import ThermalComfortDiagnosisRequest
  3. from app.schemas.diagnosis import FaultCategory
  4. from app.schemas.equipment import AHU, FCU, VAVBox, VRF
  5. from app.services.transfer import Season
  6. class DiagnotorBase(ABC):
  7. def __init__(self):
  8. self._result = FaultCategory()
  9. def controller_check(
  10. self,
  11. setting_value: float,
  12. feedback_value: float,
  13. is_categorical: bool = True,
  14. tolerance_pct: float = 0.0,
  15. ) -> None:
  16. if is_categorical:
  17. if setting_value != feedback_value:
  18. self._result.controller_err = True
  19. else:
  20. if not setting_value * (1 - tolerance_pct) <= feedback_value <= setting_value * (1 + tolerance_pct):
  21. self._result.controller_err = True
  22. class ThermalComfortDiagnotor(DiagnotorBase):
  23. def __init__(self):
  24. super(ThermalComfortDiagnotor, self).__init__()
  25. class FCUDiagnotor(DiagnotorBase):
  26. def __init__(
  27. self,
  28. fcu: FCU,
  29. ahu: AHU,
  30. duration: float,
  31. target_temp: float,
  32. realtime_temp: float,
  33. season: Season,
  34. ):
  35. super(FCUDiagnotor, self).__init__()
  36. self._fcu = fcu
  37. self._ahu = ahu
  38. self._duration = duration
  39. self._target_temp = target_temp
  40. self._realtime_temp = realtime_temp
  41. self._season = season
  42. def low_in_heating(self) -> None:
  43. if self._fcu.air_valve_speed == "off":
  44. if self._fcu.speed_limit == "off":
  45. self._result.over_constrained = True
  46. else:
  47. self._result.control_logic_err = True
  48. else:
  49. if self._fcu.air_valve_speed == self._fcu.recommended_speed:
  50. self.controller_check(
  51. self._fcu.air_valve_speed_set, self._fcu.air_valve_speed
  52. )
  53. if not self._result.controller_err:
  54. if self._fcu.air_valve_speed_set == "high":
  55. if self._duration > 120:
  56. if self._fcu.supply_air_temperature < self._target_temp + 1:
  57. self._result.insufficient_heating = True
  58. else:
  59. self._result.undefined_err = True
  60. else:
  61. self._result.no_problems_found = True
  62. else:
  63. self._result.control_logic_err = True
  64. else:
  65. if self._fcu.speed_limit == self._fcu.air_valve_speed:
  66. self._result.over_constrained = True
  67. else:
  68. self._result.obj_info_err = True
  69. def low_in_cooling(self) -> None:
  70. if self._fcu.recommended_speed == "off":
  71. if self._fcu.air_valve_speed == self._fcu.recommended_speed:
  72. self.controller_check(
  73. self._fcu.air_valve_speed_set, self._fcu.air_valve_speed
  74. )
  75. if not self._result.controller_err:
  76. if self._duration > 120:
  77. self._result.undefined_err = True
  78. else:
  79. self._result.no_problems_found = True
  80. else:
  81. self._result.obj_info_err = True
  82. else:
  83. self._result.control_logic_err = True
  84. def high_in_heating(self) -> None:
  85. self.low_in_cooling()
  86. def high_in_cooling(self) -> None:
  87. self.low_in_heating()
  88. if self._result.insufficient_heating:
  89. self._result.insufficient_heating = False
  90. self._result.insufficient_cooling = True
  91. def run(self) -> None:
  92. if (
  93. self._season == Season.heating
  94. or self._fcu.supply_air_temperature > self._target_temp
  95. ):
  96. if self._realtime_temp < self._target_temp - 1:
  97. self.low_in_heating()
  98. elif self._realtime_temp > self._target_temp + 1:
  99. self.high_in_heating()
  100. elif (
  101. self._season == Season.cooling
  102. or self._fcu.supply_air_temperature < self._target_temp
  103. ):
  104. if self._realtime_temp < self._target_temp - 1:
  105. self.low_in_cooling()
  106. elif self._realtime_temp > self._target_temp + 1:
  107. self.high_in_cooling()
  108. class VAVDiagnotor(DiagnotorBase):
  109. def __init__(
  110. self,
  111. vav: VAVBox,
  112. ahu: AHU,
  113. known_err: FaultCategory,
  114. duration: float,
  115. season: Season,
  116. ):
  117. super(VAVDiagnotor, self).__init__()
  118. self._vav = vav
  119. self._ahu = ahu
  120. self._known_err = known_err
  121. self._duration = duration
  122. self._season = season
  123. def low_in_heating(self) -> None:
  124. self.controller_check(
  125. self._vav.supply_air_flow_set,
  126. self._vav.supply_air_flow,
  127. is_categorical=False,
  128. tolerance_pct=0.1,
  129. )
  130. if (
  131. self._vav.recommended_supply_air_flow
  132. >= self._vav.supply_air_flow_upper_limit
  133. ):
  134. if self._vav.recommended_supply_air_flow == self._vav.supply_air_flow_set:
  135. if not self._result.controller_err:
  136. if self._duration > 120.0:
  137. if (
  138. self._vav.supply_air_temperature
  139. < self._vav.virtual_target_temperature + 1
  140. or self._ahu.supply_air_temperature
  141. < self._vav.virtual_target_temperature + 2
  142. ):
  143. self._result.insufficient_heating = True
  144. else:
  145. if self._vav.supply_air_flow_upper_limit < 1200.0:
  146. self._result.unreasonable_vav_flow_limit = True
  147. else:
  148. if not self._known_err.sensor_err:
  149. self._result.undefined_err = True
  150. else:
  151. self._result.no_problems_found = True
  152. else:
  153. self._result.obj_info_err = True
  154. else:
  155. if self._vav.recommended_supply_air_flow != self._vav.supply_air_flow_set:
  156. self._result.obj_info_err = True
  157. def low_in_cooling(self) -> None:
  158. if self._vav.recommended_supply_air_flow == self._vav.supply_air_flow_set:
  159. self.controller_check(
  160. self._vav.supply_air_flow_set,
  161. self._vav.supply_air_flow,
  162. is_categorical=False,
  163. tolerance_pct=0.1,
  164. )
  165. if self._vav.supply_air_flow_set <= self._vav.supply_air_flow_lower_limit:
  166. if not self._result.controller_err:
  167. if self._duration >= 120.0:
  168. if self._vav.supply_air_flow_lower_limit > 400:
  169. self._result.unreasonable_vav_flow_limit = True
  170. else:
  171. if not self._known_err.sensor_err:
  172. if (
  173. self._vav.supply_air_temperature < 18
  174. or self._ahu.supply_air_temperature < 18
  175. ):
  176. self._result.excessive_cooling = True
  177. else:
  178. self._result.obj_info_err = True
  179. else:
  180. self._result.no_problems_found = True
  181. else:
  182. self._result.no_problems_found = True
  183. else:
  184. self._result.obj_info_err = True
  185. def high_in_heating(self) -> None:
  186. self.low_in_cooling()
  187. if self._result.excessive_cooling:
  188. self._result.excessive_cooling = False
  189. self._result.excessive_heating = True
  190. def high_in_cooling(self) -> None:
  191. self.low_in_heating()
  192. if (
  193. self._vav.recommended_supply_air_flow
  194. >= self._vav.supply_air_flow_upper_limit
  195. ):
  196. if self._vav.recommended_supply_air_flow == self._vav.supply_air_flow_set:
  197. if not self._result.controller_err:
  198. if self._duration > 120.0:
  199. if (
  200. self._vav.supply_air_temperature
  201. > self._vav.virtual_target_temperature + 1
  202. or self._ahu.supply_air_temperature
  203. > self._vav.virtual_target_temperature + 2
  204. ):
  205. self._result.insufficient_cooling = True
  206. def run(self) -> None:
  207. if (
  208. self._season == Season.heating
  209. or self._vav.supply_air_temperature > self._vav.virtual_target_temperature
  210. ):
  211. if (
  212. self._vav.virtual_realtime_temperature
  213. < self._vav.virtual_target_temperature - 1
  214. ):
  215. self.low_in_heating()
  216. elif (
  217. self._vav.virtual_realtime_temperature
  218. > self._vav.virtual_target_temperature + 1
  219. ):
  220. self.high_in_heating()
  221. elif (
  222. self._season == Season.cooling
  223. or self._vav.supply_air_temperature < self._vav.virtual_target_temperature
  224. ):
  225. if (
  226. self._vav.virtual_realtime_temperature
  227. < self._vav.virtual_target_temperature - 1
  228. ):
  229. self.low_in_cooling()
  230. elif (
  231. self._vav.virtual_realtime_temperature
  232. > self._vav.virtual_target_temperature + 1
  233. ):
  234. self.high_in_cooling()
  235. class VRFDiagnotor(DiagnotorBase):
  236. def __init__(self, vrf: VRF):
  237. super(VRFDiagnotor, self).__init__()
  238. self._vrf = vrf
  239. def get_diagnosis_result(params: ThermalComfortDiagnosisRequest) -> FaultCategory:
  240. pass