import datetime import json import time from MyUtils.ZillionUtil import ZillionUtil from MyUtils.MysqlUtils import MysqlUtils from MyUtils.DateUtils import get_day import pytz INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values " DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' " def datetime_now(): # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S") #容器时间 # tz = pytz.timezone('Asia/Shanghai') # 东八区 datetime_now = datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') return datetime_now # 获取能耗数据 def get_data_time(zillionUtil,hbase_database, hbase_table, building,meters, from_time, to_time): Criteria = { "building": building, "meter": { "in":meters }, "funcid": 10101, "data_time": { "$gte": from_time, "$lt": to_time } } datas = zillionUtil.select(hbase_database, hbase_table, Criteria) return datas # 获取点位表 def get_pointlists(zillionUtil,hbase_database, hbase_table, building): Criteria = { "building": building, } datas = zillionUtil.select(hbase_database, hbase_table, Criteria) return datas with open("config.json", "r") as f: data = json.load(f) hbase_database = data["metadata"]["database"] url = data["metadata"]["url"] buildings = data["building"] set_time = data["set_time"] mysql = data["mysql"] my_database = mysql["database"] print("同步项目列表:%s"%buildings) time_now = datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S') # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新 start_time = "20221201000000" end_time = "20230110000000" range_days = get_day(start_time,end_time) # #连接hbase zillionUtil = ZillionUtil(url) # #连接hbase MysqlUtil = MysqlUtils(**mysql) for i in range_days: yesterday, today = i[0], i[1] yesterday = yesterday[0:8] today = today[0:8] for building in buildings: pointlists = get_pointlists(zillionUtil,hbase_database,"dy_pointlist",building) meters_10101 = [] for i in pointlists: if i["funcid"] == 10101: meters_10101.append(i["meter"]) print("%s,%s项目点位列表%s"%(datetime_now(),building,meters_10101)) project_id = "Pj" + building # yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y%m%d") # today = datetime.date.today().strftime("%Y%m%d") datas = get_data_time(zillionUtil,hbase_database,"data_servicedata_1d",building,meters_10101,yesterday,today) if datas: data_values = [float(i["data_value"]) for i in datas] total_energy = sum(data_values) sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % ( project_id, yesterday,"0","0","0","0",total_energy, datetime_now(), datetime_now()) print("%s,开始删除项目%s在%s的数据..." % (datetime_now(), building,yesterday)) delete_sql = DELETE_SQL % (project_id, yesterday, today) MysqlUtil.update(delete_sql) inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql print("%s,开始插入项目%s在%s的数据..." % (datetime_now(),building,yesterday)) MysqlUtil.update(inser_sql) print("%s,%s项目的%s数据插入成功..."%(datetime_now(),building,yesterday)) else: print("%s,未查询到项目%s在%s的数据..."%(datetime_now(),building,yesterday)) else: time.sleep(2) print("%s,等待下次程序执行" % (datetime_now())) MysqlUtil.close()