123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- from datetime import datetime
- import arrow
- import numpy as np
- from sqlalchemy.orm import Session
- from app.api.errors.iot import MissingIOTDataError
- from app.controllers.equipment.controller import EquipmentController
- from app.crud.device.device import device
- from app.crud.device.status_timestamp import blowy_feedback_time, high_speed_time
- from app.models.domain.devices import ACATVIInstructionsRequest
- from app.models.domain.feedback import FeedbackValue
- from app.schemas.device.device import DeviceCreate
- from app.schemas.device.status_timestamp import BlowyFeedbackTimeCreate, HighSpeedTimeCreate
- from app.schemas.equipment import VRF, VRFMode
- from app.utils.math import round_half_up
class VRFController(EquipmentController):
    """Rule-based controller deriving switch, fan-speed and temperature commands for one VRF unit.

    The controller reads the current state of ``vrf`` together with the space
    target/realtime temperatures, the latest occupant feedback and the
    scheduled on/off times, and writes the resulting commands back onto the
    same ``vrf`` object (``equip_switch_set``, ``speed_set`` and
    ``temperature_set``).

    Fan speeds are the strings ``"HH"``, ``"M"`` and ``"LL"`` (presumably
    high / medium / low); ``"hold"`` means "leave unchanged".
    """

    # Half-width, in seconds, of the window around on_time/off_time inside
    # which a scheduled switch command is issued.
    _SCHEDULE_TOLERANCE_SECONDS = 150

    # One-step fan speed transitions. Speeds missing from a table have no
    # further step in that direction.
    _SLOWER = {"HH": "M", "M": "LL"}
    _FASTER = {"LL": "M", "M": "HH"}

    def __init__(
        self,
        vrf: VRF,
        target: float,
        realtime: float,
        feedback: FeedbackValue,
        on_time: str,
        off_time: str,
    ):
        """Store the controlled device and the inputs used by the rules.

        Args:
            vrf: Device state; it is mutated in place with the computed commands.
            target: Space temperature target, or ``None`` when there is none.
            realtime: Current space temperature.
            feedback: Latest occupant feedback.
            on_time: Scheduled switch-on time of day. It is compared as text
                against an ``HHmmss`` string, so it is presumably formatted
                the same way in Asia/Shanghai local time -- TODO confirm.
            off_time: Scheduled switch-off time of day, same format as ``on_time``.
        """
        super().__init__()
        self.device = vrf
        self.target = target
        self.realtime = realtime
        self.feedback = feedback
        self.on_time = on_time
        self.off_time = off_time

    @staticmethod
    def _in_window(moment: str, start: str, end: str) -> bool:
        """Return True when ``moment`` lies in the time-of-day window [start, end].

        All three values are fixed-width time-of-day strings, so plain string
        comparison orders them chronologically. A window that crosses
        midnight has ``start > end`` and is treated as wrapping around.
        """
        if not moment:
            return False
        if start <= end:
            return start <= moment <= end
        return moment >= start or moment <= end

    def get_switch_set(self) -> str:
        """Compute the switch command and store it on ``device.equip_switch_set``.

        Explicit switch feedback wins over the schedule. Without it, the unit
        is toggled only when the current Asia/Shanghai time is within the
        tolerance window around ``on_time`` / ``off_time``.

        Returns:
            ``"on"``, ``"off"`` or ``"hold"``.
        """
        running = self.device.running_status
        if self.feedback == FeedbackValue.switch_off:
            switch_set = "off" if running else "hold"
        elif self.feedback == FeedbackValue.switch_on:
            switch_set = "hold" if running else "on"
        else:
            now = arrow.utcnow().to("Asia/Shanghai")
            tolerance = self._SCHEDULE_TOLERANCE_SECONDS
            # "ss" is arrow's seconds token; "SS" would format sub-seconds.
            window_start = now.shift(seconds=-tolerance).format("HHmmss")
            window_end = now.shift(seconds=tolerance).format("HHmmss")
            if self._in_window(self.on_time, window_start, window_end):
                switch_set = "hold" if running else "on"
            elif self._in_window(self.off_time, window_start, window_end):
                switch_set = "off" if running else "hold"
            else:
                switch_set = "hold"

        self.device.equip_switch_set = switch_set
        return switch_set

    def get_temperature_set(self) -> float:
        """Compute the temperature set-point and store it on ``device.temperature_set``.

        In cooling and heating mode the set-point starts from a default when
        the unit is not running, is shifted by one or two steps according to
        temperature feedback, and is clamped to a per-mode range. In every
        other mode the result is NaN.

        Returns:
            The rounded set-point, or NaN when no set-point applies. When
            ``target`` is ``None`` in cooling/heating mode, NaN is returned
            immediately and ``device.temperature_set`` is left untouched.
        """
        mode = self.device.work_mode
        cold_feedback = (FeedbackValue.a_little_cold, FeedbackValue.so_cold)
        hot_feedback = (FeedbackValue.a_little_hot, FeedbackValue.so_hot)

        if mode == VRFMode.cooling:
            new_temperature_set = np.nan
            if self.target is None:
                return new_temperature_set

            # Default set-point when the unit is currently off.
            if not self.device.running_status:
                new_temperature_set = 26.0

            current = self.device.current_temperature_set
            if self.feedback.value != "null" and current:
                if self.feedback in cold_feedback:
                    # A double step only for "so cold" while already at the lowest speed.
                    strong = self.feedback == FeedbackValue.so_cold and self.device.speed == "LL"
                    new_temperature_set = current + (2.0 if strong else 1.0)
                elif self.feedback in hot_feedback:
                    # A double step only for "so hot" while already at the highest speed.
                    strong = self.feedback == FeedbackValue.so_hot and self.device.speed == "HH"
                    new_temperature_set = current - (2.0 if strong else 1.0)

            if not np.isnan(new_temperature_set):
                new_temperature_set = max(24.0, min(28.0, new_temperature_set))
        elif mode == VRFMode.heating:
            new_temperature_set = np.nan
            if self.target is None:
                return new_temperature_set

            # Default set-point when the unit is currently off.
            if not self.device.running_status:
                new_temperature_set = 24.0

            current = self.device.current_temperature_set
            if self.feedback.value != "null" and current:
                if self.feedback in cold_feedback:
                    # "so cold" is a double step unless the fan is at "M".
                    strong = self.feedback == FeedbackValue.so_cold and self.device.speed != "M"
                    new_temperature_set = current + (2.0 if strong else 1.0)
                elif self.feedback in hot_feedback:
                    # "so hot" is a double step unless the fan is at "M".
                    strong = self.feedback == FeedbackValue.so_hot and self.device.speed != "M"
                    new_temperature_set = current - (2.0 if strong else 1.0)

            if not np.isnan(new_temperature_set):
                new_temperature_set = max(18.0, min(26.0, new_temperature_set))
        else:
            # Ventilation and any other mode have no temperature set-point.
            new_temperature_set = np.nan

        new_temperature_set = round_half_up(new_temperature_set)
        self.device.temperature_set = new_temperature_set
        return new_temperature_set

    def get_speed_set(self) -> str:
        """Compute the fan speed command and store it on ``device.speed_set``.

        Returns:
            ``"HH"``, ``"M"``, ``"LL"`` or ``"hold"``. When ``target`` is
            ``None`` in cooling/heating mode, ``"hold"`` is returned
            immediately and ``device.speed_set`` is left untouched.

        Raises:
            TypeError: If ``realtime`` cannot be compared with a float
                (e.g. it is ``None``); ``run`` converts this into
                ``MissingIOTDataError``.
        """
        mode = self.device.work_mode
        speed = self.device.speed

        if mode == VRFMode.ventilation:
            if self.target is None:
                new_speed = "hold"
            elif self.device.running_status:
                new_speed = "hold"
                if self.feedback in (
                    FeedbackValue.noisy_or_blowy,
                    FeedbackValue.a_little_cold,
                    FeedbackValue.so_cold,
                ):
                    new_speed = self._SLOWER.get(speed, "hold")
                elif self.feedback in (FeedbackValue.a_little_hot, FeedbackValue.so_hot):
                    new_speed = self._FASTER.get(speed, "hold")
            else:
                new_speed = "M"
        elif mode == VRFMode.cooling:
            new_speed = "hold"
            if self.target is None:
                return new_speed

            # Default speed when the unit is currently off.
            if not self.device.running_status:
                new_speed = "M"

            # Space is already cold enough: drop to the lowest speed.
            if self.realtime <= 22.0:
                new_speed = "LL"

            if self.feedback == FeedbackValue.so_cold:
                return_air = self.device.return_air_temp
                current = self.device.current_temperature_set
                # Slow down only while return air is still above the set-point.
                if return_air and current and return_air > current:
                    new_speed = self._SLOWER.get(speed, new_speed)
            elif self.feedback == FeedbackValue.so_hot:
                new_speed = self._FASTER.get(speed, new_speed)
            elif self.feedback == FeedbackValue.noisy_or_blowy:
                new_speed = self._SLOWER.get(speed, "LL")
        elif mode == VRFMode.heating:
            new_speed = "hold"
            if self.target is None:
                return new_speed

            # Default speed when the unit is currently off.
            if not self.device.running_status:
                new_speed = "M"

            # Space is already warm enough: drop to the lowest speed.
            if self.realtime >= 28.0:
                new_speed = "LL"

            if self.feedback == FeedbackValue.so_hot:
                return_air = self.device.return_air_temp
                current = self.device.current_temperature_set
                # Slow down only while return air is still below the set-point.
                if return_air and current and return_air < current:
                    new_speed = self._SLOWER.get(speed, new_speed)
            elif self.feedback == FeedbackValue.so_cold:
                new_speed = self._FASTER.get(speed, new_speed)
            elif self.feedback == FeedbackValue.noisy_or_blowy:
                new_speed = self._SLOWER.get(speed, "LL")
        else:
            new_speed = "hold"

        self.device.speed_set = new_speed
        return new_speed

    def ventilation_mode(self) -> str:
        """Compute a ventilation-only fan speed and store it on ``device.speed_set``.

        Unlike the ventilation branch of ``get_speed_set``, a stopped unit
        starts at ``"HH"``.

        Returns:
            The new speed; ``"hold"`` is returned without touching the device
            when ``target`` is ``None``.
        """
        new_speed = "hold"
        if self.target is None:
            return new_speed

        if not self.device.running_status:
            new_speed = "HH"
        elif self.feedback in (
            FeedbackValue.a_little_cold,
            FeedbackValue.so_cold,
            FeedbackValue.noisy_or_blowy,
        ):
            new_speed = self._SLOWER.get(self.device.speed, new_speed)
        elif self.feedback in (FeedbackValue.a_little_hot, FeedbackValue.so_hot):
            new_speed = self._FASTER.get(self.device.speed, new_speed)

        self.device.speed_set = new_speed
        return new_speed

    async def run(self):
        """Run all rules, updating the device in place.

        Raises:
            MissingIOTDataError: If a rule raised ``TypeError``, which is what
                happens when a required reading is missing (e.g. ``None``
                compared with a number).
        """
        try:
            self.get_switch_set()
            self.get_speed_set()
            self.get_temperature_set()
        except TypeError as exc:
            # Keep the original failure attached for debugging.
            raise MissingIOTDataError from exc

    def get_results(self):
        """Return the device object carrying the computed commands."""
        return self.device
class VRFControllerV2(VRFController):
    """
    For Zhijiang.

    Variant of ``VRFController`` that overrides only the temperature
    set-point rule: cooling and heating share one rule with single-step
    feedback adjustments and a 16-32 clamp.
    """

    def __init__(
        self,
        vrf: VRF,
        target: float,
        realtime: float,
        feedback: FeedbackValue,
        on_time: str,
        off_time: str,
    ):
        """Forward all arguments unchanged to ``VRFController.__init__``."""
        super().__init__(vrf, target, realtime, feedback, on_time, off_time)

    def get_temperature_set(self) -> float:
        """Compute the temperature set-point and store it on ``device.temperature_set``.

        Returns:
            The rounded set-point, or NaN when no set-point applies. When
            ``target`` is ``None`` in cooling/heating mode, NaN is returned
            immediately and ``device.temperature_set`` is left untouched.
        """
        mode = self.device.work_mode
        if mode == VRFMode.cooling or mode == VRFMode.heating:
            new_temperature_set = np.nan
            if self.target is None:
                return new_temperature_set

            # Default set-point when the unit is currently off.
            if not self.device.running_status:
                new_temperature_set = 26.0 if mode == VRFMode.cooling else 20.0

            # Feedback moves the current set-point by one step in either mode.
            current = self.device.current_temperature_set
            if self.feedback.value != "null" and current:
                if self.feedback in (FeedbackValue.a_little_cold, FeedbackValue.so_cold):
                    new_temperature_set = current + 1.0
                elif self.feedback in (FeedbackValue.a_little_hot, FeedbackValue.so_hot):
                    new_temperature_set = current - 1.0

            if not np.isnan(new_temperature_set):
                new_temperature_set = max(16.0, min(32.0, new_temperature_set))
        else:
            # Ventilation and any other mode have no temperature set-point.
            new_temperature_set = np.nan

        new_temperature_set = round_half_up(new_temperature_set)
        self.device.temperature_set = new_temperature_set
        return new_temperature_set
async def query_status_time(db: Session, device_id: str) -> tuple[datetime, datetime]:
    """Fetch the blowy-feedback and high-speed timestamps stored for a device.

    For each of the two timestamps, when no record exists a default of
    "24 hours ago" (naive UTC) is used and persisted; the device row itself
    is created first if it does not exist yet.

    Args:
        db: Database session used for all lookups and inserts.
        device_id: Identifier of the device.

    Returns:
        ``(feedback_time, high_time)``.
    """
    blowy_record = blowy_feedback_time.get_time_by_device(db, device_id)
    if not blowy_record:
        # Nothing stored yet: seed a default and persist it.
        feedback_time = arrow.utcnow().shift(hours=-24).naive
        if not device.get(db, device_id):
            device.create(db=db, obj_in=DeviceCreate(id=device_id))
        blowy_payload = BlowyFeedbackTimeCreate(timestamp=feedback_time, device_id=device_id)
        blowy_feedback_time.create(db=db, obj_in=blowy_payload)
    else:
        feedback_time = blowy_record.timestamp

    high_speed_record = high_speed_time.get_time_by_device(db, device_id)
    if not high_speed_record:
        # Nothing stored yet: seed a default and persist it.
        high_time = arrow.utcnow().shift(hours=-24).naive
        if not device.get(db, device_id):
            device.create(db=db, obj_in=DeviceCreate(id=device_id))
        high_speed_payload = HighSpeedTimeCreate(timestamp=high_time, device_id=device_id)
        high_speed_time.create(db=db, obj_in=high_speed_payload)
    else:
        high_time = high_speed_record.timestamp

    return feedback_time, high_time
async def build_acatvi_instructions(params: ACATVIInstructionsRequest) -> dict[str, str | float]:
    """Run ``VRFController`` on the request and return the resulting commands.

    Args:
        params: Request carrying the device state, space temperatures,
            feedback and on/off schedule.

    Returns:
        A dict with ``"switch_set"`` and ``"speed_set"``; ``"temperature_set"``
        is included only when the computed set-point is truthy and not NaN.
    """
    controller = VRFController(
        VRF(
            return_air_temp=params.return_air_temperature,
            current_temperature_set=params.current_temperature_set,
            speed=params.current_speed,
            running_status=params.running_status,
            work_mode=params.work_mode,
        ),
        params.space_temperature_target,
        params.space_realtime_temperature,
        params.feedback,
        params.on_time,
        params.off_time,
    )
    await controller.run()
    regulated = controller.get_results()

    instructions = {
        "switch_set": regulated.equip_switch_set,
        "speed_set": regulated.speed_set,
    }
    temperature_set = regulated.temperature_set
    # Skip the set-point when the controller produced none (None / NaN).
    if temperature_set and not np.isnan(temperature_set):
        instructions["temperature_set"] = temperature_set
    return instructions
async def build_acatvi_instructions_v2(params: ACATVIInstructionsRequest) -> dict[str, str | float]:
    """Run ``VRFControllerV2`` (for Zhijiang) on the request and return the resulting commands.

    Args:
        params: Request carrying the device state, space temperatures,
            feedback and on/off schedule.

    Returns:
        A dict with ``"switch_set"`` and ``"speed_set"``; ``"temperature_set"``
        is included only when the computed set-point is truthy and not NaN.
    """
    controller = VRFControllerV2(
        VRF(
            return_air_temp=params.return_air_temperature,
            current_temperature_set=params.current_temperature_set,
            speed=params.current_speed,
            running_status=params.running_status,
            work_mode=params.work_mode,
        ),
        params.space_temperature_target,
        params.space_realtime_temperature,
        params.feedback,
        params.on_time,
        params.off_time,
    )
    await controller.run()
    regulated = controller.get_results()

    instructions = {
        "switch_set": regulated.equip_switch_set,
        "speed_set": regulated.speed_set,
    }
    temperature_set = regulated.temperature_set
    # Skip the set-point when the controller produced none (None / NaN).
    if temperature_set and not np.isnan(temperature_set):
        instructions["temperature_set"] = temperature_set
    return instructions
|