123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- # -*- coding: utf-8 -*-
- from abc import abstractmethod
- from typing import Dict, Tuple, Optional
- import arrow
- import numpy as np
- import pandas as pd
- from httpx import AsyncClient
- from loguru import logger
- from app.controllers.controller import Controller
- from app.resources.params import (
- TEMPERATURE_RELATED_FEEDBACK_WEIGHT,
- TEMPERATURE_RELATED_FEEDBACK,
- CO2_RELATED_FEEDBACK_WEIGHT,
- SWITCH_RELATED_FEEDBACK
- )
- from app.services.platform import DataPlatformService
- from app.services.transfer import SpaceInfoService, Season
- from app.utils.date import get_time_str, get_quarter_minutes, TIME_FMT
- class TargetController(Controller):
- def __init__(
- self,
- realtime_data: float,
- feedback: Dict,
- is_customized: bool,
- is_temporary: bool,
- current_targets: pd.DataFrame,
- ) -> None:
- super(TargetController, self).__init__()
- self._realtime_data = realtime_data
- self._feedback = feedback
- self._is_customized = is_customized
- self._is_temporary = is_temporary
- self._current_targets = current_targets
- self._now_time = arrow.get(get_time_str(), TIME_FMT).time().strftime('%H%M%S')
- self._quarter_time = get_quarter_minutes(get_time_str())
- async def calculate_diff(self, weight: Dict) -> float:
- related_feedback = [v for k, v in self._feedback.items() if k in weight]
- related_feedback = np.array(related_feedback)
- weight = np.array(list(weight.values()))
- feedback_count = related_feedback.sum()
- diff = 0
- if feedback_count > 0:
- diff = np.dot(related_feedback, weight) / feedback_count
- return diff
- @abstractmethod
- async def init_temporary(self):
- pass
- @abstractmethod
- async def get_targets(self) -> float:
- pass
- async def generate_temporary(self, lower, upper):
- now_str = get_time_str()
- time_index = arrow.get(arrow.get(now_str, TIME_FMT).shift(minutes=15).timestamp
- // (15 * 60) * (15 * 60)).time().strftime('%H%M%S')
- result = {time_index: [lower, upper]}
- self._results.update({'temporary_targets': result})
- async def readjust_global(self, latest_change: float, previous_changes: pd.DataFrame):
- previous_changes = pd.concat([
- pd.DataFrame({'timestamp': [self._now_time], 'value': [latest_change]}),
- previous_changes,
- ])
- previous_changes.reset_index(inplace=True)
- previous_changes['weight1'] = previous_changes['index'].apply(lambda x: (1 / (x + 1)) ** 3)
- new_targets = []
- time_index = self._current_targets.reset_index()['time']
- for item in time_index:
- previous_changes['delta'] = previous_changes['timestamp'].apply(
- lambda x: abs(arrow.get(str(x), 'HHmmss') - arrow.get(item, 'HHmmss')).seconds // (15 * 60)
- )
- previous_changes['weight2'] = previous_changes['delta'].apply(lambda x: 0.5 ** x)
- previous_changes['weight'] = previous_changes['weight1'] * previous_changes['weight2']
- new_targets.append(
- (previous_changes['value'] * previous_changes['weight']).sum() / previous_changes['weight'].sum()
- )
- self._current_targets['new_targets'] = new_targets
- @abstractmethod
- async def run(self):
- pass
- class TemperatureTargetController(TargetController):
- def __init__(
- self,
- realtime_data: float,
- feedback: Dict,
- is_customized: bool,
- is_temporary: bool,
- current_targets: pd.DataFrame,
- season: Season,
- previous_changes: Optional[pd.DataFrame] = None
- ) -> None:
- super(TemperatureTargetController, self).__init__(
- realtime_data,
- feedback,
- is_customized,
- is_temporary,
- current_targets
- )
- self._season = season
- self._previous_changes = previous_changes
- @staticmethod
- def _cut(value: float) -> float:
- _LOWER_LIMIT = 22.0
- _UPPER_LIMIT = 28.0
- value = min(value, _UPPER_LIMIT)
- value = max(value, _LOWER_LIMIT)
- return value
- async def init_temporary(self) -> Tuple[float, float]:
- _VAR = 2
- _RANGE = 1
- new_target = 24.0
- new_lower_bound, new_upper_bound = new_target - 1.0, new_target + 1.0
- if not np.isnan(self._realtime_data):
- if self._season == Season.cooling:
- if ('a little hot' in self._feedback
- or 'so hot' in self._feedback
- or 'switch on' in self._feedback):
- mid = self._realtime_data - _VAR
- mid = self._cut(mid)
- new_lower_bound = mid - _RANGE
- new_upper_bound = mid + _RANGE
- elif self._season == Season.heating:
- if ('a little cold' in self._feedback
- or 'so cold' in self._feedback
- or 'switch on' in self._feedback):
- mid = self._realtime_data + _VAR
- mid = self._cut(mid)
- new_lower_bound = mid - _RANGE
- new_upper_bound = mid + _RANGE
- return new_lower_bound, new_upper_bound
- async def get_targets(self) -> float:
- current_lower_target = self._current_targets['temperatureMin'].loc[self._quarter_time]
- current_upper_target = self._current_targets['temperatureMax'].loc[self._quarter_time]
- if np.isnan(current_lower_target):
- current_lower_target = 23.0
- if np.isnan(current_upper_target):
- current_upper_target = 25.0
- return (current_lower_target + current_upper_target) / 2
- async def readjust_current(self, current: float, diff: float) -> float:
- _RANGE = 2
- new_target = current
- if not np.isnan(self._realtime_data):
- if self._season == Season.cooling:
- standard = current + 1.0
- elif self._season == Season.heating:
- standard = current - 1.0
- else:
- standard = current
- if (diff > 0 and self._realtime_data + _RANGE > standard
- or diff < 0 and self._realtime_data - _RANGE < standard):
- new_target = self._realtime_data + diff
- return new_target
- async def generate_global(self):
- _RANGE = 1
- new_targets = self._current_targets['new_targets'].apply(lambda x: [self._cut(x) - _RANGE,
- self._cut(x) + _RANGE])
- time_index = self._current_targets.reset_index()['time']
- result = {}
- for i in range(len(time_index)):
- result.update({time_index[i]: new_targets[i]})
- self._results.update({'global_targets': result})
- async def run(self):
- diff = await self.calculate_diff(TEMPERATURE_RELATED_FEEDBACK_WEIGHT)
- if diff != 0:
- if not self._is_customized:
- lower_bound, upper_bound = await self.init_temporary()
- await self.generate_temporary(lower_bound, upper_bound)
- else:
- current_target = await self.get_targets()
- new_target = await self.readjust_current(current_target, diff)
- new_target = self._cut(new_target)
- if not self._is_temporary:
- self._results.update({'latest_change': new_target})
- await self.readjust_global(new_target, self._previous_changes)
- await self.generate_global()
- else:
- await self.generate_temporary(new_target - 1.0, new_target + 1.0)
- else:
- return
- class Co2TargetController(TargetController):
- def __init__(
- self,
- realtime_data: float,
- feedback: Dict,
- is_customized: bool,
- is_temporary: bool,
- current_targets: pd.DataFrame,
- previous_changes: Optional[pd.DataFrame] = None
- ) -> None:
- super(Co2TargetController, self).__init__(
- realtime_data,
- feedback,
- is_customized,
- is_temporary,
- current_targets
- )
- self._previous_changes = previous_changes
- @staticmethod
- def _cut(value: float) -> float:
- _UPPER_LIMIT = 1000.0
- value = min(value, _UPPER_LIMIT)
- return value
- async def init_temporary(self) -> float:
- new_target = 1000
- diff = await self.calculate_diff(CO2_RELATED_FEEDBACK_WEIGHT)
- if not np.isnan(self._realtime_data):
- new_target += diff
- return self._cut(new_target)
- async def get_targets(self) -> float:
- current_upper_target = self._current_targets['co2Max'].loc[self._quarter_time]
- if np.isnan(current_upper_target):
- current_upper_target = 500.0
- return current_upper_target
- async def readjust_current(self, lower: float, upper: float, diff: float) -> float:
- new_target = upper - lower
- if np.isnan(self._realtime_data):
- new_target += diff
- else:
- if (diff > 50 and self._realtime_data + 100 > upper
- or diff < -50 and self._realtime_data - 100 < upper):
- new_target = self._realtime_data + diff
- return self._cut(new_target)
- async def generate_global(self):
- new_targets = self._current_targets['new_targets'].apply(lambda x: [0, x])
- time_index = self._current_targets.reset_index()['time']
- result = {}
- for i in range(len(time_index)):
- result.update({time_index[i]: new_targets[i]})
- self._results.update({'global_targets': result})
- async def run(self):
- diff = await self.calculate_diff(CO2_RELATED_FEEDBACK_WEIGHT)
- if diff != 0:
- if not self._is_customized:
- upper_bound = await self.init_temporary()
- await self.generate_temporary(0, upper_bound)
- else:
- current_upper = await self.get_targets()
- upper_bound = await self.readjust_current(0, current_upper, diff)
- if not self._is_temporary:
- self._results.update({'latest_change': upper_bound})
- await self.readjust_global(upper_bound, self._previous_changes)
- await self.generate_global()
- else:
- await self.generate_temporary(0, upper_bound)
- else:
- return
- @logger.catch()
- async def readjust_all_target(
- project_id: str,
- space_id: str,
- wechat_time: Optional[str] = None,
- feedback: Optional[Dict] = None
- ):
- async with AsyncClient() as client:
- transfer = SpaceInfoService(client, project_id, space_id)
- platform = DataPlatformService(client, project_id)
- realtime_temperature = await platform.get_realtime_temperature(space_id)
- current_targets = await transfer.get_custom_target()
- if wechat_time:
- feedback = await transfer.get_feedback(wechat_time)
- is_customized = await transfer.is_customized()
- is_temporary = await transfer.is_temporary()
- season = await transfer.get_season()
- previous_changes = await transfer.env_database_get()
- if feedback.get('switch off') and feedback.get('switch off') > 0:
- need_switch_off = True
- for item in SWITCH_RELATED_FEEDBACK:
- if feedback.get(item) and feedback.get(item) > 0:
- need_switch_off = False
- break
- else:
- need_switch_off = False
- need_run_room_control = False
- if need_switch_off:
- need_run_room_control = True
- async with AsyncClient() as client:
- transfer = SpaceInfoService(client, project_id, space_id)
- await transfer.set_temporary_custom()
- return need_run_room_control
- temperature_results = {}
- for item in TEMPERATURE_RELATED_FEEDBACK:
- if feedback.get(item) and feedback.get(item) > 0:
- temperature_controller = TemperatureTargetController(
- realtime_temperature,
- feedback,
- is_customized,
- is_temporary,
- current_targets[['temperatureMin', 'temperatureMax']].copy(),
- season,
- previous_changes['temperature']
- )
- await temperature_controller.run()
- temperature_results = temperature_controller.get_results()
- break
- if temperature_results:
- need_run_room_control = True
- async with AsyncClient() as client:
- transfer = SpaceInfoService(client, project_id, space_id)
- if temperature_results.get('temporary_targets'):
- await transfer.set_custom_target('temperature', temperature_results.get('temporary_targets'), '0')
- if temperature_results.get('global_targets'):
- await transfer.set_custom_target('temperature', temperature_results.get('global_targets'), '1')
- if temperature_results.get('latest_change'):
- await transfer.env_database_set('temperature', temperature_results.get('latest_change'))
- return need_run_room_control
|