# -*- coding: utf-8 -*- from typing import Tuple import numpy as np from httpx import AsyncClient from loguru import logger from app.controllers.equipment.controller import EquipmentController from app.schemas.equipment import VAVBox, FCU from app.schemas.space import Space from app.services.platform import DataPlatformService from app.services.transfer import EquipmentInfoService, SpaceInfoService class VAVController(EquipmentController): def __init__(self, equipment: VAVBox): super(VAVController, self).__init__() self._equipment = equipment async def get_strategy(self): strategy = 'Plan A' for space in self._equipment.spaces: for eq in space.equipment: if isinstance(eq, FCU): strategy = 'Plan B' break return strategy async def get_temperature(self) -> Tuple[float, float]: target_list, realtime_list = [], [] buffer_list = [] strategy = await self.get_strategy() for space in self._equipment.spaces: if not np.isnan(space.temperature_target): target_list.append(space.temperature_target) realtime_list.append(space.realtime_temperature) if strategy == 'Plan B': for eq in space.equipment: if isinstance(eq, FCU): buffer = (4 - eq.air_valve_speed) / 4 buffer_list.append(buffer) break logger.info(f'target list: {target_list}') logger.info(f'realtime list: {realtime_list}') logger.info(f'buffer list: {buffer_list}') total_target = buffer_list + target_list total_realtime = buffer_list + realtime_list if total_target and total_realtime: target_result = np.array(total_target).sum() / len(target_list) realtime_result = np.array(total_realtime).sum() / len(realtime_list) self._equipment.setting_temperature = target_result else: target_result, realtime_result = np.NAN, np.NAN return target_result, realtime_result async def get_supply_air_flow_set(self) -> float: temperature_set, temperature_realtime = await self.get_temperature() if np.isnan(temperature_set) or np.isnan(temperature_realtime): supply_air_flow_set = 0.0 else: temperature_supply = self._equipment.supply_air_temperature if np.isnan(temperature_supply): temperature_supply = 19.0 logger.info(f'supply air flow: {self._equipment.supply_air_flow}') logger.info(f'supply air temperature: {temperature_supply}') logger.info(f'set temperature: {temperature_set}') logger.info(f'realtime temperature: {temperature_realtime}') supply_air_flow_set = self._equipment.supply_air_flow * ((temperature_supply - temperature_realtime) / (temperature_supply - temperature_set)) supply_air_flow_set = max(self._equipment.supply_air_flow_lower_limit, supply_air_flow_set) supply_air_flow_set = min(self._equipment.supply_air_flow_upper_limit, supply_air_flow_set) self._equipment.supply_air_flow_set = supply_air_flow_set return supply_air_flow_set async def run(self): await self.get_supply_air_flow_set() self._equipment.running_status = True def get_results(self): return self._equipment @logger.catch() async def get_vav_control_result(project_id: str, equipment_id: str) -> VAVBox: async with AsyncClient() as client: duo_duo = EquipmentInfoService(client, project_id) platform = DataPlatformService(client, project_id) _AHU_LIST = [ 'Eq1101050030846e0a94670842109f7c8d8db0d44cf5', 'Eq1101050030b6b2f1db3d6944afa71e213e0d45d565' ] realtime_supply_air_temperature_list = [] for eq in _AHU_LIST: realtime_supply_air_temperature_list.append(await platform.get_realtime_supply_air_temperature(eq)) realtime_supply_air_temperature = np.array(realtime_supply_air_temperature_list).mean() realtime_supply_air_flow = await platform.get_realtime_supply_air_flow(equipment_id) lower_limit, upper_limit = await platform.get_air_flow_limit(equipment_id) served_spaces = await duo_duo.get_space_by_equipment(equipment_id) space_objects = [] for sp in served_spaces: sp_id = sp.get('id') transfer = SpaceInfoService(client, project_id, sp_id) current_target = await transfer.get_current_temperature_target() realtime_temperature = await platform.get_realtime_temperature(sp_id) related_equipment = await transfer.get_equipment() equipment_objects = [] for eq in related_equipment: if eq.get('category') == 'ACATFC': speed = await platform.get_fan_speed(eq.get('id')) temp_fcu_params = {'id': eq.get('id'), 'air_valve_speed': speed} fcu = FCU(**temp_fcu_params) equipment_objects.append(fcu) temp_space_params = { 'id': sp_id, 'realtime_temperature': realtime_temperature, 'equipment': equipment_objects, 'temperature_target': current_target } space = Space(**temp_space_params) space_objects.append(space) temp_vav_params = { 'id': equipment_id, 'spaces': space_objects, 'supply_air_temperature': realtime_supply_air_temperature, 'supply_air_flow': realtime_supply_air_flow, 'supply_air_flow_lower_limit': lower_limit, 'supply_air_flow_upper_limit': upper_limit, } vav = VAVBox(**temp_vav_params) vav_controller = VAVController(vav) await vav_controller.run() regulated_vav = vav_controller.get_results() return regulated_vav