123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- import arrow
- import numpy as np
- from loguru import logger
- from app.api.errors.iot import MissingIOTDataError
- from app.controllers.equipment.ahu.thermal_mode import count_vav_box_weight
- from app.models.domain.devices import ThermalMode, ACATAHSupplyAirTempSetRequest
- from app.schemas.equipment import VAVBox
- from app.services.transfer import Season
- from app.utils.date import get_time_str, TIME_FMT
class ACATAHSupplyAirTemperatureController:
    """
    Supply air temperature setting logic version 2 by WuXu.

    Computes a new supply air temperature set point from the state of the
    VAV boxes served by the unit, the current set point, the return air
    temperature, the thermal mode and the season.
    """

    def __init__(
        self,
        vav_boxes_list: list[VAVBox],
        current_set: float,
        return_air: float,
        thermal_mode: ThermalMode,
        is_off_to_on: bool,
        is_thermal_mode_switched: bool,
        season: Season,
    ):
        """
        Store the inputs used by :meth:`build`.

        :param vav_boxes_list: VAV boxes whose temperatures/weights are evaluated.
        :param current_set: current supply air temperature set point.
        :param return_air: return air temperature; used as the new set point
            in heating mode when the cold ratio is below 0.3.
        :param thermal_mode: selects the cooling or heating adjustment table.
        :param is_off_to_on: when true, :meth:`build` starts from a fixed
            per-season value instead of adjusting ``current_set``.
        :param is_thermal_mode_switched: stored only; no method of this class
            reads it.
        :param season: selects the start-up value and the clamping range.
        """
        super().__init__()
        self.vav_boxes_list = vav_boxes_list
        self.current_set = current_set
        self.return_air = return_air
        self.thermal_mode = thermal_mode
        self.is_off_to_on = is_off_to_on
        self.is_thermal_mode_switched = is_thermal_mode_switched
        self.season = season

    def calculate_by_cold_vav(self, cold_ratio: float) -> float:
        """
        Return the set point adjusted according to ``cold_ratio``.

        A NaN ``cold_ratio`` (or any value above 1.0) fails every comparison
        below and therefore leaves the set point unchanged, as does a thermal
        mode that is neither cooling nor heating.
        """
        if self.thermal_mode == ThermalMode.cooling:
            if cold_ratio < 0.3:
                new = self.current_set - 1.0
            elif cold_ratio < 0.45:
                new = self.current_set - 0.5
            elif cold_ratio <= 0.55:
                new = self.current_set
            elif cold_ratio <= 0.7:
                new = self.current_set + 1.0
            elif cold_ratio <= 1.0:
                new = self.current_set + 1.5
            else:
                new = self.current_set
        elif self.thermal_mode == ThermalMode.heating:
            if cold_ratio < 0.3:
                # Heating mode with few cold boxes: fall back to the return
                # air temperature instead of stepping the current set point.
                new = self.return_air
            elif cold_ratio < 0.45:
                new = self.current_set - 1.0
            elif cold_ratio <= 0.55:
                new = self.current_set
            elif cold_ratio <= 0.7:
                new = self.current_set + 0.5
            elif cold_ratio <= 1.0:
                new = self.current_set + 1.0
            else:
                new = self.current_set
        else:
            new = self.current_set

        return new

    def get_cold_ratio(self) -> float:
        """
        Return ``|sum of negative box weights| / sum of |box weights|``.

        Weights come from ``count_vav_box_weight``; a negative weight is
        counted as "cold". Returns NaN when the total weight is zero (for
        example when there are no boxes).
        """
        cold, total = 0, 0
        for box in self.vav_boxes_list:
            weight = count_vav_box_weight(
                box.virtual_realtime_temperature,
                box.virtual_target_temperature,
                box.supply_air_flow_upper_limit,
                box.supply_air_flow_lower_limit,
                box.supply_air_flow_set,
                box.valve_opening,
                box.supply_air_temperature,
            )
            if weight < 0:
                cold += weight
            total += abs(weight)

        try:
            cold_ratio = abs(cold / total)
        except ZeroDivisionError:
            # ``np.nan`` is the canonical spelling; the ``np.NAN`` alias is
            # not available in NumPy 2.x.
            cold_ratio = np.nan
        logger.debug(f"cold ratio: {cold_ratio}")

        return cold_ratio

    def get_normal_ratio(self) -> float:
        """
        Return the fraction of boxes within 1 degree of their target.

        Returns NaN when ``vav_boxes_list`` is empty.
        """
        normal = 0
        for box in self.vav_boxes_list:
            if abs(box.virtual_realtime_temperature - box.virtual_target_temperature) <= 1:
                normal += 1

        try:
            ratio = normal / len(self.vav_boxes_list)
        except ZeroDivisionError:
            ratio = np.nan

        return ratio

    def build(self) -> float:
        """
        Compute the new supply air temperature set point.

        When the unit was not just switched on, the current set point is kept
        if at least 90% of the boxes are within range (a NaN ratio also keeps
        it, because ``NaN < 0.9`` is false); otherwise it is adjusted by the
        cold ratio. When the unit was just switched on, a fixed per-season
        value is used. The result is clamped to 20-30 in the heating season
        and to 18-25 otherwise.

        :raises MissingIOTDataError: when a ``TypeError`` occurs during the
            computation (e.g. arithmetic or comparison on a ``None`` reading).
        """
        try:
            if not self.is_off_to_on:
                normal_ratio = self.get_normal_ratio()
                if normal_ratio < 0.9:
                    cold_ratio = self.get_cold_ratio()
                    temperature = self.calculate_by_cold_vav(cold_ratio)
                else:
                    temperature = self.current_set
            else:
                if self.season == Season.heating:
                    temperature = 27.0
                elif self.season == Season.cooling:
                    temperature = 20.0
                else:
                    temperature = 25.0

            if self.season == Season.heating:
                temperature = max(20.0, min(30.0, temperature))
            else:
                temperature = max(18.0, min(25.0, temperature))
        except TypeError as exc:
            # Keep the original TypeError attached as the cause for debugging.
            raise MissingIOTDataError from exc

        return temperature
class ACATAHSupplyAirTemperatureDefaultController:
    """
    Determine supply air temperature when missing data.

    Picks a fixed set point from the time of day and the clear-day flag only.
    """

    def __init__(self, is_clear_day: bool):
        """Remember whether the day is clear; :meth:`build` reads it."""
        super().__init__()
        self.is_clear_day = is_clear_day

    def build(self) -> float:
        """
        Return the fallback set point.

        27.0 from 08:00:00 (inclusive) to 10:00:00 (exclusive); outside that
        window 23.0 on a clear day and 25.0 otherwise.
        """
        # NOTE(review): the clock/time zone is whatever get_time_str() yields - confirm.
        clock = arrow.get(get_time_str(), TIME_FMT).time().strftime("%H%M%S")

        # Zero-padded HHMMSS strings order lexicographically like times.
        if "080000" <= clock < "100000":
            return 27.0
        if self.is_clear_day:
            return 23.0
        return 25.0
def build_acatah_supply_air_temperature_set(params: ACATAHSupplyAirTempSetRequest) -> float:
    """
    Compute the supply air temperature set point for a request.

    Builds the VAV box list, derives the thermal mode and the off-to-on /
    mode-switched flags from the request's history lists, and delegates to
    ``ACATAHSupplyAirTemperatureController``. If a ``KeyError`` or
    ``IndexError`` is raised along the way (e.g. indexing ``[-1]`` on an empty
    history list), the result of ``ACATAHSupplyAirTemperatureDefaultController``
    is returned instead. Other exceptions propagate to the caller.
    """
    try:
        usable_boxes = []
        for raw_box in params.vav_list:
            box = VAVBox(**raw_box.dict())
            # A box without its own supply air temperature takes the unit-level one.
            if not box.supply_air_temperature:
                box.supply_air_temperature = params.supply_air_temperature
            box.virtual_target_temperature = raw_box.virtual_temperature_target
            # Only boxes with both a target and a realtime temperature are evaluated.
            if box.virtual_target_temperature and box.virtual_realtime_temperature:
                usable_boxes.append(box)

        # The latest chilled-water valve set value decides the mode: 0.0 means heating.
        chill_history = params.chill_water_valve_opening_set_list
        thermal_mode = ThermalMode.heating if chill_history[-1] == 0.0 else ThermalMode.cooling

        # Off-to-on: the latest switch value is 1.0 and a 0.0 appears in the history.
        switch_history = params.equip_switch_set_list
        is_off_to_on = False
        if switch_history[-1] == 1.0:
            is_off_to_on = any(state == 0.0 for state in switch_history)

        # The mode counts as switched when either valve history holds more
        # than one distinct value.
        hot_levels = set(params.hot_water_valve_opening_set_list)
        chill_levels = set(chill_history)
        is_thermal_mode_switched = len(hot_levels) > 1 or len(chill_levels) > 1

        return ACATAHSupplyAirTemperatureController(
            usable_boxes,
            params.supply_air_temperature_set,
            params.return_air_temperature,
            thermal_mode,
            is_off_to_on,
            is_thermal_mode_switched,
            Season(params.season),
        ).build()
    except (KeyError, IndexError):
        return ACATAHSupplyAirTemperatureDefaultController(params.is_clear_day).build()
|