main.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # -*- coding: utf-8 -*-
  2. """
  3. main
  4. ~~~~
  5. The module handles the main procedure of fan coil unit alarm test.
  6. """
  7. import json
  8. import logging.config
  9. from datetime import datetime
  10. import yaml
  11. from apscheduler.schedulers.blocking import BlockingScheduler
  12. import data_platform_api
  13. from alarm import FanCoilUnitAlarm
  14. with open('logger_config.yaml', 'r') as f:
  15. config = yaml.safe_load(f.read())
  16. logging.config.dictConfig(config)
  17. logger = logging.getLogger(__name__)
  18. def get_alarm_params(project_id, equip_id):
  19. real_time_params = [
  20. 'WaterValveSwitchStatus',
  21. 'ReturnAirTemp',
  22. 'SupplyTemp',
  23. 'WaterOutTemp',
  24. 'WaterInTemp',
  25. ]
  26. real_time_data = data_platform_api.query_real_time_data(project_id, equip_id, real_time_params)
  27. history_params = [
  28. 'WaterValveSwitchStatus',
  29. 'FanGear'
  30. ]
  31. history_data = dict()
  32. for code in history_params:
  33. temp_data = data_platform_api.query_history_time_data(project_id, equip_id, code, 15)
  34. history_data.update({code: temp_data})
  35. params_dic = {
  36. 'water_valve_switch': real_time_data.get('WaterValveSwitchStatus'),
  37. 'water_valve_history_status': history_data.get('WaterValveSwitchStatus'),
  38. 'tap_history_status': history_data.get('FanGear'),
  39. 'return_air_temp': real_time_data.get('ReturnAirTemp'),
  40. 'supply_air_temp': real_time_data.get('SupplyTemp'),
  41. 'water_out_temp': real_time_data.get('WaterOutTemp'),
  42. 'water_in_temp': real_time_data.get('WaterInTemp'),
  43. }
  44. return params_dic
  45. def guard():
  46. project_id = 'Pj3101150005'
  47. devices = [
  48. 'Eq3101150005e8f64436c90b401ba63cf3cc5cdf5cc9',
  49. 'Eq3101150005d9111aa8ae5047548ee25f272a9833ea'
  50. ]
  51. for device in devices:
  52. alarm_params = get_alarm_params(project_id, device)
  53. # logger.info(json.dumps(alarm_params, indent=4))
  54. annunciator = FanCoilUnitAlarm(alarm_params)
  55. annunciator.procedure()
  56. if annunciator.get('alarm_type') == 1:
  57. type_code = '001'
  58. if not data_platform_api.is_alarmed(project_id, device, type_code):
  59. data_platform_api.send_alarm(project_id, device, type_code, '智能诊断风侧脏堵')
  60. return
  61. def recover():
  62. project_id = 'Pj3101150005'
  63. devices = [
  64. 'Eq3101150005e8f64436c90b401ba63cf3cc5cdf5cc9',
  65. 'Eq3101150005d9111aa8ae5047548ee25f272a9833ea'
  66. ]
  67. for device in devices:
  68. alarm_info = data_platform_api.query_redis_alarm(project_id, device)
  69. for k, v in alarm_info.items():
  70. if (datetime.now() - datetime.strptime(v, '%Y%m%d%H%M%S')).total_seconds() >= 30 * 60:
  71. data_platform_api.recover_alarm(project_id, k)
  72. if __name__ == '__main__':
  73. scheduler = BlockingScheduler()
  74. scheduler.add_job(guard, 'interval', seconds=5)
  75. scheduler.add_job(recover, 'interval', minutes=15)
  76. try:
  77. scheduler.start()
  78. except (KeyboardInterrupt, SystemExit):
  79. pass