123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- import json
- from MyUtils.ZillionUtil import ZillionUtil
- import pymysql
- import datetime
- import argparse
- INSERT_SQL = "replace into %s.%s(building,meter,func_id,data_time,data_value,dt) values "
- # 获取项目点位
- def get_pointlist(hbase_database, hbase_table, buildingid):
- Criteria = {
- "building": buildingid
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- pointlist = []
- for i in datas:
- point = []
- meter = i["meter"]
- funcid = i["funcid"]
- point.append(meter)
- point.append(funcid)
- pointlist.append(point)
- return pointlist
- # 获取能耗数据
- def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid,from_time,to_time):
- Criteria = {
- "building": buildingid,
- "meter": meter,
- "funcid": funcid,
- "data_time":{
- "$gte": from_time,
- "$lt": to_time
- }
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- return datas
- #取hbase数据,处理成sql语句
- def hbase_energy_data(points,table,building):
- sqls = []
- print("%s开始查询项目%s的%s至%s的数据" % (table, building, args.start_time, args.end_time))
- for i in points:
- meter,funcid = i[0],i[1]
- # print("%s开始查询项目%s的%s至%s的数据 %s-%s"%(table,building,args.start_time,args.end_time,meter,funcid))
- datas = get_data_time(hbase_database,table,building,meter,funcid,args.start_time,args.end_time)
- for i in datas:
- data_time = i["data_time"]
- data_value = i["data_value"]
- dt = data_time[0:4]+ "-" + data_time[4:6] + "-" + data_time[6:8]
- sqlline = "(%s,'%s',%s,%s,%s,'%s')" % (building, meter, funcid, data_time, data_value, dt)
- sqls.append(sqlline)
- return sqls
- # mysql插入数据
- def insert_mysql(sqls,building,my_table):
- print("开始往mysql插入%s的%s数据..."%(building,my_table))
- for i in range(0, len(sqls), 1000):
- sqlranges = sqls[i:i + 1000]
- sqlranges = INSERT_SQL % (my_database,my_table) + ",".join(sqlranges)
- mysql_cur.execute(sqlranges)
- conn.commit()
- print("mysql数据%s,项目%s插入成功,合计%s条..." % (my_table,building,len(sqls)))
- end_time = datetime.datetime.now().strftime("%Y%m%d") + "000000"
- start_time = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y%m%d") + "000000"
- parser = argparse.ArgumentParser()
- parser.add_argument("--start_time",default=start_time,help="--start_time 20220101000000")
- parser.add_argument("--end_time",default=end_time,help="--end_time 20220102000000")
- parser.add_argument("funcid",type=int,help="--funcid 10101")
- args = parser.parse_args()
- # print(args.start_time)
- # print(args.end_time)
- # print(args.funcid)
- # 读取配置文件信息
- with open("/mnt/datadisk0/saga/bi_saga/transfer_data/config.json", "r") as f:
- data = json.load(f)
- hbase_database = data["metadata"]["database"]
- url = data["metadata"]["url"]
- building_list = data["building"]["id"]
- print("项目列表:%s"%building_list)
- my_database = data["mysql"]["database"]
- my_fjd_table = data["mysql"]["fjd_table"]
- my_energy_table = data["mysql"]["energy_table"]
- my_co2_table = data["mysql"]["co2_table"]
- my_pm25_table = data["mysql"]["pm25_table"]
- my_temperature_table = data["mysql"]["temperature_table"]
- my_hcho_table = data["mysql"]["hcho_table"]
- my_humidity_table = data["mysql"]["humidity_table"]
- mysql = data["mysql"]
- del mysql["fjd_table"]
- del mysql["energy_table"]
- del mysql["co2_table"]
- del mysql["pm25_table"]
- del mysql["temperature_table"]
- del mysql["hcho_table"]
- del mysql["humidity_table"]
- tables = ["fjd_0_near_15min","data_servicedata_15min"]
- # #连接hbase
- zillionUtil = ZillionUtil(url)
- #连接mysql
- conn = pymysql.connect(**mysql)
- mysql_cur = conn.cursor()
- for building in building_list:
- #处理点位表
- pointlist = get_pointlist(hbase_database,"dy_pointlist",building)
- if pointlist:
- points = []
- for point in pointlist:
- i = point[1]
- if i == args.funcid:
- points.append(point)
- #电
- if args.funcid == 10101:
- # sqls = hbase_energy_data(points, "fjd_0_near_15min",building)
- # insert_mysql(sqls,building, my_fjd_table)
- sqls = hbase_energy_data(points, "data_servicedata_15min",building)
- insert_mysql(sqls,building, my_energy_table)
- #CO2
- if args.funcid == 11301:
- sqls = hbase_energy_data(points, "fjd_0_near_15min",building)
- insert_mysql(sqls,building, my_co2_table)
- #pm2.5
- if args.funcid == 11401:
- sqls = hbase_energy_data(points, "fjd_0_near_15min",building)
- insert_mysql(sqls,building, my_pm25_table)
- #甲醛
- if args.funcid == 11305:
- sqls = hbase_energy_data(points, "fjd_0_near_15min",building)
- insert_mysql(sqls,building, my_hcho_table)
- #温度
- if args.funcid == 11101:
- sqls = hbase_energy_data(points, "fjd_0_near_15min",building)
- insert_mysql(sqls,building, my_temperature_table)
- #湿度
- if args.funcid == 11201:
- sqls = hbase_energy_data(points, "fjd_0_near_15min",building)
- insert_mysql(sqls, building,my_humidity_table)
- else:
- print("未查询到项目%s的点位表..."%building)
- mysql_cur.close()
- conn.close()
|