"""Periodic check that air-conditioner switch setpoints match the run status.

For every equipment record whose category matches ``ACAT`` the script reads
the ``EquipSwitchSet`` and ``RunStatus`` point ids (``"<meter>-<funcid>"``),
queries the recent values of both points, and records equipment whose
setpoint and run status disagree or whose data is missing.  Findings are
written to ``sagacloud_setup.zj_check_equipswitch`` and announced through
DingTalk.  Configuration is read from ``config.json`` in the working
directory.
"""
import datetime
import json
import os
import time

import pytz

from MyUtils.MysqlUtils import MysqlUtils
from MyUtils.ZillionUtil import ZillionUtil
from MyUtils.Dingtalk import send_message_markdown

# NOTE: "SELETE" is a misspelling of "SELECT"; the name is kept unchanged.
SELETE_SQL = "SELECT infos FROM `object_data_equipment_infos` WHERE `category` LIKE '%ACAT%'"
INSERT_SQL = "replace into sagacloud_setup.zj_check_EquipSwitch(EquipLocalID,EquipName,EquipSwitchSet,RunStatus,remark) values "
DELETE_SQL = "truncate table sagacloud_setup.zj_check_equipswitch"

# Building id used for every point query.
BUILDING_ID = "3301100002"
# Maximum number of value tuples sent in one REPLACE statement.
INSERT_BATCH_SIZE = 1000


def datetime_now():
    """Return the current Asia/Shanghai time as ``'YYYY-mm-dd HH:MM:SS'``."""
    shanghai = pytz.timezone('Asia/Shanghai')
    current = datetime.datetime.fromtimestamp(int(time.time()), shanghai)
    return current.strftime('%Y-%m-%d %H:%M:%S')


def get_data_time(zillionUtil, hbase_database, hbase_table, building, meter, funid1, funid2, timedelta_5min):
    """Fetch recent records of two function ids of one meter.

    Args:
        zillionUtil: client object exposing ``select(database, table, criteria)``.
        hbase_database: database name passed to ``select``.
        hbase_table: table name passed to ``select``.
        building: value for the ``building`` criterion.
        meter: value for the ``meter`` criterion.
        funid1: first function id to include.
        funid2: second function id to include.
        timedelta_5min: lower bound (inclusive) for ``receivetime``.

    Returns:
        Whatever ``zillionUtil.select`` returns for the built criteria.
    """
    Criteria = {
        "building": building,
        "meter": meter,
        "funcid": {
            "in": [
                funid1,
                funid2
            ]
        },
        "receivetime": {
            "$gte": timedelta_5min
        }
    }
    return zillionUtil.select(hbase_database, hbase_table, Criteria)


def _split_point(point):
    """Split a ``"<meter>-<funcid>"`` point id into ``(meter, funcid)`` strings."""
    parts = point.split("-")
    return parts[0], parts[1]


def _check_equipment(zillionUtil, hbase_database, info_data, month, since):
    """Check one equipment record; return a SQL values tuple or ``None``.

    Args:
        zillionUtil: client passed through to ``get_data_time``.
        hbase_database: database name passed through to ``get_data_time``.
        info_data: parsed ``infos`` dict containing ``EquipSwitchSet``,
            ``RunStatus``, ``EquipLocalID`` and ``EquipName``.
        month: ``YYYYMM`` suffix of the monthly table to query.
        since: lower bound for ``receivetime``.

    Returns:
        A ``"('..','..','..','..','..')"`` string when the equipment has a
        setpoint/run-status mismatch or missing data, otherwise ``None``.
    """
    EquipSwitchSet = info_data["EquipSwitchSet"]
    RunStatus = info_data["RunStatus"]
    EquipLocalID = info_data["EquipLocalID"]
    EquipName = info_data["EquipName"]
    # Both settings referring to the same point leaves nothing to compare.
    if EquipSwitchSet == RunStatus:
        return None

    set_meter, set_funcid = _split_point(EquipSwitchSet)
    _, run_funcid = _split_point(RunStatus)

    # NOTE(security): eval() runs on text read from the database. If funcids
    # are always plain literals, ast.literal_eval()/int() would be safer.
    datas = get_data_time(
        zillionUtil,
        hbase_database,
        "original_month_" + month,
        BUILDING_ID,
        set_meter,
        eval(set_funcid),
        eval(run_funcid),
        since,
    )

    set_values = []
    run_values = []
    for record in datas:
        if str(record["funcid"]) == set_funcid:
            set_values.append(record["data"])
        else:
            run_values.append(record["data"])

    # Both series must have data before they can be compared; the guard checks
    # the setpoint *values*, not the funcid string (which is always truthy).
    if set_values and run_values:
        distinct_values = set(set_values) | set(run_values)
        # A constant setpoint plus exactly one other value overall means the
        # run status deviated from the setpoint inside the window.
        if len(set(set_values)) == 1 and len(distinct_values) == 2:
            remark = "%s %s设定值与运行状态不一致,检查时间%s" % (datetime_now(), set_meter, since)
        else:
            return None
    else:
        remark = "%s %s数据有丢失,检查时间%s" % (datetime_now(), set_meter, since)

    print(remark)
    # NOTE(review): values are interpolated directly into SQL; a quote in any
    # field breaks the statement. Parameterize if MysqlUtils supports it.
    return "('%s','%s','%s','%s','%s')" % (EquipLocalID, EquipName, EquipSwitchSet, RunStatus, remark)


def _store_rows(mysql_util, rows):
    """Replace the contents of the result table with ``rows`` (in batches)."""
    print("%s 清空数据库表" % datetime_now())
    mysql_util.update(DELETE_SQL)
    print("%s 数据库表清空完成" % datetime_now())
    print("%s 开始往数据库插入数据,共计%s条" % (datetime_now(), len(rows)))
    for start in range(0, len(rows), INSERT_BATCH_SIZE):
        batch = rows[start:start + INSERT_BATCH_SIZE]
        mysql_util.update(INSERT_SQL + ",".join(batch))
    print("%s 数据库插入成功" % datetime_now())


def _send_alert(count, webhook):
    """Send the DingTalk markdown alert announcing ``count`` findings."""
    messages = []
    message = (" - **类型:** 空调疑似掉线 \n" + " - **数量:** %s \n") % (count)
    messages.append(message)
    messages.append("___")
    messages.append("详情查看MySQL数据库sagacloud_setup里的zj_check_equipswitch表")
    messages_info = "\n".join(messages)
    title = "## **【之江实验室】空调控制状态报警** "
    messages_info = title + "\n" + "___" + "\n" + messages_info
    send_message_markdown(title, messages_info, webhook)


with open("config.json", "r") as f:
    data = json.load(f)

hbase_database = data["metadata"]["database"]
url = data["metadata"]["url"]
mysql = data["mysql"]
my_database = mysql["database"]
dingding = data["dingding"]
sleep = data["sleep"]

# Alternative configuration source (environment variables), currently disabled:
# mysql = {
#     "host": os.getenv("host"),
#     "port": int(os.getenv("port")),
#     "user": os.getenv("user"),
#     "passwd": os.getenv("passwd"),
#     "database": os.getenv("database")
# }
# database = os.getenv("database")
# hbase_database = os.getenv("hbase_database")
# url = os.getenv("url")
# sleep = os.getenv("sleep")
# dingding = os.getenv("dingding")

while True:
    # Recompute the window every pass; computing it once before the loop
    # would freeze the lower bound and the monthly table name at start-up.
    # NOTE(review): uses the process-local clock (naive datetime) — confirm
    # it matches the timezone of the stored receivetime values.
    now = datetime.datetime.now()
    timedelta_5min = (now - datetime.timedelta(minutes=5)).strftime("%Y%m%d%H%M")
    current_month = now.strftime("%Y%m")

    # Connect to the time-series store.
    zillionUtil = ZillionUtil(url)
    # Connect to MySQL.
    MysqlUtil = MysqlUtils(**mysql)
    try:
        object_data_infos = MysqlUtil.query(SELETE_SQL)
        sqls = []
        for info in object_data_infos:
            info_data = json.loads(info[0])
            if "EquipSwitchSet" not in info_data or "RunStatus" not in info_data:
                continue
            row = _check_equipment(zillionUtil, hbase_database, info_data, current_month, timedelta_5min)
            if row is not None:
                sqls.append(row)

        if sqls:
            _store_rows(MysqlUtil, sqls)
            _send_alert(len(sqls), dingding)
    finally:
        # A new connection is opened each pass, so release it before sleeping.
        MysqlUtil.close()

    time.sleep(int(sleep))