123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- import json,time
- from MyUtils.ZillionUtil import ZillionUtil
- from MyUtils.DateUtils import get_month_range
- from kafka import KafkaProducer
- import datetime
- # 获取项目点位
- def get_pointlist(hbase_database, hbase_table, buildingid):
- Criteria = {
- "building": buildingid
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- pointlist = []
- for i in datas:
- point = []
- meter = i["meter"]
- funcid = i["funcid"]
- point.append(meter)
- point.append(funcid)
- pointlist.append(point)
- return pointlist
- # 获取能耗数据
- def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid,from_time,to_time):
- Criteria = {
- "building": buildingid,
- "meter": meter,
- "funcid": funcid,
- "data_time":{
- "$gte": from_time,
- "$lt": to_time
- }
- }
- datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
- return datas
- #kafka发送数据
- def send_kafka(topic,msg):
- msg = json.dumps(msg)
- msg = msg.encode('utf-8')
- future = producer.send(topic, msg)
- try:
- record_metadata = future.get(timeout=10)
- # partition = record_metadata.partition
- # offset = record_metadata.offset
- # print('save success, partition: {}, offset: {}'.format(partition, offset))
- except Exception as e:
- print("Error:{}".format(e))
- yesterday_time = (datetime.datetime.now()+datetime.timedelta(days=-1)).strftime("%Y%m%d") + "000000"
- now_time = datetime.datetime.now().strftime("%Y%m%d") + "000000"
- # 读取配置文件信息
- with open("config.json", "r") as f:
- data = json.load(f)
- hbase_database = data["metadata"]["database"]
- url = data["metadata"]["url"]
- building = data["building"]["id"]
- topic = data["kafka"]["topic"]
- kafka_host = data["kafka"]["host"]
- kafka_port = data["kafka"]["port"]
- #先修改文件时间
- with open("config-time-real.json", "r") as f:
- data_time = json.load(f)
- data_time["from_time"] = yesterday_time
- data_time["to_time"] = now_time
- with open("config-time-real.json", "w") as f_config:
- json.dump(data_time,f_config)
- with open("config-time-real.json", "r") as f:
- data_time = json.load(f)
- from_time = data_time["from_time"]
- to_time = data_time["to_time"]
- schedule_task = data_time["schedule_task"]
- tables = ["fjd_0_near_15min","data_servicedata_15min"]
- print("------------------等待下次运行时间%s------------------" % schedule_task)
- while True:
- time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
- if time_now == schedule_task: # 此处设置每天定时的时间
- #连接kafka
- producer = KafkaProducer(bootstrap_servers='%s:%s'%(kafka_host,kafka_port)) # 连接kafka
- # #连接hbase
- zillionUtil = ZillionUtil(url)
- pointlist = get_pointlist(hbase_database,"dy_pointlist",building)
- for table in tables:
- for i in pointlist:
- meter,funcid = i[0],i[1]
- monthrange = get_month_range(from_time,to_time)
- for m in monthrange:
- startdate,enddate = m[0],m[1]
- print("%s开始查询%s至%s的数据 %s-%s"%(table,startdate,enddate,meter,funcid))
- if table == "fjd_0_near_15min":
- data_fjd = get_data_time(hbase_database,table,building,meter,funcid,startdate,enddate)
- if data_fjd:
- for i in data_fjd:
- i.pop("data_flag")
- send_kafka(topic,i)
- else:
- data_fjd = get_data_time(hbase_database, table, building, meter, funcid, startdate, enddate)
- if data_fjd:
- for i in data_fjd:
- send_kafka(topic,i)
- producer.close()
- print("------------------等待下次运行时间%s------------------" % schedule_task)
|