123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- # -*- coding: utf-8 -*-
- """
- main
- ~~~~
- The module handles the main procedure of fan coil unit alarm test.
- """
- import json
- import logging.config
- from datetime import datetime
- import yaml
- from apscheduler.schedulers.blocking import BlockingScheduler
- import data_platform_api
- from alarm import FanCoilUnitAlarm
# Configure logging from the YAML dictConfig file in the working directory,
# then create this module's logger.
with open('logger_config.yaml', 'r') as f:
    # safe_load accepts the open text stream directly.
    config = yaml.safe_load(f)

logging.config.dictConfig(config)
logger = logging.getLogger(__name__)
def get_alarm_params(project_id, equip_id):
    """Collect the alarm-check inputs for one piece of equipment.

    One real-time query is issued for five points, followed by one history
    query per history point. The results are re-keyed into the dictionary
    that ``guard`` hands to ``FanCoilUnitAlarm``.

    Args:
        project_id: Project identifier forwarded to ``data_platform_api``.
        equip_id: Equipment identifier forwarded to ``data_platform_api``.

    Returns:
        dict: Keys ``water_valve_switch``, ``water_valve_history_status``,
        ``tap_history_status``, ``return_air_temp``, ``supply_air_temp``,
        ``water_out_temp`` and ``water_in_temp``. Real-time entries are
        ``None`` when the platform result has no value for that point.
    """
    current = data_platform_api.query_real_time_data(
        project_id,
        equip_id,
        [
            'WaterValveSwitchStatus',
            'ReturnAirTemp',
            'SupplyTemp',
            'WaterOutTemp',
            'WaterInTemp',
        ],
    )

    # NOTE(review): 15 is the history window passed to the platform; its unit
    # is not visible in this file -- confirm against data_platform_api.
    history = {
        code: data_platform_api.query_history_time_data(project_id, equip_id, code, 15)
        for code in ('WaterValveSwitchStatus', 'FanGear')
    }

    return {
        'water_valve_switch': current.get('WaterValveSwitchStatus'),
        'water_valve_history_status': history['WaterValveSwitchStatus'],
        'tap_history_status': history['FanGear'],
        'return_air_temp': current.get('ReturnAirTemp'),
        'supply_air_temp': current.get('SupplyTemp'),
        'water_out_temp': current.get('WaterOutTemp'),
        'water_in_temp': current.get('WaterInTemp'),
    }
def guard(project_id='Pj3101150005',
          devices=(
              'Eq3101150005e8f64436c90b401ba63cf3cc5cdf5cc9',
              'Eq3101150005d9111aa8ae5047548ee25f272a9833ea',
          )):
    """Run the fan coil unit alarm check on each device and raise new alarms.

    For every device the inputs are fetched with ``get_alarm_params`` and
    evaluated by ``FanCoilUnitAlarm.procedure()``. When the resulting
    ``alarm_type`` equals 1 and ``data_platform_api.is_alarmed`` returns a
    falsy value for type code ``'001'``, the alarm is sent to the platform.

    Args:
        project_id: Project identifier forwarded to ``data_platform_api``.
            Defaults to the project this script was written for.
        devices: Iterable of equipment identifiers to check. Defaults to
            the two devices this script was written for.

    Returns:
        None.
    """
    type_code = '001'  # the only alarm type this job raises

    for device in devices:
        annunciator = FanCoilUnitAlarm(get_alarm_params(project_id, device))
        annunciator.procedure()
        if annunciator.get('alarm_type') != 1:
            continue

        # NOTE(review): presumably is_alarmed reports an already-active alarm,
        # so the same alarm is not re-sent on every scheduler tick -- confirm.
        if not data_platform_api.is_alarmed(project_id, device, type_code):
            # Message text means "intelligent diagnosis: air-side clogging".
            data_platform_api.send_alarm(project_id, device, type_code, '智能诊断风侧脏堵')
def recover(project_id='Pj3101150005',
            devices=(
                'Eq3101150005e8f64436c90b401ba63cf3cc5cdf5cc9',
                'Eq3101150005d9111aa8ae5047548ee25f272a9833ea',
            ),
            max_age_seconds=30 * 60):
    """Recover alarms that have been recorded for at least ``max_age_seconds``.

    For each device the recorded alarms are read with
    ``data_platform_api.query_redis_alarm``. Every entry whose timestamp
    (a ``'%Y%m%d%H%M%S'`` string, compared against naive ``datetime.now()``)
    is old enough is passed to ``data_platform_api.recover_alarm``.

    Args:
        project_id: Project identifier forwarded to ``data_platform_api``.
            Defaults to the project this script was written for.
        devices: Iterable of equipment identifiers to inspect. Defaults to
            the two devices this script was written for.
        max_age_seconds: Minimum alarm age, in seconds, before it is
            recovered. Defaults to 30 minutes.

    Returns:
        None.

    Raises:
        ValueError: If a recorded timestamp does not match
            ``'%Y%m%d%H%M%S'``.
    """
    for device in devices:
        alarm_info = data_platform_api.query_redis_alarm(project_id, device)
        if not alarm_info:
            # Nothing recorded for this device (empty mapping or None).
            continue

        # One reference time per device keeps the age test consistent across
        # all of that device's alarms.
        now = datetime.now()
        for alarm_key, raised_at in alarm_info.items():
            age = now - datetime.strptime(raised_at, '%Y%m%d%H%M%S')
            if age.total_seconds() >= max_age_seconds:
                data_platform_api.recover_alarm(project_id, alarm_key)
if __name__ == '__main__':
    scheduler = BlockingScheduler()

    # Job table: callable -> interval-trigger keyword arguments.
    job_intervals = (
        (guard, {'seconds': 5}),
        (recover, {'minutes': 15}),
    )
    for job, interval in job_intervals:
        scheduler.add_job(job, 'interval', **interval)

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Swallow interrupt/exit so stopping the process prints no traceback.
        pass
|