"""Replay 15-minute records from Zillion/HBase tables onto a Kafka topic.

Connection settings are read from ``config.json`` and the default time
window from ``config-time.json``; the window can be overridden on the
command line:

    python xxx.py --start_time "20210701000000" --end_time "20210702000000"
"""
import argparse
import datetime
import json

from kafka import KafkaProducer

from MyUtils.DateUtils import get_month_range
from MyUtils.ZillionUtil import ZillionUtil

# Records from this table have their "data_flag" field removed before sending.
FJD_TABLE = "fjd_0_near_15min"


def get_pointlist(hbase_database, hbase_table, buildingid):
    """Fetch the points of a building.

    Queries ``hbase_table`` for rows whose "building" equals ``buildingid``
    through the module-level ``zillionUtil`` client.

    Returns:
        A list of ``[meter, funcid]`` pairs, one per row returned.
    """
    criteria = {"building": buildingid}
    rows = zillionUtil.select(hbase_database, hbase_table, criteria)
    return [[row["meter"], row["funcid"]] for row in rows]


def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid, from_time, to_time):
    """Fetch the records of one point within a time window.

    The window is expressed as ``data_time >= from_time`` and
    ``data_time < to_time`` in the criteria handed to ``zillionUtil.select``
    (presumably a half-open range; confirm against ZillionUtil).

    Returns:
        Whatever ``zillionUtil.select`` returns for the criteria.
    """
    criteria = {
        "building": buildingid,
        "meter": meter,
        "funcid": funcid,
        "data_time": {
            "$gte": from_time,
            "$lt": to_time,
        },
    }
    return zillionUtil.select(hbase_database, hbase_table, criteria)


def send_kafka(topic, msg):
    """Send ``msg`` to ``topic`` as UTF-8 encoded JSON and wait for the result.

    Uses the module-level ``producer``. Delivery is best effort: a failure
    while waiting for the broker (up to 10 seconds) is printed, not raised,
    so one bad message does not stop the export.
    """
    payload = json.dumps(msg).encode('utf-8')
    future = producer.send(topic, payload)
    try:
        # Block until the send completes so failures surface per message.
        future.get(timeout=10)
    except Exception as e:
        print("Error:{}".format(e))


# NOTE(review): now_time and end_time are not used anywhere in this file;
# kept only in case something else relies on them.
now_time = datetime.datetime.now().strftime("%Y%m%d") + "000000"
end_time = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y%m%d")

# Read the connection settings.
with open("config.json", "r") as f:
    data = json.load(f)
hbase_database = data["metadata"]["database"]
url = data["metadata"]["url"]
building = data["building"]["id"]
topic = data["kafka"]["topic"]
kafka_host = data["kafka"]["host"]
kafka_port = data["kafka"]["port"]

# Read the default time window.
with open("config-time.json", "r") as f:
    data_time = json.load(f)
from_time = data_time["from_time"]
to_time = data_time["to_time"]

# The window may be overridden at start-up, e.g.
# python xxx.py --start_time "20210701000000" --end_time "20210702000000"
parser = argparse.ArgumentParser()
parser.add_argument("--start_time", default=from_time, help="--start_time 20220101000000")
parser.add_argument("--end_time", default=to_time, help="--end_time 20220201000000")
args = parser.parse_args()

# Connect to Kafka.
producer = KafkaProducer(bootstrap_servers='%s:%s' % (kafka_host, kafka_port))
tables = [FJD_TABLE, "data_servicedata_15min"]

try:
    # Connect to HBase.
    zillionUtil = ZillionUtil(url)
    pointlist = get_pointlist(hbase_database, "dy_pointlist", building)

    # The window split does not depend on the table or the point, so compute
    # it once; list() keeps it reusable even if the helper returns an iterator.
    month_ranges = list(get_month_range(args.start_time, args.end_time))

    for table in tables:
        for meter, funcid in pointlist:
            for startdate, enddate in month_ranges:
                print("%s开始查询%s至%s的数据 %s-%s" % (table, startdate, enddate, meter, funcid))
                records = get_data_time(
                    hbase_database, table, building, meter, funcid, startdate, enddate
                )
                if not records:
                    continue
                for record in records:
                    if table == FJD_TABLE:
                        # Tolerate records without the field instead of
                        # aborting the whole export with a KeyError.
                        record.pop("data_flag", None)
                    send_kafka(topic, record)
finally:
    # Always release the Kafka connection, even if the export fails midway.
    producer.close()