# start_kafka_real.py
import datetime
import json
import time

from kafka import KafkaProducer

from MyUtils.DateUtils import get_month_range
from MyUtils.ZillionUtil import ZillionUtil
# Fetch the project's point list.
  7. def get_pointlist(hbase_database, hbase_table, buildingid):
  8. Criteria = {
  9. "building": buildingid
  10. }
  11. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  12. pointlist = []
  13. for i in datas:
  14. point = []
  15. meter = i["meter"]
  16. funcid = i["funcid"]
  17. point.append(meter)
  18. point.append(funcid)
  19. pointlist.append(point)
  20. return pointlist
# Fetch energy-consumption data.
  22. def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid,from_time,to_time):
  23. Criteria = {
  24. "building": buildingid,
  25. "meter": meter,
  26. "funcid": funcid,
  27. "data_time":{
  28. "$gte": from_time,
  29. "$lt": to_time
  30. }
  31. }
  32. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  33. return datas
# Send data to Kafka.
  35. def send_kafka(topic,msg):
  36. msg = json.dumps(msg)
  37. msg = msg.encode('utf-8')
  38. future = producer.send(topic, msg)
  39. try:
  40. record_metadata = future.get(timeout=10)
  41. # partition = record_metadata.partition
  42. # offset = record_metadata.offset
  43. # print('save success, partition: {}, offset: {}'.format(partition, offset))
  44. except Exception as e:
  45. print("Error:{}".format(e))
  46. yesterday_time = (datetime.datetime.now()+datetime.timedelta(days=-1)).strftime("%Y%m%d") + "000000"
  47. now_time = datetime.datetime.now().strftime("%Y%m%d") + "000000"
  48. # 读取配置文件信息
  49. with open("config.json", "r") as f:
  50. data = json.load(f)
  51. hbase_database = data["metadata"]["database"]
  52. url = data["metadata"]["url"]
  53. building = data["building"]["id"]
  54. topic = data["kafka"]["topic"]
  55. kafka_host = data["kafka"]["host"]
  56. kafka_port = data["kafka"]["port"]
  57. #先修改文件时间
  58. with open("config-time-real.json", "r") as f:
  59. data_time = json.load(f)
  60. data_time["from_time"] = yesterday_time
  61. data_time["to_time"] = now_time
  62. with open("config-time-real.json", "w") as f_config:
  63. json.dump(data_time,f_config)
  64. with open("config-time-real.json", "r") as f:
  65. data_time = json.load(f)
  66. from_time = data_time["from_time"]
  67. to_time = data_time["to_time"]
  68. schedule_task = data_time["schedule_task"]
  69. tables = ["fjd_0_near_15min","data_servicedata_15min"]
  70. print("------------------等待下次运行时间%s------------------" % schedule_task)
  71. while True:
  72. time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
  73. if time_now == schedule_task: # 此处设置每天定时的时间
  74. #连接kafka
  75. producer = KafkaProducer(bootstrap_servers='%s:%s'%(kafka_host,kafka_port)) # 连接kafka
  76. # #连接hbase
  77. zillionUtil = ZillionUtil(url)
  78. pointlist = get_pointlist(hbase_database,"dy_pointlist",building)
  79. for table in tables:
  80. for i in pointlist:
  81. meter,funcid = i[0],i[1]
  82. monthrange = get_month_range(from_time,to_time)
  83. for m in monthrange:
  84. startdate,enddate = m[0],m[1]
  85. print("%s开始查询%s至%s的数据 %s-%s"%(table,startdate,enddate,meter,funcid))
  86. if table == "fjd_0_near_15min":
  87. data_fjd = get_data_time(hbase_database,table,building,meter,funcid,startdate,enddate)
  88. if data_fjd:
  89. for i in data_fjd:
  90. i.pop("data_flag")
  91. send_kafka(topic,i)
  92. else:
  93. data_fjd = get_data_time(hbase_database, table, building, meter, funcid, startdate, enddate)
  94. if data_fjd:
  95. for i in data_fjd:
  96. send_kafka(topic,i)
  97. producer.close()
  98. print("------------------等待下次运行时间%s------------------" % schedule_task)