123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- # -*- coding: utf-8 -*-
- import numpy as np
- from loguru import logger
- from app.controllers.equipment.switch import Switch, SwitchSet
- from app.models.domain.devices import ACATFUSwitchSetRequest, ACATFUCO2SwitchSetRequest
- from app.schemas.equipment import PAU
class PAUSwitch(Switch):
    """Switch controller for a ``PAU`` equipment record, built on ``Switch``."""

    def __init__(self, equipment: PAU):
        """Hand the equipment record to the base ``Switch`` initialiser."""
        super().__init__(equipment)

    def break_time_action(self, begin: str, end: str, is_workday: bool) -> SwitchSet:
        """Return the action implied by the break window.

        Args:
            begin: Start of the break window.
            end: End of the break window.
            is_workday: Whether the current day is a workday.

        Returns:
            ``SwitchSet.off`` when the equipment is in cloud status, it is a
            workday, both bounds are set and the current time falls in the
            break window; ``SwitchSet.hold`` in every other case.
        """
        # Any missing precondition means the break window has no say.
        if not (self._equip.in_cloud_status and is_workday and begin and end):
            return SwitchSet.hold

        # NOTE(review): ``_now_time`` is presumably provided by ``Switch`` in
        # the same comparable format as ``begin``/``end`` -- confirm.
        now = self._now_time
        if begin <= end:
            # Ordinary window: break is the closed interval [begin, end].
            in_break = begin <= now <= end
        else:
            # Window wraps around: break is everything outside [end, begin].
            in_break = not (end <= now <= begin)

        return SwitchSet.off if in_break else SwitchSet.hold
class PAUSwitchWithCO2(PAUSwitch):
    """``PAUSwitch`` variant whose decision also considers CO2 readings."""

    def __init__(self, equipment: PAU):
        """Hand the equipment record to the ``PAUSwitch`` initialiser."""
        # This method was previously misspelled ``__int__``, so it never ran
        # as a constructor (and would have broken ``int(instance)``).
        super().__init__(equipment)

    @staticmethod
    def co2_logic(co2_list: list[float]) -> SwitchSet:
        """Map a list of CO2 readings to a switch action.

        Args:
            co2_list: CO2 readings (presumably ppm -- confirm with caller).

        Returns:
            ``SwitchSet.on`` when the mean is >= 800 or the maximum is > 1000;
            ``SwitchSet.off`` when the mean is <= 650 and the maximum is
            < 1000; ``SwitchSet.hold`` otherwise, including for an empty list.
        """
        readings = np.array(co2_list)
        if readings.size == 0:
            return SwitchSet.hold

        mean = readings.mean()
        # ``initial=0.0`` floors the maximum at zero, as in the original code.
        peak = readings.max(initial=0.0)

        # The two conditions are mutually exclusive, so ``elif`` is equivalent
        # to the two independent ``if`` statements used before.
        if mean >= 800.0 or peak > 1000.0:
            return SwitchSet.on
        elif mean <= 650.0 and peak < 1000.0:
            return SwitchSet.off
        return SwitchSet.hold

    async def run(self, is_workday: bool, break_start_time: str, break_end_time: str,
                  co2_list: list[float]) -> SwitchSet:
        """Combine the schedule, break-window and CO2 decisions.

        Args:
            is_workday: Whether the current day is a workday.
            break_start_time: Start of the break window.
            break_end_time: End of the break window.
            co2_list: CO2 readings passed to ``co2_logic``.

        Returns:
            The action from ``build_next_action``, overridden to
            ``SwitchSet.off`` when either the break window or the CO2 logic
            asks for off.
        """
        action = await self.build_next_action(is_workday)

        if self.break_time_action(break_start_time, break_end_time, is_workday) == SwitchSet.off:
            action = SwitchSet.off

        # NOTE(review): the original also had branches for a CO2 result of
        # ``on``, but they only reassigned ``action`` to its current value
        # (off -> off, hold -> hold). They were removed as no-ops, so a high
        # CO2 level still never forces the unit on -- confirm whether that is
        # the intended behaviour.
        if self.co2_logic(co2_list) == SwitchSet.off:
            action = SwitchSet.off

        return action
@logger.catch()
async def build_acatfu_switch_set(params: ACATFUSwitchSetRequest) -> SwitchSet:
    """Compute the switch action for a PAU from a switch-set request.

    Builds a ``PAU`` from the request's status and schedule fields, asks
    ``PAUSwitch`` for the next scheduled action, and overrides it with
    ``SwitchSet.off`` when the break window requires the unit to be off.

    Args:
        params: Request carrying running/cloud status, on/off times, the
            break window and the workday flag.

    Returns:
        The resulting ``SwitchSet`` action.
        NOTE(review): with loguru's default ``catch`` settings an exception is
        logged instead of propagated, so callers may receive ``None`` -- confirm.
    """
    pau = PAU(
        running_status=params.running_status,
        in_cloud_status=params.in_cloud_status,
        on_time=params.on_time,
        off_time=params.off_time,
    )
    pau_switch_controller = PAUSwitch(pau)
    action = await pau_switch_controller.build_next_action(params.is_workday)
    break_action = pau_switch_controller.break_time_action(
        params.break_start_time, params.break_end_time, params.is_workday
    )
    # Compare against the enum member rather than the raw string "off": this
    # matches PAUSwitchWithCO2.run and does not depend on SwitchSet being a
    # str-mixin enum.
    if break_action == SwitchSet.off:
        action = SwitchSet.off
    return action
@logger.catch
async def build_co2_acatfu_switch(params: ACATFUCO2SwitchSetRequest) -> SwitchSet:
    """Compute the CO2-aware switch action for a PAU from a request.

    Args:
        params: Request whose fields are unpacked into ``PAU`` and which also
            supplies the workday flag, break window and CO2 readings.

    Returns:
        The action produced by ``PAUSwitchWithCO2.run``.
    """
    # Every request field is forwarded to PAU as a keyword argument.
    equipment = PAU(**params.dict())
    return await PAUSwitchWithCO2(equipment).run(
        params.is_workday,
        params.break_start_time,
        params.break_end_time,
        params.co2_list,
    )
|