import datetime import json,pymysql import os import time from MyUtils.ZillionUtil import ZillionUtil from MyUtils.MysqlUtils import MysqlUtils from MyUtils.Dingtalk import send_message import pytz import multiprocessing INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values " DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' " SELETE_COUNTLASTDATA_SQL = "SELECT count(*) FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'" SELETE_SUMLASTDATA_SQL = "SELECT SUM(energy_ac_terminal)+SUM(energy_heating)+SUM(energy_cooling)+SUM(energy_light)+sum(energy_others) as last_data FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'" def datetime_now(): # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") #容器时间 # tz = pytz.timezone('Asia/Shanghai') # 东八区 datetime_now = datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') return datetime_now # 获取能耗数据 def get_data_time(zillionUtil,hbase_database, hbase_table, building, from_time, to_time): Criteria = { "building": building, "energyModelSign": building + "EM01", "energyModelNodeSign": { "in":[ "EI10102010101001", "EI10102010102001", "EI1010202001", "EI1001" ] }, "data_time": { "$gte": from_time, "$lt": to_time } } datas = zillionUtil.select(hbase_database, hbase_table, Criteria) return datas # # mysql插入数据 # def insert_mysql(sqls,building,my_table): # print("%s,开始往mysql插入%s的%s数据..."%(datetime_now(),building,my_table)) # for i in range(0, len(sqls), 1000): # sqlranges = sqls[i:i + 1000] # sqlranges = INSERT_SQL % (my_database,my_table) + ",".join(sqlranges) # MysqlUtil.update(sqlranges) # print("%s,mysql数据%s,项目%s插入成功,合计%s条..." % (datetime_now(),my_table,building,len(sqls))) # with open("config.json", "r") as f: # data = json.load(f) # hbase_database = data["metadata"]["database"] # url = data["metadata"]["url"] # building = data["building"] # mysql = data["mysql"] # my_database = mysql["database"] # dingding = data["dingding"] # at_mobiles = data["at_mobiles"] hbase_database = os.getenv("hbase_database") url = os.getenv("url") building = os.getenv("building") dingding = os.getenv("dingding") at_mobiles = os.getenv("at_mobiles") mysql = { "host": os.getenv("host"), "port": os.getenv("port"), "user": os.getenv("user"), "passwd": os.getenv("passwd"), "database": os.getenv("database") } my_database = os.getenv("database") def check_energy(): while True: project_id = "Pj" + building # 容器里获取时间 today = datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai')) # 获取星期几0~6 0代表星期一 weekday = today.weekday() if weekday == 0: time_now = datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S') if time_now == "09:00:00": ##查询本周能耗和上周能耗 lastmonday = today - datetime.timedelta(days=weekday + 7) conn = pymysql.connect(**mysql) mysql_cur = conn.cursor() count_sql = SELETE_COUNTLASTDATA_SQL % (project_id, lastmonday.strftime("%Y%m%d"), today.strftime("%Y%m%d")) mysql_cur.execute(count_sql) lastdatacount = mysql_cur.fetchall()[0][0] print("上周能耗天数%s"%lastdatacount) if lastdatacount == 7: lastdata_sql = SELETE_SUMLASTDATA_SQL % ( project_id, lastmonday.strftime("%Y%m%d"), today.strftime("%Y%m%d")) mysql_cur.execute(lastdata_sql) lastdata = mysql_cur.fetchall()[0][0] last_lastmonday = lastmonday - datetime.timedelta(days=weekday + 7) last_lastdata_sql = SELETE_SUMLASTDATA_SQL % ( project_id, last_lastmonday.strftime("%Y%m%d"), lastmonday.strftime("%Y%m%d")) mysql_cur.execute(last_lastdata_sql) last_lastdata = mysql_cur.fetchall()[0][0] message = "【博锐周报数据】 %s,上上周能耗数据:%s,上周能耗数据:%s %s" % (datetime_now(), last_lastdata, lastdata, at_mobiles) print(message) send_message(message, dingding, at_mobiles) else: message = "【博锐周报数据】 %s,上周能耗数据有缺失,请及时核对!!!%s" % (datetime_now(), at_mobiles) print(message) send_message(message, dingding, at_mobiles) mysql_cur.close() conn.close() time.sleep(2) def transfer_energy(): print("同步%s项目数据"%(building)) while True: project_id = "Pj" + building time_now = datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S') # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新 am_set_time = "08:00:00" pm_set_time = "22:00:00" if time_now == am_set_time or time_now ==pm_set_time:#此处设置每天定时的时间 today = datetime.date.today().strftime("%Y%m%d") yesterday = (datetime.date.today()-datetime.timedelta(days=1)).strftime("%Y%m%d") # #连接hbase zillionUtil = ZillionUtil(url) # #连接hbase MysqlUtil = MysqlUtils(**mysql) datas = get_data_time(zillionUtil,hbase_database, "data_energydata_1d", building, yesterday, today) if datas: # 删除上月数据 print("%s,开始删除%s的数据..."%(datetime_now(),yesterday)) delete_sql = DELETE_SQL% (project_id,yesterday,today) MysqlUtil.update(delete_sql) # sqls = [] energy_cooling = "0" energy_heating = "0" energy_ac_terminal = "0" energy_light = "0" sum_data_value = "0" for i in datas: if i["energyModelNodeSign"] == "EI1001": sum_data_value = i["data_value"] #冷热源 if i["energyModelNodeSign"] == "EI10102010101001": energy_cooling = i["data_value"] # #热 # if i["energyModelNodeSign"] == "EI1010201010101001": # energy_heating = i["data_value"] #空调末端 if i["energyModelNodeSign"] == "EI10102010102001": energy_ac_terminal = i["data_value"] #照明 if i["energyModelNodeSign"] == "EI1010202001": energy_light = i["data_value"] energy_other = float(sum_data_value) - float(energy_cooling) - float(energy_heating) -float(energy_light) - float(energy_ac_terminal) sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % ( project_id, yesterday,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_other, datetime_now(), datetime_now()) inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql print("%s,开始插入数据..."%datetime_now()) MysqlUtil.update(inser_sql) else: print("%s,没有查询到数据...") time.sleep(2) print("%s,等待下次程序执行" % (datetime_now())) if __name__ == '__main__': check_energy_process = multiprocessing.Process(target=check_energy) transfer_energy_process = multiprocessing.Process(target=transfer_energy) check_energy_process.start() transfer_energy_process.start()