123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import json
- from MyUtils.ZillionUtil import ZillionUtil
- from MyUtils.DateUtils import get_day
- import datetime
- import pymysql
- import pytz
- import time
# Prefix of the bulk upsert into <database>.energy_week_day; the caller fills
# in the database name and appends the comma-separated VALUES tuples.
INSERT_SQL = (
    "replace into %s.energy_week_day("
    "project_id,date,energy_cooling,energy_heating,energy_ac_terminal,"
    "energy_light,energy_others,create_time,update_time"
    ") values"
)

# Lists the buildings whose energy readings are aggregated.
SELECT_SQL = (
    "select c_building "
    "from energy_survey.t_research_bdratio_compute"
)

# Removes one project's rows with a date in [start, end) before reloading.
# Placeholders, in order: database, project_id, start date, end date.
DELETE_SQL = (
    "DELETE FROM %s.energy_week_day"
    " WHERE `project_id` = '%s'"
    " AND `date` >= '%s'"
    " AND `date` < '%s';"
)
# Fetch energy data.
def get_hbase_data(hbase_database, hbase_table, buildingid, energyModelNodeSign, from_time, to_time):
    """Query the energy records of one building for one energy-model node.

    The lookup goes through the module-level ``zillionUtil`` client, which
    must have been created before this function is called.

    Args:
        hbase_database: Database name passed to ``zillionUtil.select``.
        hbase_table: Table name passed to ``zillionUtil.select``.
        buildingid: Building identifier; the energy-model sign is derived
            from it by appending ``"EM01"``.
        energyModelNodeSign: Energy-model node to filter on.
        from_time: Lower bound for ``data_time`` (sent as ``$gte``).
        to_time: Upper bound for ``data_time`` (sent as ``$lt``).

    Returns:
        Whatever ``zillionUtil.select`` returns for the criteria.
    """
    time_window = {
        "$gte": from_time,
        "$lt": to_time,
    }
    criteria = {
        "building": buildingid,
        "energyModelNodeSign": energyModelNodeSign,
        "energyModelSign": buildingid + "EM01",
        "data_time": time_window,
    }
    return zillionUtil.select(hbase_database, hbase_table, criteria)
# Compute the summed energy consumption of the buildings.
def sum_values(hbase_database, hbase_table, buildinglist, energyModelNodeSign, day_start, day_end):
    """Add up ``data_value`` over every record of every listed building.

    Each building in ``buildinglist`` is queried with ``get_hbase_data`` for
    the given node sign and the ``day_start`` / ``day_end`` bounds.

    Returns:
        The sum of all ``data_value`` fields found, or ``None`` when no
        record was returned for any building.
    """
    collected = [
        record["data_value"]
        for building_id in buildinglist
        for record in get_hbase_data(
            hbase_database, hbase_table, building_id,
            energyModelNodeSign, day_start, day_end)
    ]
    if not collected:
        return None
    return sum(collected)
# Current time.
# Inside the container Python reports a time 8 hours behind, so every
# timestamp below is derived from one timezone-aware "now" in UTC+8
# (Asia/Shanghai) instead of the naive datetime.datetime.now().  Using a
# single instant also keeps the three values consistent with each other.
tz = pytz.timezone('Asia/Shanghai')  # UTC+8
_now_local = datetime.datetime.now(tz)
# Creation/update timestamp written into every MySQL row.
datetimenow = _now_local.strftime('%Y-%m-%d %H:%M:%S')
# Midnight of the current local day as a YYYYMMDDhhmmss string.
now_time = _now_local.strftime("%Y%m%d") + "000000"
# Tomorrow's local date (YYYYMMDD); used as the upper bound when the
# configured to_time is "now".
end_time = (_now_local + datetime.timedelta(days=1)).strftime("%Y%m%d")
# Read the static job configuration: metadata service, building and MySQL.
with open("config.json", "r") as config_file:
    data = json.load(config_file)
hbase_database = data["metadata"]["database"]
url = data["metadata"]["url"]
project = data["building"]["id"]
energyModelNodeSigns = data["building"]["energyModels"]
mysql = data["mysql"]
database = mysql["database"]

# Read the time window to process; it is persisted separately because the
# job rewrites it after a successful load.
with open("config-time.json", "r") as time_file:
    data_t = json.load(time_file)
from_time = data_t["from_time"]
to_time = data_t["to_time"]

hbase_table = "data_energydata_1d"

# The literal "now" stands for midnight at the start of tomorrow.
to_time = end_time + "000000" if to_time == "now" else to_time

# One (start, end) pair per day between from_time and to_time.
days = get_day(from_time, to_time)
# Columns of energy_week_day that are each fed by one energy-model node,
# listed in the order they appear in INSERT_SQL.
# NOTE(review): "energy_ac_terminal" uses the same node sign as
# "energy_cooling", exactly as the original script did.  That looks like a
# copy-paste slip (cooling is then subtracted twice when computing "others");
# confirm the real AC-terminal node sign and update it here.
_NODE_SIGN_BY_COLUMN = {
    "energy_cooling": "EI10102010101001",
    "energy_heating": "EI10102010102001",
    "energy_ac_terminal": "EI10102010101001",
    "energy_light": "EI1010202001",
}
# Node that carries the total consumption.
_TOTAL_NODE_SIGN = "EI1001"


def _sql_number(value):
    """Return *value* ready for ``%s`` interpolation, mapping None to NULL."""
    return "NULL" if value is None else value


# Main job: aggregate the daily energy figures of all buildings and reload
# them into <database>.energy_week_day for the configured time window.
conn = None
mysql_cur = None
try:
    # Connect to MySQL.
    conn = pymysql.connect(**mysql)
    mysql_cur = conn.cursor()
    # Fetch the building list (first column of every returned row).
    mysql_cur.execute(SELECT_SQL)
    buildinglist = [row[0] for row in mysql_cur.fetchall()]
    # Connect to HBase.  get_hbase_data() reads this module-level client,
    # so the name must stay a global.
    zillionUtil = ZillionUtil(url)

    sqls = []
    # Date of the most recent day for which a row was produced.
    last_date = None
    for day in days:
        day_start, day_end = day[0], day[1]
        # presumably YYYYMMDDhhmmss strings; the first 8 chars are the date
        m_day_start, m_day_end = day_start[0:8], day_end[0:8]
        date = m_day_start
        print("开始查询%s到%s的数据" % (m_day_start, m_day_end))

        # Total consumption first: without it no row is written for the
        # day, so the per-category queries would be wasted.
        sum_value_data = None
        if _TOTAL_NODE_SIGN in energyModelNodeSigns:
            sum_value_data = sum_values(
                hbase_database, hbase_table, buildinglist,
                _TOTAL_NODE_SIGN, day_start, day_end)
        if sum_value_data is None:
            print("未查询到%s总电耗数据" % (date))
            continue

        # Per-category consumption.  A configured node without data counts
        # as 0; a node that is not configured at all stays None and is
        # stored as SQL NULL.
        energy = {}
        for column, node_sign in _NODE_SIGN_BY_COLUMN.items():
            if node_sign not in energyModelNodeSigns:
                energy[column] = None
                continue
            queried = sum_values(
                hbase_database, hbase_table, buildinglist,
                node_sign, day_start, day_end)
            energy[column] = 0 if queried is None else queried

        # "Others" is the total minus every known category; categories
        # without a value contribute nothing.
        energy_other = sum_value_data
        for column in _NODE_SIGN_BY_COLUMN:
            energy_other -= energy[column] or 0

        sqlline = "('%s','%s',%s,%s,%s,%s,%s,'%s','%s')" % (
            project, date,
            _sql_number(energy["energy_cooling"]),
            _sql_number(energy["energy_heating"]),
            _sql_number(energy["energy_ac_terminal"]),
            _sql_number(energy["energy_light"]),
            energy_other, datetimenow, datetimenow)
        sqls.append(sqlline)
        last_date = date

    if sqls:
        # Delete and re-insert inside one transaction so that a failed
        # insert cannot leave the table with the old rows already removed.
        print("开始删除%s到%s的数据" % (from_time, to_time))
        mysql_cur.execute(DELETE_SQL % (database, project, from_time[0:8], to_time[0:8]))
        # Insert into MySQL in batches of 1000 rows per statement.
        print("开始往mysql插入数据...")
        for offset in range(0, len(sqls), 1000):
            batch = sqls[offset:offset + 1000]
            mysql_cur.execute(INSERT_SQL % (database) + ",".join(batch))
        conn.commit()
        print("mysql数据插入成功,合计%s条..." % len(sqls))

        # Advance the window start in config-time.json to the last day
        # that produced a row, so the next run resumes from there.
        print("更新config-time.json文件时间")
        data_t["from_time"] = str(last_date) + "000000"
        with open("config-time.json", "w") as f:
            json.dump(data_t, f)
    else:
        # Nothing to write: keep the existing rows and the stored window.
        print("没有可写入的数据,保留mysql中%s到%s的现有数据" % (from_time, to_time))
except Exception as e:
    # Top-level boundary of the job: report the failure.  The uncommitted
    # transaction is discarded when the connection is closed below.
    print(e)
finally:
    # Always release the cursor and connection, also on failure.
    if conn is not None and conn.open:
        if mysql_cur is not None:
            mysql_cur.close()
        conn.close()
|