targets.py 14 KB


  1. # -*- coding: utf-8 -*-
  2. import time
  3. from abc import abstractmethod
  4. from typing import Dict, Tuple, Optional
  5. import arrow
  6. import numpy as np
  7. import pandas as pd
  8. from httpx import AsyncClient
  9. from loguru import logger
  10. from app.controllers.controller import Controller
  11. from app.resources.params import (
  12. TEMPERATURE_RELATED_FEEDBACK_WEIGHT,
  13. TEMPERATURE_RELATED_FEEDBACK,
  14. CO2_RELATED_FEEDBACK_WEIGHT,
  15. SWITCH_RELATED_FEEDBACK
  16. )
  17. from app.services.platform import DataPlatformService
  18. from app.services.transfer import SpaceInfoService, Season
  19. from app.utils.date import get_time_str, get_quarter_minutes, TIME_FMT
  20. class TargetController(Controller):
  21. def __init__(
  22. self,
  23. realtime_data: float,
  24. feedback: Dict,
  25. is_customized: bool,
  26. is_temporary: bool,
  27. current_targets: pd.DataFrame,
  28. ) -> None:
  29. super(TargetController, self).__init__()
  30. self._realtime_data = realtime_data
  31. self._feedback = feedback
  32. self._is_customized = is_customized
  33. self._is_temporary = is_temporary
  34. self._current_targets = current_targets
  35. self._now_time = arrow.get(get_time_str(), TIME_FMT).time().strftime('%H%M%S')
  36. self._quarter_time = get_quarter_minutes(get_time_str())
  37. async def calculate_diff(self, weight: Dict) -> float:
  38. related_feedback = [v for k, v in self._feedback.items() if k in weight]
  39. related_feedback = np.array(related_feedback)
  40. weight = np.array(list(weight.values()))
  41. feedback_count = related_feedback.sum()
  42. diff = 0
  43. if feedback_count > 0:
  44. diff = np.dot(related_feedback, weight) / feedback_count
  45. return diff
  46. @abstractmethod
  47. async def init_temporary(self):
  48. pass
  49. @abstractmethod
  50. async def get_targets(self) -> float:
  51. pass
  52. async def generate_temporary(self, lower, upper):
  53. now_str = get_time_str()
  54. time_index = arrow.get(arrow.get(now_str, TIME_FMT).shift(minutes=15).timestamp
  55. // (15 * 60) * (15 * 60)).time().strftime('%H%M%S')
  56. result = {time_index: [lower, upper]}
  57. self._results.update({'temporary_targets': result})
  58. async def readjust_global(self, latest_change: float, previous_changes: pd.DataFrame):
  59. previous_changes = pd.concat([
  60. pd.DataFrame({'timestamp': [self._now_time], 'value': [latest_change]}),
  61. previous_changes,
  62. ])
  63. previous_changes.reset_index(inplace=True)
  64. previous_changes['weight1'] = previous_changes['index'].apply(lambda x: (1 / (x + 1)) ** 3)
  65. new_targets = []
  66. time_index = self._current_targets.reset_index()['time']
  67. for item in time_index:
  68. previous_changes['delta'] = previous_changes['timestamp'].apply(
  69. lambda x: abs(arrow.get(str(x), 'HHmmss') - arrow.get(item, 'HHmmss')).seconds // (15 * 60)
  70. )
  71. previous_changes['weight2'] = previous_changes['delta'].apply(lambda x: 0.5 ** x)
  72. previous_changes['weight'] = previous_changes['weight1'] * previous_changes['weight2']
  73. new_targets.append(
  74. (previous_changes['value'] * previous_changes['weight']).sum() / previous_changes['weight'].sum()
  75. )
  76. self._current_targets['new_targets'] = new_targets
  77. @abstractmethod
  78. async def run(self):
  79. pass
  80. class TemperatureTargetController(TargetController):
  81. def __init__(
  82. self,
  83. realtime_data: float,
  84. feedback: Dict,
  85. is_customized: bool,
  86. is_temporary: bool,
  87. current_targets: pd.DataFrame,
  88. season: Season,
  89. previous_changes: Optional[pd.DataFrame] = None
  90. ) -> None:
  91. super(TemperatureTargetController, self).__init__(
  92. realtime_data,
  93. feedback,
  94. is_customized,
  95. is_temporary,
  96. current_targets
  97. )
  98. self._season = season
  99. self._previous_changes = previous_changes
  100. @staticmethod
  101. def _cut(value: float) -> float:
  102. _LOWER_LIMIT = 22.0
  103. _UPPER_LIMIT = 28.0
  104. value = min(value, _UPPER_LIMIT)
  105. value = max(value, _LOWER_LIMIT)
  106. return value
  107. async def init_temporary(self) -> Tuple[float, float]:
  108. _VAR = 2
  109. _RANGE = 1
  110. new_target = 24.0
  111. new_lower_bound, new_upper_bound = new_target - 1.0, new_target + 1.0
  112. if not np.isnan(self._realtime_data):
  113. if self._season == Season.cooling:
  114. if ('a little hot' in self._feedback
  115. or 'so hot' in self._feedback
  116. or 'switch on' in self._feedback):
  117. mid = self._realtime_data - _VAR
  118. new_lower_bound = mid - _RANGE
  119. new_upper_bound = mid + _RANGE
  120. elif self._season == Season.heating:
  121. if ('a little cold' in self._feedback
  122. or 'so cold' in self._feedback
  123. or 'switch on' in self._feedback):
  124. mid = self._realtime_data + _VAR
  125. new_lower_bound = mid - _RANGE
  126. new_upper_bound = mid + _RANGE
  127. return self._cut(new_lower_bound), self._cut(new_upper_bound)
  128. async def get_targets(self) -> float:
  129. current_lower_target = self._current_targets['temperatureMin'].loc[self._quarter_time]
  130. current_upper_target = self._current_targets['temperatureMax'].loc[self._quarter_time]
  131. if np.isnan(current_lower_target):
  132. current_lower_target = 23.0
  133. if np.isnan(current_upper_target):
  134. current_upper_target = 25.0
  135. return (current_lower_target + current_upper_target) / 2
  136. async def readjust_current(self, current: float, diff: float) -> float:
  137. _RANGE = 2
  138. new_target = current
  139. if np.isnan(self._realtime_data):
  140. new_target += diff
  141. else:
  142. if self._season == Season.cooling:
  143. standard = current + 1.0
  144. elif self._season == Season.heating:
  145. standard = current - 1.0
  146. else:
  147. standard = current
  148. if (diff > 0 and self._realtime_data + _RANGE > standard
  149. or diff < 0 and self._realtime_data - _RANGE < standard):
  150. new_target += diff
  151. return new_target
  152. async def generate_global(self):
  153. _RANGE = 1
  154. new_targets = self._current_targets['new_targets'].apply(lambda x: [self._cut(x - _RANGE),
  155. self._cut(x + _RANGE)])
  156. time_index = self._current_targets.reset_index()['time']
  157. result = {}
  158. for i in range(len(time_index)):
  159. result.update({time_index[i]: new_targets[i]})
  160. self._results.update({'global_targets': result})
  161. async def run(self):
  162. diff = await self.calculate_diff(TEMPERATURE_RELATED_FEEDBACK_WEIGHT)
  163. if diff != 0:
  164. if not self._is_customized:
  165. lower_bound, upper_bound = await self.init_temporary()
  166. await self.generate_temporary(lower_bound, upper_bound)
  167. else:
  168. current_target = await self.get_targets()
  169. new_target = await self.readjust_current(current_target, diff)
  170. if not self._is_temporary:
  171. self._results.update({'latest_change': new_target})
  172. await self.readjust_global(new_target, self._previous_changes)
  173. await self.generate_global()
  174. else:
  175. await self.generate_temporary(self._cut(new_target) - 1.0, self._cut(new_target + 1.0))
  176. else:
  177. return
  178. class Co2TargetController(TargetController):
  179. def __init__(
  180. self,
  181. realtime_data: float,
  182. feedback: Dict,
  183. is_customized: bool,
  184. is_temporary: bool,
  185. current_targets: pd.DataFrame,
  186. previous_changes: Optional[pd.DataFrame] = None
  187. ) -> None:
  188. super(Co2TargetController, self).__init__(
  189. realtime_data,
  190. feedback,
  191. is_customized,
  192. is_temporary,
  193. current_targets
  194. )
  195. self._previous_changes = previous_changes
  196. @staticmethod
  197. def _cut(value: float) -> float:
  198. _UPPER_LIMIT = 1000.0
  199. value = min(value, _UPPER_LIMIT)
  200. return value
  201. async def init_temporary(self) -> float:
  202. new_target = 1000
  203. diff = await self.calculate_diff(CO2_RELATED_FEEDBACK_WEIGHT)
  204. if not np.isnan(self._realtime_data):
  205. new_target += diff
  206. return self._cut(new_target)
  207. async def get_targets(self) -> float:
  208. current_upper_target = self._current_targets['co2Max'].loc[self._quarter_time]
  209. if np.isnan(current_upper_target):
  210. current_upper_target = 500.0
  211. return current_upper_target
  212. async def readjust_current(self, lower: float, upper: float, diff: float) -> float:
  213. new_target = upper - lower
  214. if np.isnan(self._realtime_data):
  215. new_target += diff
  216. else:
  217. if (diff > 50 and self._realtime_data + 100 > upper
  218. or diff < -50 and self._realtime_data - 100 < upper):
  219. new_target = self._realtime_data + diff
  220. return self._cut(new_target)
  221. async def generate_global(self):
  222. new_targets = self._current_targets['new_targets'].apply(lambda x: [0, x])
  223. time_index = self._current_targets.reset_index()['time']
  224. result = {}
  225. for i in range(len(time_index)):
  226. result.update({time_index[i]: new_targets[i]})
  227. self._results.update({'global_targets': result})
  228. async def run(self):
  229. diff = await self.calculate_diff(CO2_RELATED_FEEDBACK_WEIGHT)
  230. if diff != 0:
  231. if not self._is_customized:
  232. upper_bound = await self.init_temporary()
  233. await self.generate_temporary(0, upper_bound)
  234. else:
  235. current_upper = await self.get_targets()
  236. upper_bound = await self.readjust_current(0, current_upper, diff)
  237. if not self._is_temporary:
  238. self._results.update({'latest_change': upper_bound})
  239. await self.readjust_global(upper_bound, self._previous_changes)
  240. await self.generate_global()
  241. else:
  242. await self.generate_temporary(0, upper_bound)
  243. else:
  244. return
  245. @logger.catch()
  246. async def readjust_all_target(project_id: str, space_id: str, wechat_time: str):
  247. start = time.perf_counter()
  248. async with AsyncClient() as client:
  249. transfer = SpaceInfoService(client, project_id, space_id)
  250. platform = DataPlatformService(client, project_id)
  251. nl = '\n'
  252. realtime_temperature = await platform.get_realtime_temperature(space_id)
  253. logger.debug(f'realtime temperature: {realtime_temperature}')
  254. current_targets = await transfer.get_custom_target()
  255. logger.debug(f'current targets: {nl}{current_targets}')
  256. feedback = await transfer.get_feedback(wechat_time)
  257. feedback_for_log = {k: v for k, v in feedback.items() if v > 0}
  258. logger.debug(f'feedback: {feedback_for_log}')
  259. is_customized = await transfer.is_customized()
  260. logger.debug(f'is customized: {is_customized}')
  261. is_temporary = await transfer.is_temporary()
  262. logger.debug(f'is temporary: {is_temporary}')
  263. season = await transfer.get_season()
  264. logger.debug(f'season: {season}')
  265. previous_changes = await transfer.env_database_get()
  266. temperature_changes = previous_changes.get('temperature')
  267. logger.debug(f'temperature previous changes: {nl}{temperature_changes}')
  268. logger.debug(f'request web service elapsed: {time.perf_counter() - start}')
  269. if feedback.get('switch off') and feedback.get('switch off') > 0:
  270. need_switch_off = True
  271. for item in SWITCH_RELATED_FEEDBACK:
  272. if feedback.get(item) and feedback.get(item) > 0:
  273. need_switch_off = False
  274. break
  275. else:
  276. need_switch_off = False
  277. need_run_room_control = False
  278. if need_switch_off:
  279. async with AsyncClient() as client:
  280. transfer = SpaceInfoService(client, project_id, space_id)
  281. await transfer.set_temporary_custom()
  282. return need_run_room_control
  283. temperature_results = {}
  284. for item in TEMPERATURE_RELATED_FEEDBACK:
  285. if feedback.get(item) and feedback.get(item) > 0:
  286. temperature_controller = TemperatureTargetController(
  287. realtime_temperature,
  288. feedback,
  289. is_customized,
  290. is_temporary,
  291. current_targets[['temperatureMin', 'temperatureMax']].copy(),
  292. season,
  293. previous_changes['temperature']
  294. )
  295. await temperature_controller.run()
  296. temperature_results = temperature_controller.get_results()
  297. break
  298. if temperature_results:
  299. need_run_room_control = True
  300. async with AsyncClient() as client:
  301. transfer = SpaceInfoService(client, project_id, space_id)
  302. if temperature_results.get('temporary_targets'):
  303. temporary_targets = temperature_results.get('temporary_targets')
  304. logger.debug(f'temperature temporary targets: {nl}{temporary_targets}')
  305. await transfer.set_custom_target('temperature', temperature_results.get('temporary_targets'), '0')
  306. if temperature_results.get('global_targets'):
  307. global_targets = temperature_results.get('global_targets')
  308. logger.debug(f'temperature global targets: {nl}{global_targets}')
  309. await transfer.set_custom_target('temperature', temperature_results.get('global_targets'), '1')
  310. if temperature_results.get('latest_change'):
  311. latest_change = temperature_results.get('latest_change')
  312. logger.debug(f'temperature latest change: {latest_change}')
  313. await transfer.env_database_set('temperature', temperature_results.get('latest_change'))
  314. return need_run_room_control