# -*- coding: utf-8 -*-
"""Temperature target adjustment triggered by user feedback.

The module is organised as a small pipeline:

* *Carrier* classes fetch raw data over HTTP.
* ``TemperatureTargetPacker`` arranges the raw data into a ``TemperatureTarget``.
* *Controller* classes decide the new targets for a given feedback.
* ``TargetDeliver`` sends the decision to the transfer server.
"""

import inspect
from abc import ABCMeta, abstractmethod

import arrow
import numpy as np
import pandas as pd
from httpx import AsyncClient
from loguru import logger
from sqlalchemy.orm import Session

from app.crud.space.weight import get_weights_by_space, update_weight
from app.models.domain.feedback import FeedbackValue
from app.resources.params import TEMPERATURE_TARGET_WEIGHT
from app.schemas.sapce_weight import SpaceWeightUpdate
from app.schemas.target import TemperatureTarget
from app.services.platform import DataPlatformService
from app.services.transfer import SpaceInfoService, Duoduo, Season
from app.utils.date import get_time_str, TIME_FMT

# Feedback values meaning "the user feels too warm".
_HOT_FEEDBACKS = (FeedbackValue.so_hot, FeedbackValue.a_little_hot)

# Feedback values that concern temperature at all (hot or cold).
_TEMPERATURE_FEEDBACKS = (
    FeedbackValue.a_little_hot,
    FeedbackValue.a_little_cold,
    FeedbackValue.so_hot,
    FeedbackValue.so_cold,
)

# Length of one schedule slot in seconds.
_QUARTER_HOUR_SECONDS = 15 * 60

# Projects handled by TemperatureTargetController (with TemperatureTargetCarrier).
_V1_PROJECT_IDS = ("Pj1101050030", "Pj1101140020", "Pj1101050039")

# Projects handled by TemperatureTargetControllerV3 (with TemperatureTargetCarrier).
_V3_PROJECT_IDS = ("Pj3301100002",)


class StepSizeCalculator:
    """
    Calculate adjustment step size of environment target when a user send feedback.
    """

    def __init__(self, weight: dict[str, int]):
        # Maps ``str(feedback.value)`` to a multiplier applied to the base step.
        self.weight = weight

    def run(
        self,
        realtime_temperature: float,
        comfortable_temperature: float,
        feedback: FeedbackValue,
    ) -> float:
        """Return the weighted step size for ``feedback``.

        The base step is a logistic curve in the range (0, 1.8) of the gap
        between the realtime and the comfortable temperature; the sign of the
        gap is flipped for "hot" feedback. The base step is then multiplied by
        the weight registered for the feedback value.
        """
        if feedback in _HOT_FEEDBACKS:
            gap = comfortable_temperature - realtime_temperature
        else:
            gap = realtime_temperature - comfortable_temperature
        base_step_size = 1.8 / (1 + np.exp(gap / 2))

        return self.weight.get(str(feedback.value)) * base_step_size


class SimpleStepSizeCalculator:
    """
    Fixed one-degree step: -1 for "hot" feedback, +1 for anything else.
    """

    @staticmethod
    def run(feedback: FeedbackValue) -> float:
        """Return -1 when the feedback is hot, otherwise 1."""
        return -1 if feedback in _HOT_FEEDBACKS else 1


class NewTargetBuilder(metaclass=ABCMeta):
    """
    Calculate a new target value.
    """

    @abstractmethod
    def build(self) -> float:
        raise NotImplementedError


class Clipper:
    """
    Return a number which is in the range of [min, max].
    """

    def __init__(self, upper_limit: float = 32.0, lower_limit: float = 16.0):
        self.upper_limit = upper_limit
        self.lower_limit = lower_limit

    def cut(self, num: float) -> float:
        """Clamp ``num`` to ``[lower_limit, upper_limit]``."""
        # Upper bound first, then lower bound (same order as always).
        return max(min(num, self.upper_limit), self.lower_limit)


class NewTemperatureTargetBuilder(NewTargetBuilder):
    """
    Calculate a new temperature target value.
    """

    def __init__(
        self, realtime_temperature: float, actual_target: float, step_size: float
    ):
        self.realtime_temperature = realtime_temperature
        self.actual_target = actual_target
        self.step_size = step_size

    def build(self) -> float:
        """Return the clipped new target.

        The candidate is ``realtime_temperature + step_size``. It replaces the
        current target only when it moves the target in the direction of the
        step (higher for a positive step, lower for a negative one).
        """
        candidate = self.realtime_temperature + self.step_size
        new_actual_target = self.actual_target
        if self.step_size > 0 and candidate > self.actual_target:
            new_actual_target = candidate
        elif self.step_size < 0 and candidate < self.actual_target:
            new_actual_target = candidate

        return Clipper().cut(new_actual_target)


class NewTempTargetBuilderV2(NewTargetBuilder):
    """
    Calculate a new temperature target value for zhijiang.
    """

    # NOTE: the parameter name ``step_sze`` is kept for keyword-call compatibility.
    def __init__(self, actual_target: float, step_sze: float):
        self.actual_target = actual_target
        self.step_size = step_sze

    def build(self) -> float:
        """Return ``actual_target + step_size`` clipped to the default range."""
        return Clipper().cut(self.actual_target + self.step_size)


class TemporaryTargetInit:
    """
    Initialize temporary temperature target.
    """

    def __init__(self, step_size: float, default_target: float = 24.0):
        self.step_size = step_size
        self.default_target = default_target

    def build(
        self,
        extent: float,
        season: Season,
        realtime_temperature: float,
    ) -> tuple[float, float]:
        """Return ``(lower_bound, upper_bound)`` of the temporary target.

        When the realtime temperature is NaN the bounds are the default target
        +/- 1.0. Otherwise the target is the realtime temperature moved by
        ``step_size`` (down when cooling, up when heating), clipped, and
        widened by half of ``extent`` on each side.
        """
        if np.isnan(realtime_temperature):
            return self.default_target - 1.0, self.default_target + 1.0

        # NOTE(review): for a season that is neither cooling nor heating the
        # target stays NaN and so do both bounds -- confirm this is intended.
        actual_target = np.nan  # ``np.NAN`` alias was removed in NumPy 2.0
        if season == Season.cooling:
            actual_target = realtime_temperature - self.step_size
        elif season == Season.heating:
            actual_target = realtime_temperature + self.step_size

        actual_target = Clipper().cut(actual_target)
        half_extent = extent / 2

        return actual_target - half_extent, actual_target + half_extent


class GlobalTargetBaseBuilder(metaclass=ABCMeta):
    """
    Generate global target and format it for sending to TransferServer.
    """

    @abstractmethod
    def build(self, new_actual_target: float) -> dict:
        raise NotImplementedError


class SimpleGlobalTemperatureTargetBuilder(GlobalTargetBaseBuilder):
    """
    Set all day temperature target same.
    """

    def __init__(self, current_global_target: TemperatureTarget):
        self.current_global_target = current_global_target

    def build(self, new_actual_target: float) -> dict:
        """Return ``{time_index: [lower, upper]}`` for every scheduled slot."""
        half_extent = self.current_global_target.extent / 2
        bounds = [new_actual_target - half_extent, new_actual_target + half_extent]
        schedule = self.current_global_target.target_schedule["temperatureMin"]

        # A fresh list per slot so the values are not shared between keys.
        return {time_index: list(bounds) for time_index in schedule.keys()}


class ExpSmoothingTemperatureTargetBuilder(GlobalTargetBaseBuilder):
    """
    Exponential smooth previous changes and set them as new global target.
    """

    def __init__(
        self, current_global_target: TemperatureTarget, previous_changes: pd.DataFrame
    ):
        self.current_global_target = current_global_target
        # Read through its "timestamp" and "value" columns.
        self.previous_changes = previous_changes

    def build(self, new_actual_target: float) -> dict:
        """Return ``{time_index: [lower, upper]}`` smoothed over past changes.

        Every change (the new one first, then the previous ones) gets a weight
        that is the product of a rank weight ``(1 / (index + 1)) ** 3`` and a
        time-distance weight ``0.5 ** quarter_hours_between``. The target of a
        slot is the weighted mean of the change values.
        """
        now_time = arrow.get(get_time_str(), TIME_FMT).time().strftime("%H%M%S")
        half_extent = self.current_global_target.extent / 2

        previous_changes = pd.concat(
            [
                pd.DataFrame({"timestamp": [now_time], "value": [new_actual_target]}),
                self.previous_changes,
            ]
        )
        # NOTE(review): reset_index() exposes the *original* index as the
        # "index" column; if ``self.previous_changes`` has a default
        # RangeIndex, the new row and its first row both get rank 0 -- confirm
        # whether ``ignore_index=True`` was intended.
        previous_changes.reset_index(inplace=True)
        previous_changes["weight1"] = previous_changes["index"].apply(
            lambda x: (1 / (x + 1)) ** 3
        )

        new_targets = {}
        time_index = self.current_global_target.target_schedule["temperatureMin"].keys()
        for item in time_index:
            # Distance between each change and this slot, in whole quarter hours.
            previous_changes["delta"] = previous_changes["timestamp"].apply(
                lambda x: abs(
                    arrow.get(str(x), "HHmmss") - arrow.get(item, "HHmmss")
                ).seconds
                // _QUARTER_HOUR_SECONDS
            )
            previous_changes["weight2"] = previous_changes["delta"].apply(
                lambda x: 0.5 ** x
            )
            previous_changes["weight"] = (
                previous_changes["weight1"] * previous_changes["weight2"]
            )
            temp_target = (
                previous_changes["value"] * previous_changes["weight"]
            ).sum() / previous_changes["weight"].sum()
            new_targets.update(
                {item: [temp_target - half_extent, temp_target + half_extent]}
            )

        return new_targets


class TemporaryTargetBuilder:
    """
    Format a temporary target for the next quarter-hour slot.
    """

    def __init__(self, lower_target: float, upper_target: float):
        self.lower_target = lower_target
        self.upper_target = upper_target

    def build(self) -> dict:
        """Return ``{"HHMMSS": [lower_target, upper_target]}``.

        The key is the current time shifted by 15 minutes and floored to a
        multiple of 15 minutes.
        """
        shifted = arrow.get(get_time_str(), TIME_FMT).shift(minutes=15)
        floored_timestamp = (
            shifted.timestamp() // _QUARTER_HOUR_SECONDS * _QUARTER_HOUR_SECONDS
        )
        time_index = arrow.get(floored_timestamp).time().strftime("%H%M%S")

        return {time_index: [self.lower_target, self.upper_target]}


class Carrier(metaclass=ABCMeta):
    """
    Fetch all you need data by one http client.
    """

    @abstractmethod
    async def fetch_all(self) -> None:
        raise NotImplementedError


class Packer(metaclass=ABCMeta):
    """
    Arrange raw data for using.
    """

    @abstractmethod
    def run(self) -> dict:
        raise NotImplementedError


class AdjustmentController(metaclass=ABCMeta):
    """
    Fetch some data, assemble target adjustment related functions and classes, send the new target to transfer server,
    and return a flag which denote whether transfer server need to request room/control.
    """

    @abstractmethod
    async def run(self) -> bool:
        raise NotImplementedError


class TemperatureTargetCarrier(Carrier):
    """
    Fetch all the data that temperature target adjustment will use.
    """

    def __init__(self, project_id: str, object_id: str):
        self.project_id = project_id
        self.object_id = object_id
        self.result = {}

    async def fetch_all(self) -> None:
        """Fetch every input with one shared HTTP client and store it in ``self.result``."""
        async with AsyncClient() as client:
            transfer = SpaceInfoService(client, self.project_id, self.object_id)
            duoduo = Duoduo(client, self.project_id)
            platform = DataPlatformService(client, self.project_id)

            realtime_temperature = await platform.get_realtime_temperature(
                self.object_id
            )
            targets = await transfer.get_custom_target()
            all_day_targets = targets.get("normal_targets")
            current_target = await transfer.get_current_temperature_target()
            is_customized = await duoduo.is_customized(self.object_id)
            is_temporary = await transfer.is_temporary()
            season = await transfer.get_season()

        self.result = {
            "realtime_temperature": realtime_temperature,
            "all_day_targets": all_day_targets,
            "current_target": current_target,
            "is_customized": is_customized,
            "is_temporary": is_temporary,
            "season": season,
        }

    async def get_result(self) -> dict:
        """Fetch the data and return the result dict."""
        await self.fetch_all()
        return self.result


class TemperatureTargetV2Carrier(TemperatureTargetCarrier):
    """
    Add previous adjustment result to result.
    """

    async def fetch_previous_changes(self) -> None:
        """Store the "temperature" entry of the env database as ``previous_changes``."""
        async with AsyncClient() as client:
            transfer = SpaceInfoService(client, self.project_id, self.object_id)
            previous_changes = await transfer.env_database_get()

        self.result.update({"previous_changes": previous_changes["temperature"]})

    async def get_result(self) -> dict:
        """Fetch the base data plus the previous changes and return the result dict."""
        await self.fetch_all()
        await self.fetch_previous_changes()
        return self.result


class TemperatureTargetPacker:
    """
    Arrange raw data for temperature target adjustment.
    """

    def __init__(self, data):
        self.result = data

    def get_temperature_target(self):
        """Build a ``TemperatureTarget`` from the raw data and store it under ``"target"``.

        The extent is the difference between the first ``temperatureMax`` and
        the first ``temperatureMin``. When there are no all-day targets (empty
        or missing) the extent defaults to 2.0 and the schedule is empty.
        """
        all_day_targets = self.result["all_day_targets"]
        # ``all_day_targets`` comes from ``dict.get`` upstream and may be None.
        if all_day_targets is not None and len(all_day_targets) > 0:
            extent = (
                all_day_targets["temperatureMax"].iloc[0]
                - all_day_targets["temperatureMin"].iloc[0]
            )
            temperature_all_day_targets = (
                all_day_targets[["temperatureMin", "temperatureMax"]].copy().to_dict()
            )
        else:
            extent = 2.0
            temperature_all_day_targets = {}

        target = TemperatureTarget(
            is_customized=self.result["is_customized"],
            is_temporary=self.result["is_temporary"],
            target_schedule=temperature_all_day_targets,
            extent=extent,
        )
        self.result.update({"target": target})

    def get_result(self) -> dict:
        """Pack the target and return the enriched data dict."""
        self.get_temperature_target()
        return self.result


async def _await_if_needed(value):
    """Await ``value`` when it is awaitable, otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


class TargetDeliver:
    """
    Send target adjustment result to transfer.
    """

    def __init__(self, project_id: str, space_id: str):
        self.project_id = project_id
        self.space_id = space_id

    async def send(self, controlled_result: dict):
        """Push every non-empty part of ``controlled_result`` to the transfer server."""
        async with AsyncClient() as client:
            transfer = SpaceInfoService(client, self.project_id, self.space_id)

            if controlled_result["need_switch_off"]:
                await transfer.set_temporary_custom()

            # The two set_custom_target calls used to be fired without being
            # awaited; if the method is a coroutine the request was never
            # executed. Awaiting only when needed keeps a sync implementation
            # working as well.
            if controlled_result["new_temporary_target"]:
                await _await_if_needed(
                    transfer.set_custom_target(
                        "temperature", controlled_result["new_temporary_target"], "0"
                    )
                )
            if controlled_result["new_global_target"]:
                await _await_if_needed(
                    transfer.set_custom_target(
                        "temperature", controlled_result["new_global_target"], "1"
                    )
                )

            if (
                controlled_result["new_actual_target"] > 0
                and controlled_result["need_run_room_control"]
            ):
                await transfer.env_database_set(
                    "temperature", controlled_result["new_actual_target"]
                )


class WeightFlagDeliver:
    """
    Change a space temporary weight when the space receives a feedback about temperature.
    """

    def __init__(self, db: Session, feedback: FeedbackValue):
        self.db = db
        self.feedback = feedback

    def is_temperature_feedback(self) -> bool:
        """Return True when the feedback is one of the hot/cold values."""
        return self.feedback in _TEMPERATURE_FEEDBACKS

    def save(self, space: str):
        """Set ``temporary_weight`` to 1.0 on every weight of ``space`` for temperature feedback."""
        if not self.is_temperature_feedback():
            return

        for weight in get_weights_by_space(self.db, space_id=space):
            weight_in = SpaceWeightUpdate(temporary_weight=1.0)
            update_weight(self.db, db_weight=weight, weight_in=weight_in)


def _weighted_actual_target(data: dict, feedback: FeedbackValue) -> float:
    """Return the new target computed with the weighted logistic step size."""
    step_size = StepSizeCalculator(TEMPERATURE_TARGET_WEIGHT).run(
        data["realtime_temperature"], 25.0, feedback
    )
    return NewTemperatureTargetBuilder(
        data["realtime_temperature"],
        data["current_target"],
        step_size,
    ).build()


class _TemperatureTargetControllerBase:
    """
    Shared feedback handling of the temperature target controllers.

    Subclasses provide how the new actual target and the new global target
    are computed; the branching on the feedback value lives here.
    """

    def __init__(self, data: dict):
        self.data = data
        self.result = {}

    def _is_customized(self) -> bool:
        """Return whether the space currently has a customized target."""
        return self.data["is_customized"]

    def _extent(self) -> float:
        """Return the width of the target band.

        An explicit ``"extent"`` key wins; otherwise the extent of the packed
        ``TemperatureTarget`` is used. The packer in this module stores the
        extent only on the target, so reading ``data["extent"]`` directly
        raised ``KeyError``.
        """
        if "extent" in self.data:
            return self.data["extent"]
        return self.data["target"].extent

    def _build_actual_target(self, feedback: FeedbackValue) -> float:
        """Return the new actual target for a hot/cold feedback."""
        raise NotImplementedError

    def _build_global_target(self, new_actual_target: float) -> dict:
        """Return the new all-day schedule for ``new_actual_target``."""
        raise NotImplementedError

    def _build_temporary_target(self) -> dict:
        """Return the temporary target used when the space is switched on."""
        new_lower, new_upper = TemporaryTargetInit(1, 24).build(
            self._extent(),
            self.data["season"],
            self.data["realtime_temperature"],
        )
        return TemporaryTargetBuilder(new_lower, new_upper).build()

    def run(self, feedback: FeedbackValue):
        """Decide the adjustment for ``feedback`` and store it in ``self.result``."""
        need_switch_off = False
        need_run_room_control = False
        new_temporary_target = {}
        new_global_target = {}
        new_actual_target = 0

        if feedback == FeedbackValue.switch_off:
            need_switch_off = True
            need_run_room_control = True
        elif feedback == FeedbackValue.switch_on:
            need_run_room_control = True
            if not self._is_customized():
                new_temporary_target = self._build_temporary_target()
        elif feedback in _TEMPERATURE_FEEDBACKS:
            new_actual_target = self._build_actual_target(feedback)
            need_run_room_control = True
            # Only rewrite the schedule when the target really moved.
            if new_actual_target != self.data["current_target"]:
                new_global_target = self._build_global_target(new_actual_target)

        self.result.update(
            {
                "need_switch_off": need_switch_off,
                "new_temporary_target": new_temporary_target,
                "new_global_target": new_global_target,
                "new_actual_target": new_actual_target,
                "need_run_room_control": need_run_room_control,
            }
        )

    def get_result(self) -> dict:
        """Return the decision stored by the last ``run`` call."""
        return self.result


class TemperatureTargetController(_TemperatureTargetControllerBase):
    """
    Primary flow of temperature target adjustment for Sequoia.
    """

    def _build_actual_target(self, feedback: FeedbackValue) -> float:
        return _weighted_actual_target(self.data, feedback)

    def _build_global_target(self, new_actual_target: float) -> dict:
        return SimpleGlobalTemperatureTargetBuilder(self.data["target"]).build(
            new_actual_target
        )


class TemperatureTargetControllerV2(_TemperatureTargetControllerBase):
    """
    Primary flow of temperature target adjustment for Zhonghai.
    """

    def _is_customized(self) -> bool:
        return self.data["target"].is_customized

    def _extent(self) -> float:
        return self.data["target"].extent

    def _build_actual_target(self, feedback: FeedbackValue) -> float:
        return _weighted_actual_target(self.data, feedback)

    def _build_global_target(self, new_actual_target: float) -> dict:
        return ExpSmoothingTemperatureTargetBuilder(
            self.data["target"], self.data["previous_changes"]
        ).build(new_actual_target)


class TemperatureTargetControllerV3(_TemperatureTargetControllerBase):
    """
    Primary flow of temperature target adjustment for Zhijiang.
    """

    def _build_actual_target(self, feedback: FeedbackValue) -> float:
        step_size = SimpleStepSizeCalculator.run(feedback)
        return NewTempTargetBuilderV2(self.data["current_target"], step_size).build()

    def _build_global_target(self, new_actual_target: float) -> dict:
        return SimpleGlobalTemperatureTargetBuilder(self.data["target"]).build(
            new_actual_target
        )


async def _compute_controlled_result(
    project_id: str,
    space_id: str,
    feedback: FeedbackValue,
    carrier_cls,
    controller_cls,
) -> dict:
    """Fetch, pack and run the controller; return the controller's result dict."""
    raw_data = await carrier_cls(project_id, space_id).get_result()
    packed_data = TemperatureTargetPacker(raw_data).get_result()
    controller = controller_cls(packed_data)
    controller.run(feedback)
    return controller.get_result()


async def _run_target_control(
    project_id: str,
    space_id: str,
    feedback: FeedbackValue,
    carrier_cls,
    controller_cls,
) -> bool:
    """Compute the adjustment, deliver it and return ``need_run_room_control``."""
    controlled_result = await _compute_controlled_result(
        project_id, space_id, feedback, carrier_cls, controller_cls
    )
    await TargetDeliver(project_id, space_id).send(controlled_result)
    return controlled_result["need_run_room_control"]


@logger.catch()
async def temperature_target_control_v1(
    project_id: str, space_id: str, feedback: FeedbackValue
) -> bool:
    """Run the ``TemperatureTargetController`` flow and deliver its result."""
    # NOTE: WeightFlagDeliver(db, feedback).save(space_id) is currently disabled.
    return await _run_target_control(
        project_id,
        space_id,
        feedback,
        TemperatureTargetCarrier,
        TemperatureTargetController,
    )


@logger.catch()
async def temperature_target_control_v2(
    project_id: str, space_id: str, feedback: FeedbackValue
) -> bool:
    """Run the ``TemperatureTargetControllerV2`` flow and deliver its result."""
    return await _run_target_control(
        project_id,
        space_id,
        feedback,
        TemperatureTargetV2Carrier,
        TemperatureTargetControllerV2,
    )


@logger.catch()
async def temperature_target_control_v3(
    project_id: str, space_id: str, feedback: FeedbackValue
) -> bool:
    """Run the ``TemperatureTargetControllerV3`` flow and deliver its result."""
    return await _run_target_control(
        project_id,
        space_id,
        feedback,
        TemperatureTargetCarrier,
        TemperatureTargetControllerV3,
    )


@logger.catch()
async def get_target_after_feedback(
    project_id: str, space_id: str, feedback: FeedbackValue
) -> float:
    """Return the ``new_actual_target`` a feedback would produce, without delivering it.

    The carrier/controller pair is chosen by project id; projects that are
    not listed explicitly use the V2 carrier and controller.
    """
    if project_id in _V1_PROJECT_IDS:
        carrier_cls, controller_cls = (
            TemperatureTargetCarrier,
            TemperatureTargetController,
        )
    elif project_id in _V3_PROJECT_IDS:
        carrier_cls, controller_cls = (
            TemperatureTargetCarrier,
            TemperatureTargetControllerV3,
        )
    else:
        carrier_cls, controller_cls = (
            TemperatureTargetV2Carrier,
            TemperatureTargetControllerV2,
        )

    controlled_result = await _compute_controlled_result(
        project_id, space_id, feedback, carrier_cls, controller_cls
    )
    return controlled_result.get("new_actual_target")