"""Daily job that forwards 15-minute energy data rows to Kafka.

At a configured wall-clock time each day the script queries every point of
one building from the configured tables through ``ZillionUtil`` and
publishes each returned row to a Kafka topic as UTF-8 encoded JSON.
"""
import datetime
import json
import time

from kafka import KafkaProducer

from MyUtils.DateUtils import get_month_range
from MyUtils.ZillionUtil import ZillionUtil

# File holding "from_time", "to_time" and "schedule_task".
TIME_CONFIG_PATH = "config-time-real.json"
# Rows of this table have their "data_flag" field removed before sending.
FLAGGED_TABLE = "fjd_0_near_15min"
# Poll well under one second so the scheduled HH:MM:SS second is not skipped,
# while still yielding the CPU between checks.
POLL_INTERVAL_SECONDS = 0.2

# Connections shared by the helper functions below; they are (re)created at
# the start of every scheduled run by _run_export().
producer = None
zillionUtil = None


# Fetch the points of a building
def get_pointlist(hbase_database, hbase_table, buildingid):
    """Return ``[meter, funcid]`` pairs for every row matching the building.

    Args:
        hbase_database: Database name passed through to ``zillionUtil.select``.
        hbase_table: Table name passed through to ``zillionUtil.select``.
        buildingid: Value matched against the ``"building"`` field.

    Returns:
        A list of two-element lists ``[meter, funcid]``; empty when the
        query yields nothing.
    """
    criteria = {"building": buildingid}
    rows = zillionUtil.select(hbase_database, hbase_table, criteria)
    # ``rows or []`` tolerates a falsy query result, which the main loop
    # already guards against for data queries.
    return [[row["meter"], row["funcid"]] for row in rows or []]


# Fetch energy data
def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid, from_time, to_time):
    """Query rows of one point with ``from_time <= data_time < to_time``.

    Args:
        hbase_database: Database name passed through to ``zillionUtil.select``.
        hbase_table: Table name passed through to ``zillionUtil.select``.
        buildingid: Value matched against the ``"building"`` field.
        meter: Value matched against the ``"meter"`` field.
        funcid: Value matched against the ``"funcid"`` field.
        from_time: Inclusive lower bound (``"$gte"``) on ``"data_time"``.
        to_time: Exclusive upper bound (``"$lt"``) on ``"data_time"``.

    Returns:
        Whatever ``zillionUtil.select`` returns for the criteria.
    """
    criteria = {
        "building": buildingid,
        "meter": meter,
        "funcid": funcid,
        "data_time": {
            "$gte": from_time,
            "$lt": to_time,
        },
    }
    return zillionUtil.select(hbase_database, hbase_table, criteria)


# Send data to Kafka
def send_kafka(topic, msg):
    """Publish ``msg`` as UTF-8 JSON to ``topic`` and wait for the result.

    Waits up to 10 seconds for the send to complete. A failure while waiting
    is printed rather than raised, so one bad message does not abort the run.

    Args:
        topic: Kafka topic name.
        msg: JSON-serialisable object to publish.
    """
    payload = json.dumps(msg).encode('utf-8')
    future = producer.send(topic, payload)
    try:
        future.get(timeout=10)
    except Exception as e:
        print("Error:{}".format(e))


def _current_time_window(now=None):
    """Return ``(from_time, to_time)``: yesterday 00:00:00 to today 00:00:00.

    Both values are ``"%Y%m%d"`` dates followed by ``"000000"``. A single
    clock reading is used for both bounds so the window cannot straddle
    midnight.
    """
    if now is None:
        now = datetime.datetime.now()
    yesterday = now - datetime.timedelta(days=1)
    return (
        yesterday.strftime("%Y%m%d") + "000000",
        now.strftime("%Y%m%d") + "000000",
    )


def _persist_time_window(from_time, to_time):
    """Write the window into the time config file and return its contents."""
    with open(TIME_CONFIG_PATH, "r") as f:
        time_config = json.load(f)
    time_config["from_time"] = from_time
    time_config["to_time"] = to_time
    with open(TIME_CONFIG_PATH, "w") as f_config:
        json.dump(time_config, f_config)
    return time_config


def _run_export(from_time, to_time):
    """Query every table/point for the window and send each row to Kafka."""
    global producer, zillionUtil
    # Connect to Kafka
    producer = KafkaProducer(bootstrap_servers='%s:%s' % (kafka_host, kafka_port))
    try:
        # Connect to the data store
        zillionUtil = ZillionUtil(url)
        pointlist = get_pointlist(hbase_database, "dy_pointlist", building)
        # The window is the same for every point, so split it once.
        # list() keeps it reusable even if the helper returns an iterator.
        monthrange = list(get_month_range(from_time, to_time))
        for table in tables:
            for meter, funcid in pointlist:
                for month in monthrange:
                    startdate, enddate = month[0], month[1]
                    print("%s开始查询%s至%s的数据 %s-%s" % (table, startdate, enddate, meter, funcid))
                    rows = get_data_time(hbase_database, table, building, meter, funcid, startdate, enddate)
                    if not rows:
                        continue
                    for row in rows:
                        if table == FLAGGED_TABLE:
                            # A default avoids KeyError on rows without the field.
                            row.pop("data_flag", None)
                        send_kafka(topic, row)
    finally:
        # Always release the Kafka connection, even if a query or send fails.
        producer.close()


yesterday_time, now_time = _current_time_window()

# Read the configuration file
with open("config.json", "r") as f:
    data = json.load(f)
hbase_database = data["metadata"]["database"]
url = data["metadata"]["url"]
building = data["building"]["id"]
topic = data["kafka"]["topic"]
kafka_host = data["kafka"]["host"]
kafka_port = data["kafka"]["port"]

# Update the time window stored in the time config file first
data_time = _persist_time_window(yesterday_time, now_time)
from_time = data_time["from_time"]
to_time = data_time["to_time"]
schedule_task = data_time["schedule_task"]

tables = ["fjd_0_near_15min", "data_servicedata_15min"]

print("------------------等待下次运行时间%s------------------" % schedule_task)
last_run_date = None
while True:
    now = datetime.datetime.now()  # refresh
    # schedule_task is compared against the local time as "HH:MM:SS".
    # The date guard stops a fast run from re-triggering in the same second.
    if now.strftime("%H:%M:%S") == schedule_task and now.date() != last_run_date:
        last_run_date = now.date()
        # Recompute the window for every run; otherwise each daily run would
        # re-send the day that was current when the script started.
        from_time, to_time = _current_time_window(now)
        data_time = _persist_time_window(from_time, to_time)
        _run_export(from_time, to_time)
        print("------------------等待下次运行时间%s------------------" % schedule_task)
    time.sleep(POLL_INTERVAL_SECONDS)