# -*- coding: utf-8 -*- import numpy as np from app.api.errors.iot import MissingIOTDataError from app.models.domain.devices import ACATAHFreqSetRequest from app.schemas.equipment import AHU from app.schemas.system import ACAT class AHUController: def __init__(self, equipment: AHU | None, system: ACAT | None): super(AHUController, self).__init__() self._equipment = equipment self._system = system async def build_freq_set(self, supply_air_temperature_set_duration: list[float], hot_rate: float) -> float: extent = 5 if ( self._equipment.freq_set is None or self._system.supply_static_press is None or self._system.supply_static_press_set is None ): temp_freq_set = 80.0 else: pre_fan_freq_set = self._equipment.freq_set if self._system.supply_static_press < self._system.supply_static_press_set - extent: temp_freq_set = pre_fan_freq_set + 2 elif self._system.supply_static_press > self._system.supply_static_press_set + extent: temp_freq_set = pre_fan_freq_set - 2 else: temp_freq_set = pre_fan_freq_set temperature_value_list = np.array(supply_air_temperature_set_duration) freq_upper_limit = 90.0 try: if ( temperature_value_list.size > 0 and np.all(temperature_value_list == temperature_value_list[0]) and temperature_value_list[0] <= 18.0 and hot_rate >= 0.5 ): freq_upper_limit = 100.0 except TypeError: raise MissingIOTDataError freq_set = min(temp_freq_set, freq_upper_limit) self._equipment.freq_set = freq_set return freq_set async def build_acatah_freq_set(params: ACATAHFreqSetRequest) -> float: acat_system = ACAT( supply_static_press=params.system_supply_static_press, supply_static_press_set=params.system_supply_static_press_set, ) ahu = AHU(freq_set=params.current_freq_set) ahu_controller = AHUController(ahu, acat_system) new_freq_set = await ahu_controller.build_freq_set(params.supply_air_temperature_set_list, params.spaces_hot_rate) return new_freq_set