|
- # -*- coding: utf-8 -*-
- from abc import ABCMeta, abstractmethod
- import arrow
- import numpy as np
- import pandas as pd
- from httpx import AsyncClient
- from loguru import logger
- from sqlalchemy.orm import Session
- from app.crud.space.weight import get_weights_by_space, update_weight
- from app.models.domain.feedback import FeedbackValue
- from app.resources.params import TEMPERATURE_TARGET_WEIGHT
- from app.schemas.sapce_weight import SpaceWeightUpdate
- from app.schemas.target import TemperatureTarget
- from app.services.platform import DataPlatformService
- from app.services.transfer import SpaceInfoService, Duoduo, Season
- from app.utils.date import get_time_str, TIME_FMT
- class StepSizeCalculator:
- """
- Calculate adjustment step size of environment target when a user send feedback.
- """
- def __init__(self, weight: dict[str, int]):
- self.weight = weight
- def run(self, realtime_temperature: float, comfortable_temperature: float, feedback: FeedbackValue) -> float:
- if feedback == FeedbackValue.so_hot or feedback == FeedbackValue.a_little_hot:
- base_step_size = 1.8 / (1 + np.exp((comfortable_temperature - realtime_temperature) / 2))
- else:
- base_step_size = 1.8 / (1 + np.exp((realtime_temperature - comfortable_temperature) / 2))
- return self.weight.get(str(feedback.value)) * base_step_size
- class SimpleStepSizeCalculator:
- """
- Zhijiang, this is for you!
- """
- def __init__(self):
- pass
- @staticmethod
- def run(feedback: FeedbackValue) -> float:
- if feedback == FeedbackValue.so_hot or feedback == FeedbackValue.a_little_hot:
- step_size = -1
- else:
- step_size = 1
- return step_size
- class NewTargetBuilder(metaclass=ABCMeta):
- """
- Calculate a new target value.
- """
- @abstractmethod
- def build(self) -> float:
- raise NotImplementedError
- class Clipper:
- """
- Return a number which is in the range of [min, max].
- """
- def __init__(self, upper_limit: float = 32.0, lower_limit: float = 16.0):
- self.upper_limit = upper_limit
- self.lower_limit = lower_limit
- def cut(self, num: float) -> float:
- num = min(num, self.upper_limit)
- num = max(num, self.lower_limit)
- return num
- class NewTemperatureTargetBuilder(NewTargetBuilder):
- """
- Calculate a new temperature target value.
- """
- def __init__(
- self, realtime_temperature: float, actual_target: float, step_size: float
- ):
- self.realtime_temperature = realtime_temperature
- self.actual_target = actual_target
- self.step_size = step_size
- def build(self) -> float:
- new_actual_target = self.actual_target
- if self.step_size > 0:
- if self.realtime_temperature + self.step_size > self.actual_target:
- new_actual_target = self.realtime_temperature + self.step_size
- elif self.step_size < 0:
- if self.realtime_temperature + self.step_size < self.actual_target:
- new_actual_target = self.realtime_temperature + self.step_size
- clipper = Clipper()
- return clipper.cut(new_actual_target)
- class NewTempTargetBuilderV2(NewTargetBuilder):
- """
- Calculate a new temperature target value for zhijiang.
- """
- def __init__(self, actual_target: float, step_sze: float):
- self.actual_target = actual_target
- self.step_size = step_sze
- def build(self) -> float:
- new_actual_target = self.actual_target + self.step_size
- clipper = Clipper()
- return clipper.cut(new_actual_target)
- class TemporaryTargetInit:
- """
- Initialize temporary temperature target.
- """
- def __init__(self, step_size: float, default_target: float = 24.0):
- self.step_size = step_size
- self.default_target = default_target
- def build(
- self,
- extent: float,
- season: Season,
- realtime_temperature: float,
- ) -> tuple[float, float]:
- if np.isnan(realtime_temperature):
- upper_bound, lower_bound = (
- self.default_target + 1.0,
- self.default_target - 1.0,
- )
- else:
- actual_target = np.NAN
- if season == Season.cooling:
- actual_target = realtime_temperature - self.step_size
- elif season == Season.heating:
- actual_target = realtime_temperature + self.step_size
- clipper = Clipper()
- actual_target = clipper.cut(actual_target)
- upper_bound, lower_bound = actual_target + (extent / 2), actual_target - (
- extent / 2
- )
- return lower_bound, upper_bound
- class GlobalTargetBaseBuilder(metaclass=ABCMeta):
- """
- Generate global target and format it for sending to TransferServer.
- """
- @abstractmethod
- def build(self, new_actual_target: float) -> dict:
- raise NotImplementedError
- class SimpleGlobalTemperatureTargetBuilder(GlobalTargetBaseBuilder):
- """
- Set all day temperature target same.
- """
- def __init__(self, current_global_target: TemperatureTarget):
- self.current_global_target = current_global_target
- def build(self, new_actual_target: float) -> dict:
- result = {}
- half_extent = self.current_global_target.extent / 2
- for time_index in self.current_global_target.target_schedule["temperatureMin"].keys():
- result.update({time_index: [new_actual_target - half_extent, new_actual_target + half_extent]})
- return result
- class ExpSmoothingTemperatureTargetBuilder(GlobalTargetBaseBuilder):
- """
- Exponential smooth previous changes and set them as new global target.
- """
- def __init__(
- self, current_global_target: TemperatureTarget, previous_changes: pd.DataFrame
- ):
- self.current_global_target = current_global_target
- self.previous_changes = previous_changes
- def build(self, new_actual_target: float) -> dict:
- now_time = arrow.get(get_time_str(), TIME_FMT).time().strftime("%H%M%S")
- half_extent = self.current_global_target.extent / 2
- previous_changes = pd.concat(
- [
- pd.DataFrame({"timestamp": [now_time], "value": [new_actual_target]}),
- self.previous_changes,
- ]
- )
- previous_changes.reset_index(inplace=True)
- previous_changes["weight1"] = previous_changes["index"].apply(
- lambda x: (1 / (x + 1)) ** 3
- )
- new_targets = {}
- time_index = self.current_global_target.target_schedule["temperatureMin"].keys()
- for item in time_index:
- previous_changes["delta"] = previous_changes["timestamp"].apply(
- lambda x: abs(arrow.get(str(x), "HHmmss") - arrow.get(item, "HHmmss")).seconds // (15 * 60)
- )
- previous_changes["weight2"] = previous_changes["delta"].apply(lambda x: 0.5 ** x)
- previous_changes["weight"] = (previous_changes["weight1"] * previous_changes["weight2"])
- temp_target = (
- previous_changes["value"] * previous_changes["weight"]
- ).sum() / previous_changes["weight"].sum()
- new_targets.update(
- {item: [temp_target - half_extent, temp_target + half_extent]}
- )
- return new_targets
- class TemporaryTargetBuilder:
- """
- Generate global target and format it for sending to TransferServer.
- """
- def __init__(self, lower_target: float, upper_target: float):
- self.lower_target = lower_target
- self.upper_target = upper_target
- def build(self) -> dict:
- now_str = get_time_str()
- time_index = (
- arrow.get(
- arrow.get(now_str, TIME_FMT).shift(minutes=15).timestamp()
- // (15 * 60)
- * (15 * 60)
- )
- .time()
- .strftime("%H%M%S")
- )
- return {time_index: [self.lower_target, self.upper_target]}
- class Carrier(metaclass=ABCMeta):
- """
- Fetch all you need data by one http client.
- """
- @abstractmethod
- async def fetch_all(self) -> None:
- raise NotImplementedError
- class Packer(metaclass=ABCMeta):
- """
- Arrange raw data for using.
- """
- @abstractmethod
- def run(self) -> dict:
- raise NotImplementedError
- class AdjustmentController(metaclass=ABCMeta):
- """
- Fetch some data, assemble target adjustment related functions and classes,
- send the new target to transfer server,
- and return a flag which denote whether transfer server need to request room/control.
- """
- @abstractmethod
- async def run(self) -> bool:
- raise NotImplementedError
- class TemperatureTargetCarrier(Carrier):
- """
- Fetch all the data that temperature target adjustment will use.
- """
- def __init__(self, project_id: str, object_id: str):
- self.project_id = project_id
- self.object_id = object_id
- self.result = {}
- async def fetch_all(self) -> None:
- async with AsyncClient() as client:
- transfer = SpaceInfoService(client, self.project_id, self.object_id)
- duoduo = Duoduo(client, self.project_id)
- platform = DataPlatformService(client, self.project_id)
- realtime_temperature = await platform.get_realtime_temperature(
- self.object_id
- )
- targets = await transfer.get_custom_target()
- all_day_targets = targets.get("normal_targets")
- current_target = await transfer.get_current_temperature_target()
- is_customized = await duoduo.is_customized(self.object_id)
- is_temporary = await transfer.is_temporary()
- season = await transfer.get_season()
- self.result = {
- "realtime_temperature": realtime_temperature,
- "all_day_targets": all_day_targets,
- "current_target": current_target,
- "is_customized": is_customized,
- "is_temporary": is_temporary,
- "season": season,
- }
- async def get_result(self) -> dict:
- await self.fetch_all()
- return self.result
- class TemperatureTargetV2Carrier(TemperatureTargetCarrier):
- """
- Add previous adjustment result to result.
- """
- async def fetch_previous_changes(self) -> None:
- async with AsyncClient() as client:
- transfer = SpaceInfoService(client, self.project_id, self.object_id)
- previous_changes = await transfer.env_database_get()
- self.result.update({"previous_changes": previous_changes["temperature"]})
- async def get_result(self) -> dict:
- await self.fetch_all()
- await self.fetch_previous_changes()
- return self.result
- class TemperatureTargetPacker:
- """
- Arrange raw data for temperature target adjustment.
- """
- def __init__(self, data):
- self.result = data
- def get_temperature_target(self):
- all_day_targets = self.result["all_day_targets"]
- if len(all_day_targets) > 0:
- extent = (
- all_day_targets["temperatureMax"].iloc[0]
- - all_day_targets["temperatureMin"].iloc[0]
- )
- temperature_all_day_targets = (
- all_day_targets[["temperatureMin", "temperatureMax"]].copy().to_dict()
- )
- else:
- extent = 2.0
- temperature_all_day_targets = {}
- target_params = {
- "is_customized": self.result["is_customized"],
- "is_temporary": self.result["is_temporary"],
- "target_schedule": temperature_all_day_targets,
- "extent": extent,
- }
- target = TemperatureTarget(**target_params)
- self.result.update({"target": target})
- def get_result(self) -> dict:
- self.get_temperature_target()
- return self.result
- class TargetDeliver:
- """
- Send target adjustment result to transfer.
- """
- def __init__(self, project_id: str, space_id: str):
- self.project_id = project_id
- self.space_id = space_id
- async def send(self, controlled_result: dict):
- async with AsyncClient() as client:
- transfer = SpaceInfoService(client, self.project_id, self.space_id)
- if controlled_result["need_switch_off"]:
- await transfer.set_temporary_custom()
- if controlled_result["new_temporary_target"]:
- transfer.set_custom_target(
- "temperature", controlled_result["new_temporary_target"], "0"
- )
- if controlled_result["new_global_target"]:
- transfer.set_custom_target(
- "temperature", controlled_result["new_global_target"], "1"
- )
- if (
- controlled_result["new_actual_target"] > 0
- and controlled_result["need_run_room_control"]
- ):
- await transfer.env_database_set(
- "temperature", controlled_result["new_actual_target"]
- )
- class WeightFlagDeliver:
- """
- Change a space temporary weight when the space receives a feedback about
- temperature.
- """
- def __init__(self, db: Session, feedback: FeedbackValue):
- self.db = db
- self.feedback = feedback
- def is_temperature_feedback(self) -> bool:
- if (
- self.feedback == FeedbackValue.a_little_hot
- or self.feedback == FeedbackValue.so_hot
- or self.feedback == FeedbackValue.a_little_cold
- or self.feedback == FeedbackValue.so_cold
- ):
- flag = True
- else:
- flag = False
- return flag
- def save(self, space: str):
- if self.is_temperature_feedback():
- weights = get_weights_by_space(self.db, space_id=space)
- for weight in weights:
- weight_in = SpaceWeightUpdate(temporary_weight=1.0)
- update_weight(self.db, db_weight=weight, weight_in=weight_in)
- class TemperatureTargetController:
- """
- Primary flow of temperature target adjustment for Sequoia.
- """
- def __init__(self, data: dict):
- self.data = data
- self.result = {}
- def run(self, feedback: FeedbackValue):
- need_switch_off = False
- new_temporary_target = {}
- new_global_target = {}
- new_actual_target = 0
- if feedback == FeedbackValue.switch_off:
- need_switch_off = True
- need_run_room_control = True
- elif feedback == FeedbackValue.switch_on:
- need_run_room_control = True
- if not self.data["is_customized"]:
- new_lower, new_upper = TemporaryTargetInit(1, 24).build(
- self.data["extent"],
- self.data["season"],
- self.data["realtime_temperature"],
- )
- new_temporary_target = TemporaryTargetBuilder(
- new_lower, new_upper
- ).build()
- elif (
- feedback == FeedbackValue.a_little_hot
- or feedback == FeedbackValue.a_little_cold
- or feedback == FeedbackValue.so_hot
- or feedback == FeedbackValue.so_cold
- ):
- step_size = StepSizeCalculator(TEMPERATURE_TARGET_WEIGHT).run(
- self.data["realtime_temperature"], 25.0, feedback
- )
- new_actual_target = NewTemperatureTargetBuilder(
- self.data["realtime_temperature"],
- self.data["current_target"],
- step_size,
- ).build()
- need_run_room_control = True
- if new_actual_target != self.data["current_target"]:
- new_global_target = SimpleGlobalTemperatureTargetBuilder(
- self.data["target"]
- ).build(new_actual_target)
- else:
- need_run_room_control = False
- self.result.update(
- {
- "need_switch_off": need_switch_off,
- "new_temporary_target": new_temporary_target,
- "new_global_target": new_global_target,
- "new_actual_target": new_actual_target,
- "need_run_room_control": need_run_room_control,
- }
- )
- def get_result(self) -> dict:
- return self.result
- class TemperatureTargetControllerV2:
- """
- Primary flow of temperature target adjustment for Zhonghai.
- """
- def __init__(self, data: dict):
- self.data = data
- self.result = {}
- def run(self, feedback: FeedbackValue):
- need_switch_off = False
- new_temporary_target = {}
- new_global_target = {}
- new_actual_target = 0
- if feedback == FeedbackValue.switch_off:
- need_switch_off = True
- need_run_room_control = True
- elif feedback == FeedbackValue.switch_on:
- need_run_room_control = True
- if not self.data["target"].is_customized:
- new_lower, new_upper = TemporaryTargetInit(1, 24).build(
- self.data["target"].extent,
- self.data["season"],
- self.data["realtime_temperature"],
- )
- new_temporary_target = TemporaryTargetBuilder(
- new_lower, new_upper
- ).build()
- elif (
- feedback == FeedbackValue.a_little_hot
- or feedback == FeedbackValue.a_little_cold
- or feedback == FeedbackValue.so_hot
- or feedback == FeedbackValue.so_cold
- ):
- step_size = StepSizeCalculator(TEMPERATURE_TARGET_WEIGHT).run(
- self.data["realtime_temperature"], 25.0, feedback
- )
- new_actual_target = NewTemperatureTargetBuilder(
- self.data["realtime_temperature"],
- self.data["current_target"],
- step_size,
- ).build()
- need_run_room_control = True
- if new_actual_target != self.data["current_target"]:
- new_global_target = ExpSmoothingTemperatureTargetBuilder(
- self.data["target"], self.data["previous_changes"]
- ).build(new_actual_target)
- else:
- need_run_room_control = False
- self.result.update(
- {
- "need_switch_off": need_switch_off,
- "new_temporary_target": new_temporary_target,
- "new_global_target": new_global_target,
- "new_actual_target": new_actual_target,
- "need_run_room_control": need_run_room_control,
- }
- )
- def get_result(self) -> dict:
- return self.result
- class TemperatureTargetControllerV3:
- """
- Primary flow of temperature target adjustment for Zhijiang.
- """
- def __init__(self, data: dict):
- self.data = data
- self.result = {}
- def run(self, feedback: FeedbackValue):
- need_switch_off = False
- new_temporary_target = {}
- new_global_target = {}
- new_actual_target = 0
- if feedback == FeedbackValue.switch_off:
- need_switch_off = True
- need_run_room_control = True
- elif feedback == FeedbackValue.switch_on:
- need_run_room_control = True
- if not self.data["is_customized"]:
- new_lower, new_upper = TemporaryTargetInit(1, 24).build(
- self.data["extent"],
- self.data["season"],
- self.data["realtime_temperature"],
- )
- new_temporary_target = TemporaryTargetBuilder(
- new_lower, new_upper
- ).build()
- elif (
- feedback == FeedbackValue.a_little_hot
- or feedback == FeedbackValue.a_little_cold
- or feedback == FeedbackValue.so_hot
- or feedback == FeedbackValue.so_cold
- ):
- step_size = SimpleStepSizeCalculator.run(feedback)
- new_actual_target = NewTempTargetBuilderV2(self.data["current_target"], step_size).build()
- need_run_room_control = True
- if new_actual_target != self.data["current_target"]:
- new_global_target = SimpleGlobalTemperatureTargetBuilder(
- self.data["target"]
- ).build(new_actual_target)
- else:
- need_run_room_control = False
- self.result.update(
- {
- "need_switch_off": need_switch_off,
- "new_temporary_target": new_temporary_target,
- "new_global_target": new_global_target,
- "new_actual_target": new_actual_target,
- "need_run_room_control": need_run_room_control,
- }
- )
- def get_result(self) -> dict:
- return self.result
- @logger.catch()
- async def temperature_target_control_v1(
- project_id: str, space_id: str, feedback: FeedbackValue
- ) -> bool:
- temperature_target_raw_data = await TemperatureTargetCarrier(
- project_id, space_id
- ).get_result()
- temperature_target_data = TemperatureTargetPacker(
- temperature_target_raw_data
- ).get_result()
- controller = TemperatureTargetController(temperature_target_data)
- controller.run(feedback)
- controlled_result = controller.get_result()
- await TargetDeliver(project_id, space_id).send(controlled_result)
- # WeightFlagDeliver(db, feedback).save(space_id)
- return controlled_result["need_run_room_control"]
- @logger.catch()
- async def temperature_target_control_v2(
- project_id: str, space_id: str, feedback: FeedbackValue
- ) -> bool:
- temperature_target_raw_data = await TemperatureTargetV2Carrier(
- project_id, space_id
- ).get_result()
- temperature_target_data = TemperatureTargetPacker(
- temperature_target_raw_data
- ).get_result()
- controller = TemperatureTargetControllerV2(temperature_target_data)
- controller.run(feedback)
- controlled_result = controller.get_result()
- await TargetDeliver(project_id, space_id).send(controlled_result)
- return controlled_result["need_run_room_control"]
- @logger.catch()
- async def temperature_target_control_v3(
- project_id: str, space_id: str, feedback: FeedbackValue
- ) -> bool:
- temperature_target_raw_data = await TemperatureTargetCarrier(
- project_id, space_id
- ).get_result()
- temperature_target_data = TemperatureTargetPacker(
- temperature_target_raw_data
- ).get_result()
- controller = TemperatureTargetControllerV3(temperature_target_data)
- controller.run(feedback)
- controlled_result = controller.get_result()
- await TargetDeliver(project_id, space_id).send(controlled_result)
- return controlled_result["need_run_room_control"]
- @logger.catch()
- async def get_target_after_feedback(
- project_id: str, space_id: str, feedback: FeedbackValue
- ) -> float:
- if (project_id == "Pj1101050030" or project_id == "Pj1101140020" or project_id == "Pj1101050039"
- or project_id == "Pj3301100002"):
- temperature_target_raw_data = await TemperatureTargetCarrier(
- project_id, space_id
- ).get_result()
- else:
- temperature_target_raw_data = await TemperatureTargetV2Carrier(
- project_id, space_id
- ).get_result()
- temperature_target_data = TemperatureTargetPacker(
- temperature_target_raw_data
- ).get_result()
- if project_id == "Pj1101050030" or project_id == 'Pj1101140020' or project_id == 'Pj1101050039':
- controller = TemperatureTargetController(temperature_target_data)
- elif project_id == "Pj3301100002":
- controller = TemperatureTargetControllerV3(temperature_target_data)
- else:
- controller = TemperatureTargetControllerV2(temperature_target_data)
- controller.run(feedback)
- controlled_result = controller.get_result()
- return controlled_result.get("new_actual_target")
|