import arrow
import numpy as np
from loguru import logger

from app.api.errors.iot import MissingIOTDataError
from app.controllers.equipment.ahu.thermal_mode import count_vav_box_weight
from app.models.domain.devices import ThermalMode, ACATAHSupplyAirTempSetRequest
from app.schemas.equipment import VAVBox
from app.services.transfer import Season
from app.utils.date import get_time_str, TIME_FMT


class ACATAHSupplyAirTemperatureController:
    """
    Supply air temperature setting logic version 2 by WuXu.

    Derives a new supply air temperature set point from the state of the
    VAV boxes, the current set point, the return air temperature, the
    thermal mode and the season.
    """

    def __init__(
        self,
        vav_boxes_list: list[VAVBox],
        current_set: float,
        return_air: float,
        thermal_mode: ThermalMode,
        is_off_to_on: bool,
        is_thermal_mode_switched: bool,
        season: Season,
    ):
        """
        Store the inputs used by :meth:`build`.

        :param vav_boxes_list: VAV boxes taken into account.
        :param current_set: current supply air temperature set point.
        :param return_air: return air temperature.
        :param thermal_mode: current thermal mode.
        :param is_off_to_on: when true, :meth:`build` starts from a
            season-based constant instead of the VAV-based adjustment.
        :param is_thermal_mode_switched: stored on the instance; no method
            of this class reads it.
        :param season: current season; selects the start-up constant and
            the clamping bounds in :meth:`build`.
        """
        super().__init__()
        self.vav_boxes_list = vav_boxes_list
        self.current_set = current_set
        self.return_air = return_air
        self.thermal_mode = thermal_mode
        self.is_off_to_on = is_off_to_on
        self.is_thermal_mode_switched = is_thermal_mode_switched
        self.season = season

    def calculate_by_cold_vav(self, cold_ratio: float) -> float:
        """
        Return a new set point adjusted according to ``cold_ratio``.

        In cooling mode the current set point is shifted by -1.0 / -0.5 /
        0 / +1.0 / +1.5 for the bands ``<0.3``, ``<0.45``, ``<=0.55``,
        ``<=0.7`` and ``<=1.0``. In heating mode the lowest band returns
        the return air temperature and the remaining bands shift the
        current set point by -1.0 / 0 / +0.5 / +1.0. Any other thermal
        mode, and any ratio matching no band (a ratio above 1.0, or NaN,
        for which every comparison is false), leaves the set point
        unchanged.

        :param cold_ratio: value produced by :meth:`get_cold_ratio`.
        :return: the adjusted set point.
        """
        if self.thermal_mode == ThermalMode.cooling:
            if cold_ratio < 0.3:
                new = self.current_set - 1.0
            elif cold_ratio < 0.45:
                new = self.current_set - 0.5
            elif cold_ratio <= 0.55:
                new = self.current_set
            elif cold_ratio <= 0.7:
                new = self.current_set + 1.0
            elif cold_ratio <= 1.0:
                new = self.current_set + 1.5
            else:
                new = self.current_set
        elif self.thermal_mode == ThermalMode.heating:
            if cold_ratio < 0.3:
                new = self.return_air
            elif cold_ratio < 0.45:
                new = self.current_set - 1.0
            elif cold_ratio <= 0.55:
                new = self.current_set
            elif cold_ratio <= 0.7:
                new = self.current_set + 0.5
            elif cold_ratio <= 1.0:
                new = self.current_set + 1.0
            else:
                new = self.current_set
        else:
            new = self.current_set

        return new

    def get_cold_ratio(self) -> float:
        """
        Return the share of negative VAV box weight in the total weight.

        Each box is weighted with ``count_vav_box_weight``; negative
        weights are summed as "cold" and the absolute values of all
        weights form the total. The result is ``abs(cold / total)``, or
        NaN when the division raises ``ZeroDivisionError`` (for example
        when there are no boxes).
        """
        cold, total = 0, 0
        for box in self.vav_boxes_list:
            weight = count_vav_box_weight(
                box.virtual_realtime_temperature,
                box.virtual_target_temperature,
                box.supply_air_flow_upper_limit,
                box.supply_air_flow_lower_limit,
                box.supply_air_flow_set,
                box.valve_opening,
                box.supply_air_temperature,
            )
            if weight < 0:
                cold += weight
            total += abs(weight)

        try:
            cold_ratio = abs(cold / total)
        except ZeroDivisionError:
            # ``np.NAN`` was removed in NumPy 2.0; ``np.nan`` works everywhere.
            cold_ratio = np.nan

        logger.debug(f"cold ratio: {cold_ratio}")

        return cold_ratio

    def get_normal_ratio(self) -> float:
        """
        Return the fraction of VAV boxes within 1 degree of their target.

        A box counts as normal when the absolute difference between its
        virtual realtime temperature and its virtual target temperature
        is at most 1. Returns NaN when the box list is empty.
        """
        normal = 0
        for box in self.vav_boxes_list:
            deviation = box.virtual_realtime_temperature - box.virtual_target_temperature
            if abs(deviation) <= 1:
                normal += 1

        try:
            ratio = normal / len(self.vav_boxes_list)
        except ZeroDivisionError:
            # ``np.NAN`` was removed in NumPy 2.0; ``np.nan`` works everywhere.
            ratio = np.nan

        return ratio

    def build(self) -> float:
        """
        Compute the supply air temperature set point.

        When the equipment has not just been switched on, the current set
        point is kept if at least 90% of the boxes are normal (a NaN ratio
        also keeps it, because ``NaN < 0.9`` is false); otherwise it is
        adjusted via :meth:`calculate_by_cold_vav`. After an off-to-on
        transition a season-based constant is used instead (27.0 heating,
        20.0 cooling, 25.0 otherwise). The result is finally clamped to
        [20.0, 30.0] in the heating season and to [18.0, 25.0] otherwise.

        :raises MissingIOTDataError: when a ``TypeError`` occurs during the
            computation (e.g. a required value is ``None``).
        """
        try:
            if not self.is_off_to_on:
                normal_ratio = self.get_normal_ratio()
                if normal_ratio < 0.9:
                    cold_ratio = self.get_cold_ratio()
                    temperature = self.calculate_by_cold_vav(cold_ratio)
                else:
                    temperature = self.current_set
            else:
                if self.season == Season.heating:
                    temperature = 27.0
                elif self.season == Season.cooling:
                    temperature = 20.0
                else:
                    temperature = 25.0

            if self.season == Season.heating:
                temperature = max(20.0, min(30.0, temperature))
            else:
                temperature = max(18.0, min(25.0, temperature))
        except TypeError as exc:
            # Keep the original TypeError attached as the explicit cause.
            raise MissingIOTDataError from exc

        return temperature


class ACATAHSupplyAirTemperatureDefaultController:
    """
    Determine supply air temperature when missing data.
    """

    def __init__(self, is_clear_day: bool):
        """
        :param is_clear_day: selects the non-morning set point in
            :meth:`build`.
        """
        super().__init__()
        self.is_clear_day = is_clear_day

    def build(self) -> float:
        """
        Return a fallback set point based on time of day and weather.

        The time returned by ``get_time_str`` is parsed with ``TIME_FMT``
        and reduced to ``HHMMSS``. From 08:00:00 (inclusive) to 10:00:00
        (exclusive) the result is 27.0; outside that window it is 23.0 on
        a clear day and 25.0 otherwise.
        """
        now = get_time_str()
        now_time_str = arrow.get(now, TIME_FMT).time().strftime("%H%M%S")
        # Zero-padded HHMMSS strings order the same way as the times they encode.
        is_morning = "080000" <= now_time_str < "100000"

        if is_morning:
            temperature = 27.0
        elif self.is_clear_day:
            temperature = 23.0
        else:
            temperature = 25.0

        return temperature


def build_acatah_supply_air_temperature_set(params: ACATAHSupplyAirTempSetRequest) -> float:
    """
    Build the supply air temperature set point for an ACATAH unit.

    Prepares the controller inputs from ``params`` and runs
    ``ACATAHSupplyAirTemperatureController``. If a ``KeyError`` or
    ``IndexError`` occurs on the way (for instance one of the history
    lists is empty), the result of
    ``ACATAHSupplyAirTemperatureDefaultController`` is returned instead.

    :param params: request carrying the VAV boxes, set points and the
        valve / switch histories.
    :return: the new supply air temperature set point.
    """
    try:
        vav_list = []
        for raw_vav in params.vav_list:
            vav = VAVBox(**raw_vav.dict())
            # Fall back to the unit-level supply air temperature when the
            # box carries a falsy one.
            if not vav.supply_air_temperature:
                vav.supply_air_temperature = params.supply_air_temperature
            vav.virtual_target_temperature = raw_vav.virtual_temperature_target
            # Only boxes with both temperatures truthy are considered.
            if vav.virtual_target_temperature and vav.virtual_realtime_temperature:
                vav_list.append(vav)

        # A closed chilled water valve (latest opening 0.0) is treated as heating.
        if params.chill_water_valve_opening_set_list[-1] == 0.0:
            thermal_mode = ThermalMode.heating
        else:
            thermal_mode = ThermalMode.cooling

        # Off-to-on: the latest switch value is 1.0 and the history holds a 0.0.
        is_off_to_on = False
        if params.equip_switch_set_list[-1] == 1.0:
            is_off_to_on = any(item == 0.0 for item in params.equip_switch_set_list)

        # More than one distinct opening in either valve history counts as
        # a thermal mode switch.
        hot_changed = len(set(params.hot_water_valve_opening_set_list)) > 1
        chill_changed = len(set(params.chill_water_valve_opening_set_list)) > 1
        is_thermal_mode_switched = hot_changed or chill_changed

        controller = ACATAHSupplyAirTemperatureController(
            vav_list,
            params.supply_air_temperature_set,
            params.return_air_temperature,
            thermal_mode,
            is_off_to_on,
            is_thermal_mode_switched,
            Season(params.season),
        )
        supply_air_temperature_set = controller.build()
    except (KeyError, IndexError):
        controller = ACATAHSupplyAirTemperatureDefaultController(params.is_clear_day)
        supply_air_temperature_set = controller.build()

    return supply_air_temperature_set