import json from MyUtils.ZillionUtil import ZillionUtil import pymysql import datetime import argparse INSERT_SQL = "replace into %s.%s(building,meter,func_id,data_time,data_value,dt) values " # 获取项目点位 def get_pointlist(hbase_database, hbase_table, buildingid): Criteria = { "building": buildingid } datas = zillionUtil.select(hbase_database, hbase_table, Criteria) pointlist = [] for i in datas: point = [] meter = i["meter"] funcid = i["funcid"] point.append(meter) point.append(funcid) pointlist.append(point) return pointlist # 获取能耗数据 def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid,from_time,to_time): Criteria = { "building": buildingid, "meter": meter, "funcid": funcid, "data_time":{ "$gte": from_time, "$lt": to_time } } datas = zillionUtil.select(hbase_database, hbase_table, Criteria) return datas #取hbase数据,处理成sql语句 def hbase_energy_data(points,table,building): sqls = [] print("%s开始查询项目%s的%s至%s的数据" % (table, building, args.start_time, args.end_time)) for i in points: meter,funcid = i[0],i[1] # print("%s开始查询项目%s的%s至%s的数据 %s-%s"%(table,building,args.start_time,args.end_time,meter,funcid)) datas = get_data_time(hbase_database,table,building,meter,funcid,args.start_time,args.end_time) for i in datas: data_time = i["data_time"] data_value = i["data_value"] dt = data_time[0:4]+ "-" + data_time[4:6] + "-" + data_time[6:8] sqlline = "(%s,'%s',%s,%s,%s,'%s')" % (building, meter, funcid, data_time, data_value, dt) sqls.append(sqlline) return sqls # mysql插入数据 def insert_mysql(sqls,building,my_table): print("开始往mysql插入%s的%s数据..."%(building,my_table)) for i in range(0, len(sqls), 1000): sqlranges = sqls[i:i + 1000] sqlranges = INSERT_SQL % (my_database,my_table) + ",".join(sqlranges) mysql_cur.execute(sqlranges) conn.commit() print("mysql数据%s,项目%s插入成功,合计%s条..." % (my_table,building,len(sqls))) end_time = datetime.datetime.now().strftime("%Y%m%d") + "000000" start_time = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y%m%d") + "000000" parser = argparse.ArgumentParser() parser.add_argument("--start_time",default=start_time,help="--start_time 20220101000000") parser.add_argument("--end_time",default=end_time,help="--end_time 20220102000000") parser.add_argument("funcid",type=int,help="--funcid 10101") args = parser.parse_args() # print(args.start_time) # print(args.end_time) # print(args.funcid) # 读取配置文件信息 with open("/mnt/datadisk0/saga/bi_saga/transfer_data/config.json", "r") as f: data = json.load(f) hbase_database = data["metadata"]["database"] url = data["metadata"]["url"] building_list = data["building"]["id"] print("项目列表:%s"%building_list) my_database = data["mysql"]["database"] my_fjd_table = data["mysql"]["fjd_table"] my_energy_table = data["mysql"]["energy_table"] my_co2_table = data["mysql"]["co2_table"] my_pm25_table = data["mysql"]["pm25_table"] my_temperature_table = data["mysql"]["temperature_table"] my_hcho_table = data["mysql"]["hcho_table"] my_humidity_table = data["mysql"]["humidity_table"] mysql = data["mysql"] del mysql["fjd_table"] del mysql["energy_table"] del mysql["co2_table"] del mysql["pm25_table"] del mysql["temperature_table"] del mysql["hcho_table"] del mysql["humidity_table"] tables = ["fjd_0_near_15min","data_servicedata_15min"] # #连接hbase zillionUtil = ZillionUtil(url) #连接mysql conn = pymysql.connect(**mysql) mysql_cur = conn.cursor() for building in building_list: #处理点位表 pointlist = get_pointlist(hbase_database,"dy_pointlist",building) if pointlist: points = [] for point in pointlist: i = point[1] if i == args.funcid: points.append(point) #电 if args.funcid == 10101: # sqls = hbase_energy_data(points, "fjd_0_near_15min",building) # insert_mysql(sqls,building, my_fjd_table) sqls = hbase_energy_data(points, "data_servicedata_15min",building) insert_mysql(sqls,building, my_energy_table) #CO2 if args.funcid == 11301: sqls = hbase_energy_data(points, "fjd_0_near_15min",building) insert_mysql(sqls,building, my_co2_table) #pm2.5 if args.funcid == 11401: sqls = hbase_energy_data(points, "fjd_0_near_15min",building) insert_mysql(sqls,building, my_pm25_table) #甲醛 if args.funcid == 11305: sqls = hbase_energy_data(points, "fjd_0_near_15min",building) insert_mysql(sqls,building, my_hcho_table) #温度 if args.funcid == 11101: sqls = hbase_energy_data(points, "fjd_0_near_15min",building) insert_mysql(sqls,building, my_temperature_table) #湿度 if args.funcid == 11201: sqls = hbase_energy_data(points, "fjd_0_near_15min",building) insert_mysql(sqls, building,my_humidity_table) else: print("未查询到项目%s的点位表..."%building) mysql_cur.close() conn.close()