123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import datetime
- import json,pymysql
- import os
- import time
- from MyUtils.ZillionUtil import ZillionUtil
- from MyUtils.MysqlUtils import MysqlUtils
- from MyUtils.DateUtils import get_day
- from MyUtils.Dingtalk import send_message
- import pytz
- import multiprocessing
- INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values "
- DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' "
- SELETE_COUNTLASTDATA_SQL = "SELECT count(*) FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
- SELETE_SUMLASTDATA_SQL = "SELECT SUM(energy_ac_terminal)+SUM(energy_heating)+SUM(energy_cooling)+SUM(energy_light)+sum(energy_others) as last_data FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
- def datetime_now():
- # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
- #容器时间
- # tz = pytz.timezone('Asia/Shanghai') # 东八区
- datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
- pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
- return datetime_now
- # 获取能耗数据
- def get_data_time(zillionUtil,hbase_database, hbase_table, building, from_time, to_time):
- Criteria = {
- "building": building,
- "energyModelSign": building + "EM01",
- "energyModelNodeSign": {
- "in":[
- "EI10102010101001",
- "EI10102010102001",
- "EI1010202001",
- "EI1001"
- ]
- },
- "data_time": {
- "$gte": from_time,
- "$lt": to_time
- }
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- return datas
- with open("config.json", "r") as f:
- data = json.load(f)
- hbase_database = data["metadata"]["database"]
- url = data["metadata"]["url"]
- building = data["building"]
- mysql = data["mysql"]
- my_database = mysql["database"]
- dingding = data["dingding"]
- at_mobiles = data["at_mobiles"]
- print("同步%s项目数据 " %(building))
- project_id = "Pj" + building
- time_now = datetime.datetime.fromtimestamp(int(time.time()),
- pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
- # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
- start_time = "20221213000000"
- end_time = "20221220000000"
- range_days = get_day(start_time,end_time)
- for i in range_days:
- yesterday,today = i[0],i[1]
- yesterday = yesterday[0:8]
- today = today[0:8]
- # #连接hbase
- zillionUtil = ZillionUtil(url)
- # #连接hbase
- MysqlUtil = MysqlUtils(**mysql)
- datas = get_data_time(zillionUtil ,hbase_database, "data_energydata_1d", building, yesterday, today)
- if datas:
- # 删除上月数据
- print("%s,开始删除%s的数据... " %(datetime_now() ,yesterday))
- delete_sql = DELETE_SQL% (project_id ,yesterday ,today)
- MysqlUtil.update(delete_sql)
- # sqls = []
- energy_cooling = "0"
- energy_heating = "0"
- energy_ac_terminal = "0"
- energy_light = "0"
- sum_data_value = "0"
- for i in datas:
- if i["energyModelNodeSign"] == "EI1001":
- sum_data_value = i["data_value"]
- # 冷热源
- if i["energyModelNodeSign"] == "EI10102010101001":
- energy_cooling = i["data_value"]
- # #热
- # if i["energyModelNodeSign"] == "EI1010201010101001":
- # energy_heating = i["data_value"]
- # 空调末端
- if i["energyModelNodeSign"] == "EI10102010102001":
- energy_ac_terminal = i["data_value"]
- # 照明
- if i["energyModelNodeSign"] == "EI1010202001":
- energy_light = i["data_value"]
- energy_other = float(sum_data_value) - float(energy_cooling) - float(energy_heating) -float(
- energy_light) - float(energy_ac_terminal)
- sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
- project_id, yesterday, energy_cooling, energy_heating, energy_ac_terminal, energy_light, energy_other,
- datetime_now(), datetime_now())
- inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql
- print("%s,开始插入数据..." % datetime_now())
- MysqlUtil.update(inser_sql)
- else:
- print("%s,没有查询到数据...")
- time.sleep(2)
- print("%s,等待下次程序执行" % (datetime_now()))
|