start.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. from MyUtils.MysqlUtils import MysqlUtils
  2. from MyUtils.ZillionUtil import ZillionUtil
  3. from MyUtils.Dingtalk import send_message_markdown
  4. import datetime
  5. import pytz
  6. import json
  7. import time
  8. import os
  9. SELETE_SQL = "SELECT infos FROM `object_data_equipment_infos` WHERE `category` LIKE '%ACAT%'"
  10. INSERT_SQL = "replace into sagacloud_setup.zj_check_EquipSwitch(EquipLocalID,EquipName,EquipSwitchSet,RunStatus,remark) values "
  11. DELETE_SQL = "truncate table sagacloud_setup.zj_check_equipswitch"
  12. def datetime_now():
  13. # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  14. #容器时间
  15. # tz = pytz.timezone('Asia/Shanghai') # 东八区
  16. datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
  17. pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
  18. return datetime_now
  19. # 获取能耗数据
  20. def get_data_time(zillionUtil,hbase_database, hbase_table, building, meter,funid1,funid2,timedelta_5min):
  21. Criteria = {
  22. "building": building,
  23. "meter": meter,
  24. "funcid": {
  25. "in":[
  26. funid1,
  27. funid2
  28. ]
  29. },
  30. "receivetime": {
  31. "$gte": timedelta_5min
  32. }
  33. }
  34. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  35. return datas
  36. with open("config.json", "r") as f:
  37. data = json.load(f)
  38. hbase_database = data["metadata"]["database"]
  39. url = data["metadata"]["url"]
  40. mysql = data["mysql"]
  41. my_database = mysql["database"]
  42. dingding = data["dingding"]
  43. sleep = data["sleep"]
  44. # mysql = {
  45. # "host": os.getenv("host"),
  46. # "port": int(os.getenv("port")),
  47. # "user": os.getenv("user"),
  48. # "passwd": os.getenv("passwd"),
  49. # "database": os.getenv("database")
  50. # }
  51. # database = os.getenv("database")
  52. # hbase_database = os.getenv("hbase_database")
  53. # url = os.getenv("url")
  54. # sleep = os.getenv("sleep")
  55. # dingding = os.getenv("dingding")
  56. timedelta_5min = (datetime.datetime.now() - datetime.timedelta(minutes=5)).strftime(f"%Y%m%d%H%M")
  57. current_month = datetime.datetime.now().strftime("%Y%m")
  58. while True:
  59. # #连接hbase
  60. zillionUtil = ZillionUtil(url)
  61. # #连接mysql
  62. MysqlUtil = MysqlUtils(**mysql)
  63. object_data_infos = MysqlUtil.query(SELETE_SQL)
  64. sqls = []
  65. for info in object_data_infos:
  66. if "EquipSwitchSet" in json.loads(info[0]) and "RunStatus" in json.loads(info[0]):
  67. info_data = json.loads(info[0])
  68. EquipSwitchSet = info_data["EquipSwitchSet"]
  69. RunStatus = info_data["RunStatus"]
  70. EquipLocalID = info_data["EquipLocalID"]
  71. EquipName = info_data["EquipName"]
  72. if EquipSwitchSet != RunStatus:
  73. EquipSwitchSet_meter,EquipSwitchSet_funcid = str.split(EquipSwitchSet,"-")[0],str.split(EquipSwitchSet,"-")[1]
  74. RunStatus_meter,RunStatus_funcid = str.split(RunStatus,"-")[0],str.split(RunStatus,"-")[1]
  75. datas = get_data_time(zillionUtil, hbase_database, "original_month_"+current_month, "3301100002",EquipSwitchSet_meter,eval(EquipSwitchSet_funcid),eval(RunStatus_funcid),timedelta_5min)
  76. EquipSwitchSet_data_list = []
  77. RunStatus_data_list = []
  78. for i in datas:
  79. if str(i["funcid"]) == EquipSwitchSet_funcid:
  80. EquipSwitchSet_data_list.append(i["data"])
  81. else:
  82. RunStatus_data_list.append(i["data"])
  83. if EquipSwitchSet_funcid and RunStatus_data_list:
  84. diff_data = set(EquipSwitchSet_data_list) | set(RunStatus_data_list)
  85. if len(set(EquipSwitchSet_data_list)) == 1 and len(diff_data) == 2:
  86. remark = "%s %s设定值与运行状态不一致,检查时间%s"%(datetime_now(),EquipSwitchSet_meter,timedelta_5min)
  87. print(remark)
  88. sql = "('%s','%s','%s','%s','%s')" % (EquipLocalID,EquipName,EquipSwitchSet,RunStatus,remark)
  89. sqls.append(sql)
  90. else:
  91. remark = "%s %s数据有丢失,检查时间%s"%(datetime_now(),EquipSwitchSet_meter,timedelta_5min)
  92. print(remark)
  93. sql = "('%s','%s','%s','%s','%s')" % (EquipLocalID, EquipName, EquipSwitchSet, RunStatus, remark)
  94. sqls.append(sql)
  95. if sqls:
  96. print("%s 清空数据库表"%datetime_now())
  97. MysqlUtil.update(DELETE_SQL)
  98. print("%s 数据库表清空完成"%datetime_now())
  99. print("%s 开始往数据库插入数据,共计%s条"%(datetime_now(),len(sqls)))
  100. for i in range(0,len(sqls),1000):
  101. sqlranges = sqls[i:i + 1000]
  102. sqlranges = INSERT_SQL + ",".join(sqlranges)
  103. MysqlUtil.update(sqlranges)
  104. print("%s 数据库插入成功"%datetime_now())
  105. messages = []
  106. message = (" - **类型:** 空调疑似掉线 \n" + " - **数量:** <font color=#FF0000>%s</font> \n")% (len(sqls))
  107. messages.append(message)
  108. messages.append("___")
  109. messages.append("<font color=#0080FF>详情查看MySQL数据库sagacloud_setup里的zj_check_equipswitch表</font>")
  110. messages_info = "\n".join(messages)
  111. title = "## **【之江实验室】空调控制状态报警** "
  112. messages_info = title + "\n" + "___" + "\n" + messages_info
  113. send_message_markdown(title, messages_info, dingding)
  114. time.sleep(int(sleep))
  115. MysqlUtil.close()