"""Rule-based control logic for VRF equipment.

Derives the switch, speed and temperature set-points of a VRF unit from its
current state, the space temperatures, user feedback and the on/off schedule.
"""

from datetime import datetime

import arrow
import numpy as np
from sqlalchemy.orm import Session

from app.api.errors.iot import MissingIOTDataError
from app.controllers.equipment.controller import EquipmentController
from app.crud.device.device import device
from app.crud.device.status_timestamp import blowy_feedback_time, high_speed_time
from app.models.domain.devices import ACATVIInstructionsRequest
from app.models.domain.feedback import FeedbackValue
from app.schemas.device.device import DeviceCreate
from app.schemas.device.status_timestamp import BlowyFeedbackTimeCreate, HighSpeedTimeCreate
from app.schemas.equipment import VRF, VRFMode
from app.utils.math import round_half_up

# Speed transitions: one level down / one level up. A speed that is missing
# from a table has no transition and the caller's fallback value is used.
_STEP_DOWN: dict[str, str] = {"HH": "M", "M": "LL"}
_STEP_UP: dict[str, str] = {"LL": "M", "M": "HH"}

_COLD_FEEDBACK = (FeedbackValue.a_little_cold, FeedbackValue.so_cold)
_HOT_FEEDBACK = (FeedbackValue.a_little_hot, FeedbackValue.so_hot)

# Half-width, in seconds, of the window around a scheduled on/off time.
_SCHEDULE_TOLERANCE_SECONDS = 150


class VRFController(EquipmentController):
    """Compute switch, speed and temperature set-points for one VRF unit.

    The results are written onto the ``VRF`` object passed in
    (``equip_switch_set``, ``speed_set``, ``temperature_set``) and are
    available through :meth:`get_results` after :meth:`run`.
    """

    def __init__(
        self,
        vrf: VRF,
        target: float | None,
        realtime: float,
        feedback: FeedbackValue,
        on_time: str,
        off_time: str,
    ):
        """Store the device and the inputs used by the control rules.

        Args:
            vrf: Device state; also receives the computed set-points.
            target: Space temperature target, or ``None`` when there is none.
            realtime: Current space temperature.
            feedback: Feedback value driving the adjustments.
            on_time: Scheduled switch-on time, compared as a string against
                the local time formatted as ``HHmmss``.
            off_time: Scheduled switch-off time, same format as ``on_time``.
        """
        super().__init__()
        self.device = vrf
        self.target = target
        self.realtime = realtime
        self.feedback = feedback
        self.on_time = on_time
        self.off_time = off_time

    @staticmethod
    def _within_window(scheduled: str | None, start: str, end: str) -> bool:
        """Return whether ``scheduled`` lies in the inclusive window ``[start, end]``.

        All values are fixed-width ``HHmmss`` strings, so lexical order equals
        chronological order within one day.
        """
        if not scheduled:
            return False
        if start <= end:
            return start <= scheduled <= end
        # The window straddles midnight (e.g. 235830 .. 000330): a plain
        # chained comparison could never match, so test both halves.
        return scheduled >= start or scheduled <= end

    def get_switch_set(self) -> str:
        """Decide the switch command and store it on the device.

        Explicit switch feedback takes precedence; otherwise the unit is
        switched when the current Asia/Shanghai time is within 150 seconds
        of ``on_time`` or ``off_time``.

        Returns:
            ``"on"``, ``"off"`` or ``"hold"``.
        """
        running = self.device.running_status

        if self.feedback.value == FeedbackValue.switch_off:
            switch_set = "off" if running else "hold"
        elif self.feedback.value == FeedbackValue.switch_on:
            switch_set = "hold" if running else "on"
        else:
            now = arrow.utcnow().to("Asia/Shanghai")
            # "ss" is Arrow's seconds token; "SS" would format sub-seconds
            # and make the last two digits of the window meaningless.
            ago = now.shift(seconds=-_SCHEDULE_TOLERANCE_SECONDS).format("HHmmss")
            later = now.shift(seconds=_SCHEDULE_TOLERANCE_SECONDS).format("HHmmss")

            if self._within_window(self.on_time, ago, later):
                switch_set = "hold" if running else "on"
            elif self._within_window(self.off_time, ago, later):
                switch_set = "off" if running else "hold"
            else:
                switch_set = "hold"

        self.device.equip_switch_set = switch_set
        return switch_set

    def _regulate_temperature(
        self,
        *,
        default: float,
        lower: float,
        upper: float,
        big_step_up: bool,
        big_step_down: bool,
    ) -> float:
        """Apply the default and feedback rules, then clamp to ``[lower, upper]``.

        Args:
            default: Set-point used while the unit is not running.
            lower: Minimum allowed set-point.
            upper: Maximum allowed set-point.
            big_step_up: Raise by 2.0 instead of 1.0 on ``so_cold`` feedback.
            big_step_down: Lower by 2.0 instead of 1.0 on ``so_hot`` feedback.

        Returns:
            The new set-point, or NaN when no rule produced one.
        """
        new_temperature_set = np.nan

        # Default temperature set.
        if not self.device.running_status:
            new_temperature_set = default

        # Feedback overrides the default, relative to the current set-point.
        current = self.device.current_temperature_set
        if self.feedback.value != "null" and current:
            if self.feedback in _COLD_FEEDBACK:
                strong = self.feedback == FeedbackValue.so_cold and big_step_up
                new_temperature_set = current + (2.0 if strong else 1.0)
            elif self.feedback in _HOT_FEEDBACK:
                strong = self.feedback == FeedbackValue.so_hot and big_step_down
                new_temperature_set = current - (2.0 if strong else 1.0)

        if not np.isnan(new_temperature_set):
            new_temperature_set = max(lower, min(upper, new_temperature_set))

        return new_temperature_set

    def get_temperature_set(self) -> float:
        """Compute the temperature set-point and store it on the device.

        In cooling or heating mode without a target, NaN is returned
        immediately and the device is left untouched. In any other mode the
        set-point is NaN.

        Returns:
            The rounded set-point, or NaN when nothing should be sent.
        """
        mode = self.device.work_mode
        speed = self.device.speed

        if mode == VRFMode.cooling:
            if self.target is None:
                return np.nan
            new_temperature_set = self._regulate_temperature(
                default=26.0,
                lower=24.0,
                upper=28.0,
                big_step_up=speed == "LL",
                big_step_down=speed == "HH",
            )
        elif mode == VRFMode.heating:
            if self.target is None:
                return np.nan
            # In heating the larger step applies at every speed except "M".
            new_temperature_set = self._regulate_temperature(
                default=24.0,
                lower=18.0,
                upper=26.0,
                big_step_up=speed != "M",
                big_step_down=speed != "M",
            )
        else:
            new_temperature_set = np.nan

        new_temperature_set = round_half_up(new_temperature_set)
        self.device.temperature_set = new_temperature_set

        return new_temperature_set

    def _ventilation_speed_set(self) -> str:
        """Return the speed command for ventilation mode."""
        if self.target is None:
            return "hold"
        if not self.device.running_status:
            return "M"

        if self.feedback == FeedbackValue.noisy_or_blowy or self.feedback in _COLD_FEEDBACK:
            return _STEP_DOWN.get(self.device.speed, "hold")
        if self.feedback in _HOT_FEEDBACK:
            return _STEP_UP.get(self.device.speed, "hold")
        return "hold"

    def _cooling_speed_set(self) -> str:
        """Return the speed command for cooling mode (target is known)."""
        new_speed = "hold"

        # Default speed set.
        if not self.device.running_status:
            new_speed = "M"

        # Lower limit.
        if self.realtime <= 22.0:
            new_speed = "LL"

        # Feedback.
        speed = self.device.speed
        if self.feedback == FeedbackValue.so_cold:
            return_air = self.device.return_air_temp
            current = self.device.current_temperature_set
            # Only slow down while the return air is above the set-point.
            if return_air and current and return_air > current:
                new_speed = _STEP_DOWN.get(speed, new_speed)
        elif self.feedback == FeedbackValue.so_hot:
            new_speed = _STEP_UP.get(speed, new_speed)
        elif self.feedback == FeedbackValue.noisy_or_blowy:
            new_speed = _STEP_DOWN.get(speed, "LL")

        return new_speed

    def _heating_speed_set(self) -> str:
        """Return the speed command for heating mode (target is known)."""
        new_speed = "hold"

        # Default speed set.
        if not self.device.running_status:
            new_speed = "M"

        # Upper limit of the space temperature.
        if self.realtime >= 28.0:
            new_speed = "LL"

        # Feedback.
        speed = self.device.speed
        if self.feedback == FeedbackValue.so_hot:
            return_air = self.device.return_air_temp
            current = self.device.current_temperature_set
            # Only slow down while the return air is below the set-point.
            if return_air and current and return_air < current:
                new_speed = _STEP_DOWN.get(speed, new_speed)
        elif self.feedback == FeedbackValue.so_cold:
            new_speed = _STEP_UP.get(speed, new_speed)
        elif self.feedback == FeedbackValue.noisy_or_blowy:
            new_speed = _STEP_DOWN.get(speed, "LL")

        return new_speed

    def get_speed_set(self) -> str:
        """Compute the speed command and store it on the device.

        In cooling or heating mode without a target, ``"hold"`` is returned
        immediately and the device is left untouched.

        Returns:
            ``"LL"``, ``"M"``, ``"HH"`` or ``"hold"``.
        """
        mode = self.device.work_mode

        if mode == VRFMode.ventilation:
            new_speed = self._ventilation_speed_set()
        elif mode == VRFMode.cooling:
            if self.target is None:
                return "hold"
            new_speed = self._cooling_speed_set()
        elif mode == VRFMode.heating:
            if self.target is None:
                return "hold"
            new_speed = self._heating_speed_set()
        else:
            new_speed = "hold"

        self.device.speed_set = new_speed

        return new_speed

    def ventilation_mode(self) -> str:
        """Compute a ventilation speed command that starts a stopped unit at "HH".

        Without a target, ``"hold"`` is returned and the device is left
        untouched; otherwise the result is stored on the device.

        Returns:
            ``"LL"``, ``"M"``, ``"HH"`` or ``"hold"``.
        """
        new_speed = "hold"
        if self.target is None:
            return new_speed

        if not self.device.running_status:
            new_speed = "HH"
        elif self.feedback in _COLD_FEEDBACK or self.feedback == FeedbackValue.noisy_or_blowy:
            new_speed = _STEP_DOWN.get(self.device.speed, new_speed)
        elif self.feedback in _HOT_FEEDBACK:
            new_speed = _STEP_UP.get(self.device.speed, new_speed)

        self.device.speed_set = new_speed

        return new_speed

    async def run(self):
        """Run the switch, speed and temperature rules in that order.

        Raises:
            MissingIOTDataError: When a rule raises ``TypeError``
                (presumably because a required reading is ``None``).
        """
        try:
            self.get_switch_set()
            self.get_speed_set()
            self.get_temperature_set()
        except TypeError as err:
            raise MissingIOTDataError from err

    def get_results(self):
        """Return the device object carrying the computed set-points."""
        return self.device


class VRFControllerV2(VRFController):
    """
    For Zhijiang.

    Same switch and speed rules as :class:`VRFController`; the temperature
    rule always steps by 1.0 and clamps to 16.0-32.0.
    """

    def __init__(
        self,
        vrf: VRF,
        target: float | None,
        realtime: float,
        feedback: FeedbackValue,
        on_time: str,
        off_time: str,
    ):
        """Forward all arguments to :class:`VRFController`."""
        super().__init__(vrf, target, realtime, feedback, on_time, off_time)

    def get_temperature_set(self) -> float:
        """Compute the temperature set-point and store it on the device.

        In cooling or heating mode without a target, NaN is returned
        immediately and the device is left untouched. In any other mode the
        set-point is NaN.

        Returns:
            The rounded set-point, or NaN when nothing should be sent.
        """
        mode = self.device.work_mode

        if mode == VRFMode.cooling or mode == VRFMode.heating:
            if self.target is None:
                return np.nan
            new_temperature_set = self._regulate_temperature(
                default=26.0 if mode == VRFMode.cooling else 20.0,
                lower=16.0,
                upper=32.0,
                big_step_up=False,
                big_step_down=False,
            )
        else:
            new_temperature_set = np.nan

        new_temperature_set = round_half_up(new_temperature_set)
        self.device.temperature_set = new_temperature_set

        return new_temperature_set


def _get_or_seed_timestamp(db: Session, device_id: str, crud, create_schema) -> datetime:
    """Return the stored timestamp for a device, seeding one if none exists.

    When no record exists, a naive UTC timestamp 24 hours in the past is
    stored (creating the device row first if needed) and returned.
    """
    record = crud.get_time_by_device(db, device_id)
    if record:
        return record.timestamp

    seeded = arrow.utcnow().shift(hours=-24).naive
    if not device.get(db, device_id):
        device.create(db=db, obj_in=DeviceCreate(id=device_id))
    crud.create(db=db, obj_in=create_schema(timestamp=seeded, device_id=device_id))
    return seeded


async def query_status_time(db: Session, device_id: str) -> tuple[datetime, datetime]:
    """Return the blowy-feedback and high-speed timestamps of a device.

    Each missing timestamp is initialised to 24 hours ago and persisted.

    Returns:
        ``(feedback_time, high_time)``.
    """
    feedback_time = _get_or_seed_timestamp(
        db, device_id, blowy_feedback_time, BlowyFeedbackTimeCreate
    )
    high_time = _get_or_seed_timestamp(db, device_id, high_speed_time, HighSpeedTimeCreate)

    return feedback_time, high_time


async def _build_instructions(
    params: ACATVIInstructionsRequest,
    controller_cls: type[VRFController],
) -> dict[str, str | float]:
    """Run ``controller_cls`` on the request and collect its instructions.

    ``temperature_set`` is included only when it is truthy and not NaN.
    """
    vrf = VRF(
        return_air_temp=params.return_air_temperature,
        current_temperature_set=params.current_temperature_set,
        speed=params.current_speed,
        running_status=params.running_status,
        work_mode=params.work_mode,
    )

    controller = controller_cls(
        vrf,
        params.space_temperature_target,
        params.space_realtime_temperature,
        params.feedback,
        params.on_time,
        params.off_time,
    )
    await controller.run()
    regulated_vrf = controller.get_results()

    instructions = {
        "switch_set": regulated_vrf.equip_switch_set,
        "speed_set": regulated_vrf.speed_set,
    }
    temperature_set = regulated_vrf.temperature_set
    if temperature_set and not np.isnan(temperature_set):
        instructions["temperature_set"] = temperature_set

    return instructions


async def build_acatvi_instructions(params: ACATVIInstructionsRequest) -> dict[str, str | float]:
    """Build VRF instructions using :class:`VRFController`.

    Returns:
        A dict with ``switch_set``, ``speed_set`` and, when available,
        ``temperature_set``.
    """
    return await _build_instructions(params, VRFController)


async def build_acatvi_instructions_v2(params: ACATVIInstructionsRequest) -> dict[str, str | float]:
    """Build VRF instructions using :class:`VRFControllerV2` (for Zhijiang).

    Returns:
        A dict with ``switch_set``, ``speed_set`` and, when available,
        ``temperature_set``.
    """
    return await _build_instructions(params, VRFControllerV2)