switch.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. # -*- coding: utf-8 -*-
  2. from app.api.errors.iot import MissingIOTDataError
  3. from app.controllers.equipment.switch import Switch, SwitchSet
  4. from app.models.domain.devices import ACATAHSwitchSetRequest
  5. from app.schemas.equipment import AHU
  6. class AHUSwitch(Switch):
  7. def __init__(self, equipment: AHU):
  8. super(AHUSwitch, self).__init__(equipment)
  9. def break_time_action(self, begin: str, end: str, is_workday: bool) -> SwitchSet:
  10. try:
  11. if self._equip.in_cloud_status:
  12. if is_workday:
  13. if begin and end:
  14. switch_flag = True
  15. if begin <= end:
  16. if begin <= self._now_time <= end:
  17. switch_flag = False
  18. else:
  19. if not end <= self._now_time <= begin:
  20. switch_flag = False
  21. if not switch_flag:
  22. action = SwitchSet.off
  23. else:
  24. action = SwitchSet.hold
  25. else:
  26. action = SwitchSet.hold
  27. else:
  28. action = SwitchSet.hold
  29. else:
  30. action = SwitchSet.hold
  31. except TypeError:
  32. raise MissingIOTDataError
  33. return action
  34. async def build_acatah_switch_set(params: ACATAHSwitchSetRequest) -> SwitchSet:
  35. ahu = AHU(
  36. running_status=params.running_status,
  37. in_cloud_status=params.in_cloud_status,
  38. on_time=params.on_time,
  39. off_time=params.off_time,
  40. )
  41. ahu_switch_controller = AHUSwitch(ahu)
  42. action = await ahu_switch_controller.build_next_action(params.is_workday)
  43. break_action = ahu_switch_controller.break_time_action(
  44. params.break_start_time, params.break_end_time, params.is_workday
  45. )
  46. if break_action == "off":
  47. action = SwitchSet.off
  48. return action