123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- import datetime
- import json
- import time
- from MyUtils.ZillionUtil import ZillionUtil
- from MyUtils.MysqlUtils import MysqlUtils
- import pytz
- INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values "
- DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' "
- def datetime_now():
- # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
- #容器时间
- # tz = pytz.timezone('Asia/Shanghai') # 东八区
- datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
- pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
- return datetime_now
- # 获取能耗数据
- def get_data_time(zillionUtil,hbase_database, hbase_table, building,meters, from_time, to_time):
- Criteria = {
- "building": building,
- "meter": {
- "in":meters
- },
- "funcid": 10101,
- "data_time": {
- "$gte": from_time,
- "$lt": to_time
- }
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- return datas
- # 获取点位表
- def get_pointlists(zillionUtil,hbase_database, hbase_table, building):
- Criteria = {
- "building": building,
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- return datas
- with open("config.json", "r") as f:
- data = json.load(f)
- hbase_database = data["metadata"]["database"]
- url = data["metadata"]["url"]
- buildings = data["building"]
- set_time = data["set_time"]
- mysql = data["mysql"]
- my_database = mysql["database"]
- print("同步项目列表:%s"%buildings)
- print(set_time)
- while True:
- time_now = datetime.datetime.fromtimestamp(int(time.time()),
- pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
- # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
- if time_now == set_time:
- # #连接hbase
- zillionUtil = ZillionUtil(url)
- # #连接hbase
- MysqlUtil = MysqlUtils(**mysql)
- for building in buildings:
- pointlists = get_pointlists(zillionUtil,hbase_database,"dy_pointlist",building)
- meters_10101 = []
- for i in pointlists:
- if i["funcid"] == 10101:
- meters_10101.append(i["meter"])
- print("%s,%s项目点位列表%s"%(datetime_now(),building,meters_10101))
- project_id = "Pj" + building
- yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y%m%d")
- today = datetime.date.today().strftime("%Y%m%d")
- datas = get_data_time(zillionUtil,hbase_database,"data_servicedata_1d",building,meters_10101,yesterday,today)
- if datas:
- data_values = [float(i["data_value"]) for i in datas]
- total_energy = sum(data_values)
- sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
- project_id, yesterday,"0","0","0","0",total_energy, datetime_now(), datetime_now())
- print("%s,开始删除项目%s在%s的数据..." % (datetime_now(), building,yesterday))
- delete_sql = DELETE_SQL % (project_id, yesterday, today)
- MysqlUtil.update(delete_sql)
- inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql
- print("%s,开始插入项目%s在%s的数据..." % (datetime_now(),building,yesterday))
- MysqlUtil.update(inser_sql)
- print("%s,%s项目的%s数据插入成功..."%(datetime_now(),building,yesterday))
- else:
- print("%s,未查询到项目%s在%s的数据..."%(datetime_now(),building,yesterday))
- else:
- time.sleep(2)
- print("%s,等待下次程序执行" % (datetime_now()))
- MysqlUtil.close()
|