# -*- coding: utf-8 -*-
import numpy as np
from loguru import logger

from app.controllers.equipment.switch import Switch, SwitchSet
from app.models.domain.devices import ACATFUSwitchSetRequest, ACATFUCO2SwitchSetRequest
from app.schemas.equipment import PAU

# CO2 thresholds used by PAUSwitchWithCO2.co2_logic.
# NOTE(review): units are not stated in this file (presumably ppm) - confirm.
_CO2_MEAN_ON = 800.0    # mean at or above this requests "on"
_CO2_MEAN_OFF = 650.0   # mean at or below this (with a low peak) requests "off"
_CO2_PEAK_LIMIT = 1000.0  # a single reading above this requests "on"


class PAUSwitch(Switch):
    """Switch controller for a PAU that adds a break-time override."""

    def __init__(self, equipment: PAU):
        super().__init__(equipment)

    def break_time_action(self, begin: str, end: str, is_workday: bool) -> SwitchSet:
        """Return the action imposed by the break window.

        Args:
            begin: Start of the break window; falsy means "no window".
            end: End of the break window; falsy means "no window".
            is_workday: Whether today is a workday.

        Returns:
            ``SwitchSet.off`` when the equipment is cloud-controlled, it is a
            workday, a window is configured and the current time falls inside
            it; ``SwitchSet.hold`` in every other case.
        """
        # The override only applies to cloud-controlled equipment on workdays
        # with a fully specified window.
        if not self._equip.in_cloud_status:
            return SwitchSet.hold
        if not is_workday:
            return SwitchSet.hold
        if not (begin and end):
            return SwitchSet.hold

        # begin/end are compared against self._now_time with plain ordering, so
        # all three must share one sortable representation.
        if begin <= end:
            # Window within a single day; both boundaries are inclusive.
            in_break = begin <= self._now_time <= end
        else:
            # Window wraps past midnight: "in break" is the complement of
            # [end, begin].
            # NOTE(review): here the boundaries are exclusive, unlike the
            # same-day case above - confirm whether that is intended.
            in_break = not end <= self._now_time <= begin

        return SwitchSet.off if in_break else SwitchSet.hold


class PAUSwitchWithCO2(PAUSwitch):
    """PAU switch controller that also takes CO2 readings into account."""

    # Was misspelled ``__int__``, which never ran as a constructor and instead
    # defined a broken ``int()`` conversion hook on the class.
    def __init__(self, equipment: PAU):
        super().__init__(equipment)

    @staticmethod
    def co2_logic(co2_list: list[float]) -> SwitchSet:
        """Derive a switch action from CO2 readings.

        Args:
            co2_list: CO2 readings; may be empty.

        Returns:
            ``SwitchSet.on`` when the mean is >= 800.0 or any reading is
            > 1000.0; ``SwitchSet.off`` when the mean is <= 650.0 and the peak
            is < 1000.0; ``SwitchSet.hold`` otherwise, including for an empty
            list.
        """
        readings = np.array(co2_list)
        if readings.size == 0:
            return SwitchSet.hold

        mean = readings.mean()
        # initial=0.0 floors the peak at zero, as in the original code.
        peak = readings.max(initial=0.0)

        # The two conditions are mutually exclusive, so order does not matter.
        if mean >= _CO2_MEAN_ON or peak > _CO2_PEAK_LIMIT:
            return SwitchSet.on
        if mean <= _CO2_MEAN_OFF and peak < _CO2_PEAK_LIMIT:
            return SwitchSet.off
        return SwitchSet.hold

    async def run(
        self,
        is_workday: bool,
        break_start_time: str,
        break_end_time: str,
        co2_list: list[float],
    ) -> SwitchSet:
        """Combine the base schedule, the break window and CO2 into one action.

        Args:
            is_workday: Whether today is a workday.
            break_start_time: Start of the break window.
            break_end_time: End of the break window.
            co2_list: CO2 readings passed to :meth:`co2_logic`.

        Returns:
            ``SwitchSet.off`` if either the break window or the CO2 logic asks
            for "off"; otherwise whatever ``build_next_action`` decided.
        """
        action = await self.build_next_action(is_workday)

        break_action = self.break_time_action(break_start_time, break_end_time, is_workday)
        co2_action = self.co2_logic(co2_list)

        if break_action == SwitchSet.off or co2_action == SwitchSet.off:
            action = SwitchSet.off

        # NOTE(review): a CO2 "on" verdict leaves the action untouched. The
        # original branches for that case only re-assigned the value ``action``
        # already held, so they were dropped; confirm whether "on" was meant to
        # override a "hold" under some condition on the equipment state.
        return action


@logger.catch()
async def build_acatfu_switch_set(params: ACATFUSwitchSetRequest) -> SwitchSet:
    """Compute the switch action for a PAU from a plain switch-set request.

    Args:
        params: Request carrying the equipment status, on/off times, workday
            flag and break window.

    Returns:
        ``SwitchSet.off`` when the break window demands it, otherwise the
        result of ``build_next_action``. Exceptions are intercepted by
        ``logger.catch``.
    """
    pau = PAU(
        running_status=params.running_status,
        in_cloud_status=params.in_cloud_status,
        on_time=params.on_time,
        off_time=params.off_time,
    )
    pau_switch_controller = PAUSwitch(pau)

    action = await pau_switch_controller.build_next_action(params.is_workday)
    break_action = pau_switch_controller.break_time_action(
        params.break_start_time, params.break_end_time, params.is_workday
    )
    # Compare against the enum member (as PAUSwitchWithCO2.run does) rather
    # than the string "off", which only matches if SwitchSet mixes in ``str``.
    if break_action == SwitchSet.off:
        action = SwitchSet.off

    return action


@logger.catch()
async def build_co2_acatfu_switch(params: ACATFUCO2SwitchSetRequest) -> SwitchSet:
    """Compute the switch action for a PAU from a CO2-aware request.

    Args:
        params: Request whose fields populate the ``PAU`` model and supply the
            workday flag, break window and CO2 readings.

    Returns:
        The action produced by ``PAUSwitchWithCO2.run``. Exceptions are
        intercepted by ``logger.catch``.
    """
    pau = PAU(**params.dict())
    controller = PAUSwitchWithCO2(pau)
    return await controller.run(
        params.is_workday,
        params.break_start_time,
        params.break_end_time,
        params.co2_list,
    )