123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- from operator import attrgetter
- import numpy as np
- from app.api.errors.iot import MissingIOTDataError
- from app.controllers.equipment.ashp.basic import ASHPGroupController
- from app.models.domain.devices import FeedWaterPumpRequest
- from app.schemas.equipment import FeedWaterPump, ASHP
- from app.schemas.season import Season
class FeedWaterPumpController:
    """Compute switch and frequency set-points for a group of feed water pumps.

    The controller mutates the pump objects it was given:
    ``equip_switch_set`` is written by :meth:`get_switch_set` and
    :meth:`turn_off_all`, ``work_freq_set`` by :meth:`get_freq_set` and
    :meth:`set_freq`.  The pumps are read back with :meth:`get_result`.
    """

    def __init__(self, pump_list: list[FeedWaterPump]):
        """Store the pumps to control.

        Args:
            pump_list: Pumps whose set-points are updated in place.
        """
        self._pump_list = pump_list

    def get_switch_set(self, ashp_list: list[ASHP]) -> None:
        """Decide which pumps to switch on from the state of the heat pumps.

        When every unit in ``ashp_list`` has a falsy ``running_status``
        (an empty list included) all pumps are switched off.  Otherwise the
        pumps are re-ordered by ascending ``acc_run_time`` and the first
        pumps that are not running get ``equip_switch_set = True``, up to the
        count returned by :meth:`_required_pump_count`.

        Args:
            ashp_list: Heat pump units; ``equip_switch_set`` and
                ``running_status`` are read from each.
        """
        ashp_num = len(ashp_list)
        ashp_on_count = sum(1 for item in ashp_list if item.equip_switch_set)
        ashp_off_count = sum(1 for item in ashp_list if not item.running_status)

        if ashp_off_count == ashp_num:
            self.turn_off_all()
            return

        pump_need_num = self._required_pump_count(ashp_on_count, ashp_num)

        # Least-used pumps come first so they are preferred when starting.
        # The internal list is replaced by the sorted copy, so get_result()
        # returns the pumps in this order afterwards.
        self._pump_list = sorted(self._pump_list, key=attrgetter("acc_run_time"))

        # NOTE(review): only pumps that are not running consume the quota;
        # pumps already running are neither counted nor switched off here.
        # Confirm this matches the intended control strategy.
        idle_pumps = [pump for pump in self._pump_list if not pump.running_status]
        for pump in idle_pumps[:pump_need_num]:
            pump.equip_switch_set = True

    def _required_pump_count(self, ashp_on_count: int, ashp_num: int) -> int:
        """Return how many pumps to start for the given heat pump load."""
        pump_num = len(self._pump_list)
        if pump_num == 2:
            # True division: "strictly fewer than half of the units".
            return 1 if ashp_on_count < ashp_num / 2 else 2
        if pump_num == 3:
            if ashp_on_count <= ashp_num // 3:
                return 1
            if ashp_on_count > 2 * ashp_num // 3:
                return 3
            return 2
        # Any other pump count always asks for a single pump.
        return 1

    def turn_off_all(self) -> None:
        """Set ``equip_switch_set`` to ``False`` on every pump."""
        for device in self._pump_list:
            device.equip_switch_set = False

    def is_init(self) -> bool:
        """Return ``True`` when no pump runs yet but at least one is switched on.

        That is: every pump has a falsy ``running_status`` and at least one
        pump has a truthy ``equip_switch_set``.
        """
        if any(device.running_status for device in self._pump_list):
            return False
        return any(device.equip_switch_set for device in self._pump_list)

    def get_freq_set(self, ashp_list: list[ASHP], season: Season, init: float = 30.0) -> None:
        """Update ``work_freq_set`` of the switched-on pumps.

        In the start-up state (see :meth:`is_init`) the frequency is set to
        ``init``.  Otherwise the difference between the mean ``in_temp`` and
        mean ``out_temp`` of the heat pumps is evaluated: above 6.0 the
        frequency is raised by 5.0, below 4.0 it is lowered by 5.0, and in
        between it is left unchanged.

        Args:
            ashp_list: Heat pump units; ``in_temp`` and ``out_temp`` are read.
            season: For ``Season.cooling`` the difference is inlet minus
                outlet, otherwise outlet minus inlet.
            init: Frequency applied in the start-up state.
        """
        if self.is_init():
            self.set_freq(0, init)
            return

        if not ashp_list:
            # np.mean([]) would yield NaN plus a RuntimeWarning; NaN fails
            # both threshold checks, so the outcome is a zero adjustment.
            # Apply that directly without touching numpy.
            self.set_freq(0.0, 0)
            return

        in_temp_avg = np.mean([device.in_temp for device in ashp_list])
        out_temp_avg = np.mean([device.out_temp for device in ashp_list])
        if season == Season.cooling:
            temp_diff = in_temp_avg - out_temp_avg
        else:
            temp_diff = out_temp_avg - in_temp_avg

        if temp_diff > 6.0:
            diff = 5.0
        elif temp_diff < 4.0:
            diff = -5.0
        else:
            diff = 0.0
        self.set_freq(diff, 0)

    def set_freq(self, diff: float, base: float) -> None:
        """Write ``work_freq_set`` on every pump whose ``equip_switch_set`` is truthy.

        Args:
            diff: Offset added to the base value.
            base: Absolute base frequency.  ``0`` is a sentinel meaning
                "adjust relative to the pump's current ``work_freq_set``".
        """
        for device in self._pump_list:
            if not device.equip_switch_set:
                continue
            if base == 0:
                device.work_freq_set = device.work_freq_set + diff
            else:
                device.work_freq_set = base + diff

    def get_result(self) -> list[FeedWaterPump]:
        """Return the pump list held by the controller."""
        return self._pump_list
class FeedWaterPumpBRController(FeedWaterPumpController):
    """Feed water pump controller variant that never asks for fewer than two pumps.

    Only the switch decision differs from the parent class; frequency
    handling is inherited unchanged.
    """

    def __init__(self, pump_list: list[FeedWaterPump]):
        """Store the pumps to control (delegates to the parent constructor)."""
        super().__init__(pump_list)

    def get_switch_set(self, ashp_list: list[ASHP]):
        """Decide which pumps to switch on from the state of the heat pumps.

        When every unit in ``ashp_list`` has a falsy ``running_status`` all
        pumps are switched off.  Otherwise three pumps are requested when the
        group has exactly three pumps and more than two thirds of the units
        have ``equip_switch_set`` set; in every other case two are requested.
        The pumps are then ordered by ascending ``acc_run_time`` and the
        first pumps that are not running are switched on, up to that count.
        """
        total_units = len(ashp_list)
        units_switched_on = sum(1 for unit in ashp_list if unit.equip_switch_set)
        units_stopped = sum(1 for unit in ashp_list if not unit.running_status)

        if units_stopped == total_units:
            self.turn_off_all()
            return

        three_pump_group = len(self._pump_list) == 3
        if three_pump_group and units_switched_on > 2 * total_units // 3:
            quota = 3
        else:
            quota = 2

        # Prefer the pumps with the least accumulated run time.
        self._pump_list = sorted(self._pump_list, key=attrgetter("acc_run_time"))

        # NOTE(review): only pumps that are not running consume the quota;
        # confirm this matches the intended control strategy.
        stopped_pumps = [pump for pump in self._pump_list if not pump.running_status]
        for pump in stopped_pumps[:quota]:
            pump.equip_switch_set = True
async def build_feed_water_pump_instructions(params: FeedWaterPumpRequest):
    """Build the feed water pump set-points for one request.

    Runs the heat pump group controller first, then derives the pump
    frequency and switch set-points from ``params.ashp_list`` using
    :class:`FeedWaterPumpController` with its default start-up frequency.

    Args:
        params: Request carrying ``ashp_list``, ``pump_list``,
            ``outdoor_temp`` and ``season``.

    Returns:
        The pump list held by the controller after both set-points were
        computed.

    Raises:
        MissingIOTDataError: If the heat pump controller raises
            ``TypeError`` or ``IndexError`` while running.
    """
    ashp_controller = ASHPGroupController(params.ashp_list, params.outdoor_temp, params.season)
    try:
        ashp_controller.run()
    except (TypeError, IndexError) as err:
        # Keep the original error attached as the cause for debugging.
        raise MissingIOTDataError from err

    # NOTE(review): the returned list is not used below; the pump logic reads
    # params.ashp_list instead. Presumably run() updates those objects in
    # place - confirm against ASHPGroupController.
    ashp_list, _, _ = ashp_controller.get_results()

    pump_controller = FeedWaterPumpController(params.pump_list)
    # Frequency is evaluated before the switch decision, i.e. against the
    # switch state that came in with the request.
    pump_controller.get_freq_set(params.ashp_list, params.season)
    pump_controller.get_switch_set(params.ashp_list)

    return pump_controller.get_result()
async def build_feed_water_pump_br_instructions(params: FeedWaterPumpRequest):
    """Build the feed water pump set-points for one request (BR variant).

    Runs the heat pump group controller first, then derives the pump
    frequency and switch set-points from ``params.ashp_list`` using
    :class:`FeedWaterPumpBRController` and a start-up frequency of 40.0.

    Args:
        params: Request carrying ``ashp_list``, ``pump_list``,
            ``outdoor_temp`` and ``season``.

    Returns:
        The pump list held by the controller after both set-points were
        computed.

    Raises:
        MissingIOTDataError: If the heat pump controller raises
            ``TypeError`` or ``IndexError`` while running.
    """
    ashp_controller = ASHPGroupController(params.ashp_list, params.outdoor_temp, params.season)
    try:
        ashp_controller.run()
    except (TypeError, IndexError) as err:
        # Keep the original error attached as the cause for debugging.
        raise MissingIOTDataError from err

    # NOTE(review): the returned list is not used below; the pump logic reads
    # params.ashp_list instead. Presumably run() updates those objects in
    # place - confirm against ASHPGroupController.
    ashp_list, _, _ = ashp_controller.get_results()

    # Use the BR-specific controller: the base FeedWaterPumpController was
    # instantiated here before, which left the BR switch rules unused.
    pump_controller = FeedWaterPumpBRController(params.pump_list)
    # Frequency is evaluated before the switch decision, i.e. against the
    # switch state that came in with the request.
    pump_controller.get_freq_set(params.ashp_list, params.season, 40.0)
    pump_controller.get_switch_set(params.ashp_list)

    return pump_controller.get_result()
|