# -*- coding: utf-8 -*- import time from abc import abstractmethod from typing import Dict, Tuple, Optional import arrow import numpy as np import pandas as pd from httpx import AsyncClient from loguru import logger from app.controllers.controller import Controller from app.resources.params import ( TEMPERATURE_RELATED_FEEDBACK_WEIGHT, TEMPERATURE_RELATED_FEEDBACK, CO2_RELATED_FEEDBACK_WEIGHT, SWITCH_RELATED_FEEDBACK ) from app.services.platform import DataPlatformService from app.services.transfer import SpaceInfoService, Season from app.utils.date import get_time_str, get_quarter_minutes, TIME_FMT class TargetController(Controller): def __init__( self, realtime_data: float, feedback: Dict, is_customized: bool, is_temporary: bool, current_targets: pd.DataFrame, ) -> None: super(TargetController, self).__init__() self._realtime_data = realtime_data self._feedback = feedback self._is_customized = is_customized self._is_temporary = is_temporary self._current_targets = current_targets self._now_time = arrow.get(get_time_str(), TIME_FMT).time().strftime('%H%M%S') self._quarter_time = get_quarter_minutes(get_time_str()) async def calculate_diff(self, weight: Dict) -> float: related_feedback = [v for k, v in self._feedback.items() if k in weight] related_feedback = np.array(related_feedback) weight = np.array(list(weight.values())) feedback_count = related_feedback.sum() diff = 0 if feedback_count > 0: diff = np.dot(related_feedback, weight) / feedback_count return diff @abstractmethod async def init_temporary(self): pass @abstractmethod async def get_targets(self) -> float: pass async def generate_temporary(self, lower, upper): now_str = get_time_str() time_index = arrow.get(arrow.get(now_str, TIME_FMT).shift(minutes=15).timestamp // (15 * 60) * (15 * 60)).time().strftime('%H%M%S') result = {time_index: [lower, upper]} self._results.update({'temporary_targets': result}) async def readjust_global(self, latest_change: float, previous_changes: pd.DataFrame): previous_changes = pd.concat([ pd.DataFrame({'timestamp': [self._now_time], 'value': [latest_change]}), previous_changes, ]) previous_changes.reset_index(inplace=True) previous_changes['weight1'] = previous_changes['index'].apply(lambda x: (1 / (x + 1)) ** 3) new_targets = [] time_index = self._current_targets.reset_index()['time'] for item in time_index: previous_changes['delta'] = previous_changes['timestamp'].apply( lambda x: abs(arrow.get(str(x), 'HHmmss') - arrow.get(item, 'HHmmss')).seconds // (15 * 60) ) previous_changes['weight2'] = previous_changes['delta'].apply(lambda x: 0.5 ** x) previous_changes['weight'] = previous_changes['weight1'] * previous_changes['weight2'] new_targets.append( (previous_changes['value'] * previous_changes['weight']).sum() / previous_changes['weight'].sum() ) self._current_targets['new_targets'] = new_targets @abstractmethod async def run(self): pass class TemperatureTargetController(TargetController): def __init__( self, realtime_data: float, feedback: Dict, is_customized: bool, is_temporary: bool, current_targets: pd.DataFrame, season: Season, previous_changes: Optional[pd.DataFrame] = None ) -> None: super(TemperatureTargetController, self).__init__( realtime_data, feedback, is_customized, is_temporary, current_targets ) self._season = season self._previous_changes = previous_changes @staticmethod def _cut(value: float) -> float: _LOWER_LIMIT = 22.0 _UPPER_LIMIT = 28.0 value = min(value, _UPPER_LIMIT) value = max(value, _LOWER_LIMIT) return value async def init_temporary(self) -> Tuple[float, float]: _VAR = 2 _RANGE = 1 new_target = 24.0 new_lower_bound, new_upper_bound = new_target - 1.0, new_target + 1.0 if not np.isnan(self._realtime_data): if self._season == Season.cooling: if ('a little hot' in self._feedback or 'so hot' in self._feedback or 'switch on' in self._feedback): mid = self._realtime_data - _VAR new_lower_bound = mid - _RANGE new_upper_bound = mid + _RANGE elif self._season == Season.heating: if ('a little cold' in self._feedback or 'so cold' in self._feedback or 'switch on' in self._feedback): mid = self._realtime_data + _VAR new_lower_bound = mid - _RANGE new_upper_bound = mid + _RANGE return self._cut(new_lower_bound), self._cut(new_upper_bound) async def get_targets(self) -> float: current_lower_target = self._current_targets['temperatureMin'].loc[self._quarter_time] current_upper_target = self._current_targets['temperatureMax'].loc[self._quarter_time] if np.isnan(current_lower_target): current_lower_target = 23.0 if np.isnan(current_upper_target): current_upper_target = 25.0 return (current_lower_target + current_upper_target) / 2 async def readjust_current(self, current: float, diff: float) -> float: _RANGE = 2 new_target = current if np.isnan(self._realtime_data): new_target += diff else: if self._season == Season.cooling: standard = current + 1.0 elif self._season == Season.heating: standard = current - 1.0 else: standard = current if (diff > 0 and self._realtime_data + _RANGE > standard or diff < 0 and self._realtime_data - _RANGE < standard): new_target += diff return new_target async def generate_global(self): _RANGE = 1 new_targets = self._current_targets['new_targets'].apply(lambda x: [self._cut(x - _RANGE), self._cut(x + _RANGE)]) time_index = self._current_targets.reset_index()['time'] result = {} for i in range(len(time_index)): result.update({time_index[i]: new_targets[i]}) self._results.update({'global_targets': result}) async def run(self): diff = await self.calculate_diff(TEMPERATURE_RELATED_FEEDBACK_WEIGHT) if diff != 0: if not self._is_customized: lower_bound, upper_bound = await self.init_temporary() await self.generate_temporary(lower_bound, upper_bound) else: current_target = await self.get_targets() new_target = await self.readjust_current(current_target, diff) if not self._is_temporary: self._results.update({'latest_change': new_target}) await self.readjust_global(new_target, self._previous_changes) await self.generate_global() else: await self.generate_temporary(self._cut(new_target) - 1.0, self._cut(new_target + 1.0)) else: return class Co2TargetController(TargetController): def __init__( self, realtime_data: float, feedback: Dict, is_customized: bool, is_temporary: bool, current_targets: pd.DataFrame, previous_changes: Optional[pd.DataFrame] = None ) -> None: super(Co2TargetController, self).__init__( realtime_data, feedback, is_customized, is_temporary, current_targets ) self._previous_changes = previous_changes @staticmethod def _cut(value: float) -> float: _UPPER_LIMIT = 1000.0 value = min(value, _UPPER_LIMIT) return value async def init_temporary(self) -> float: new_target = 1000 diff = await self.calculate_diff(CO2_RELATED_FEEDBACK_WEIGHT) if not np.isnan(self._realtime_data): new_target += diff return self._cut(new_target) async def get_targets(self) -> float: current_upper_target = self._current_targets['co2Max'].loc[self._quarter_time] if np.isnan(current_upper_target): current_upper_target = 500.0 return current_upper_target async def readjust_current(self, lower: float, upper: float, diff: float) -> float: new_target = upper - lower if np.isnan(self._realtime_data): new_target += diff else: if (diff > 50 and self._realtime_data + 100 > upper or diff < -50 and self._realtime_data - 100 < upper): new_target = self._realtime_data + diff return self._cut(new_target) async def generate_global(self): new_targets = self._current_targets['new_targets'].apply(lambda x: [0, x]) time_index = self._current_targets.reset_index()['time'] result = {} for i in range(len(time_index)): result.update({time_index[i]: new_targets[i]}) self._results.update({'global_targets': result}) async def run(self): diff = await self.calculate_diff(CO2_RELATED_FEEDBACK_WEIGHT) if diff != 0: if not self._is_customized: upper_bound = await self.init_temporary() await self.generate_temporary(0, upper_bound) else: current_upper = await self.get_targets() upper_bound = await self.readjust_current(0, current_upper, diff) if not self._is_temporary: self._results.update({'latest_change': upper_bound}) await self.readjust_global(upper_bound, self._previous_changes) await self.generate_global() else: await self.generate_temporary(0, upper_bound) else: return @logger.catch() async def readjust_all_target(project_id: str, space_id: str, wechat_time: str): start = time.perf_counter() async with AsyncClient() as client: transfer = SpaceInfoService(client, project_id, space_id) platform = DataPlatformService(client, project_id) nl = '\n' realtime_temperature = await platform.get_realtime_temperature(space_id) logger.debug(f'realtime temperature: {realtime_temperature}') current_targets = await transfer.get_custom_target() logger.debug(f'current targets: {nl}{current_targets}') feedback = await transfer.get_feedback(wechat_time) feedback_for_log = {k: v for k, v in feedback.items() if v > 0} logger.debug(f'feedback: {feedback_for_log}') is_customized = await transfer.is_customized() logger.debug(f'is customized: {is_customized}') is_temporary = await transfer.is_temporary() logger.debug(f'is temporary: {is_temporary}') season = await transfer.get_season() logger.debug(f'season: {season}') previous_changes = await transfer.env_database_get() temperature_changes = previous_changes.get('temperature') logger.debug(f'temperature previous changes: {nl}{temperature_changes}') logger.debug(f'request web service elapsed: {time.perf_counter() - start}') if feedback.get('switch off') and feedback.get('switch off') > 0: need_switch_off = True for item in SWITCH_RELATED_FEEDBACK: if feedback.get(item) and feedback.get(item) > 0: need_switch_off = False break else: need_switch_off = False need_run_room_control = False if need_switch_off: async with AsyncClient() as client: transfer = SpaceInfoService(client, project_id, space_id) await transfer.set_temporary_custom() return need_run_room_control temperature_results = {} for item in TEMPERATURE_RELATED_FEEDBACK: if feedback.get(item) and feedback.get(item) > 0: temperature_controller = TemperatureTargetController( realtime_temperature, feedback, is_customized, is_temporary, current_targets[['temperatureMin', 'temperatureMax']].copy(), season, previous_changes['temperature'] ) await temperature_controller.run() temperature_results = temperature_controller.get_results() break if temperature_results: need_run_room_control = True async with AsyncClient() as client: transfer = SpaceInfoService(client, project_id, space_id) if temperature_results.get('temporary_targets'): temporary_targets = temperature_results.get('temporary_targets') logger.debug(f'temperature temporary targets: {nl}{temporary_targets}') await transfer.set_custom_target('temperature', temperature_results.get('temporary_targets'), '0') if temperature_results.get('global_targets'): global_targets = temperature_results.get('global_targets') logger.debug(f'temperature global targets: {nl}{global_targets}') await transfer.set_custom_target('temperature', temperature_results.get('global_targets'), '1') if temperature_results.get('latest_change'): latest_change = temperature_results.get('latest_change') logger.debug(f'temperature latest change: {latest_change}') await transfer.env_database_set('temperature', temperature_results.get('latest_change')) return need_run_room_control