# start_kafka.py
  1. import json
  2. from MyUtils.ZillionUtil import ZillionUtil
  3. from MyUtils.DateUtils import get_month_range
  4. from kafka import KafkaProducer
  5. import datetime
  6. import argparse
  7. # 获取项目点位
  8. def get_pointlist(hbase_database, hbase_table, buildingid):
  9. Criteria = {
  10. "building": buildingid
  11. }
  12. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  13. pointlist = []
  14. for i in datas:
  15. point = []
  16. meter = i["meter"]
  17. funcid = i["funcid"]
  18. point.append(meter)
  19. point.append(funcid)
  20. pointlist.append(point)
  21. return pointlist
  22. # 获取能耗数据
  23. def get_data_time(hbase_database, hbase_table, buildingid, meter, funcid,from_time,to_time):
  24. Criteria = {
  25. "building": buildingid,
  26. "meter": meter,
  27. "funcid": funcid,
  28. "data_time":{
  29. "$gte": from_time,
  30. "$lt": to_time
  31. }
  32. }
  33. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  34. return datas
  35. #kafka发送数据
  36. def send_kafka(topic,msg):
  37. msg = json.dumps(msg)
  38. msg = msg.encode('utf-8')
  39. future = producer.send(topic, msg)
  40. try:
  41. record_metadata = future.get(timeout=10)
  42. partition = record_metadata.partition
  43. offset = record_metadata.offset
  44. # print('save success, partition: {}, offset: {}'.format(partition, offset))
  45. except Exception as e:
  46. print("Error:{}".format(e))
  47. now_time = datetime.datetime.now().strftime("%Y%m%d") + "000000"
  48. end_time = (datetime.datetime.now() + datetime.timedelta(days=1)).strftime("%Y%m%d")
  49. # 读取配置文件信息
  50. with open("config.json", "r") as f:
  51. data = json.load(f)
  52. hbase_database = data["metadata"]["database"]
  53. url = data["metadata"]["url"]
  54. building = data["building"]["id"]
  55. topic = data["kafka"]["topic"]
  56. kafka_host = data["kafka"]["host"]
  57. kafka_port = data["kafka"]["port"]
  58. with open("config-time.json", "r") as f:
  59. data_time = json.load(f)
  60. from_time = data_time["from_time"]
  61. to_time = data_time["to_time"]
  62. #可启动传参,python xxx.py --start_time "20210701000000" --end_time "20210702000000"
  63. parser = argparse.ArgumentParser()
  64. parser.add_argument("--start_time",default=from_time,help="--start_time 20220101000000")
  65. parser.add_argument("--end_time",default=to_time,help="--end_time 20220201000000")
  66. args = parser.parse_args()
  67. # print(args.start_time)
  68. # print(args.end_time)
  69. #连接kafka
  70. producer = KafkaProducer(bootstrap_servers='%s:%s'%(kafka_host,kafka_port)) # 连接kafka
  71. tables = ["fjd_0_near_15min","data_servicedata_15min"]
  72. # #连接hbase
  73. zillionUtil = ZillionUtil(url)
  74. pointlist = get_pointlist(hbase_database,"dy_pointlist",building)
  75. for table in tables:
  76. for i in pointlist:
  77. meter,funcid = i[0],i[1]
  78. monthrange = get_month_range(args.start_time,args.end_time)
  79. for m in monthrange:
  80. startdate,enddate = m[0],m[1]
  81. print("%s开始查询%s至%s的数据 %s-%s"%(table,startdate,enddate,meter,funcid))
  82. if table == "fjd_0_near_15min":
  83. data_fjd = get_data_time(hbase_database,table,building,meter,funcid,startdate,enddate)
  84. if data_fjd:
  85. for i in data_fjd:
  86. i.pop("data_flag")
  87. send_kafka(topic,i)
  88. else:
  89. data_fjd = get_data_time(hbase_database, table, building, meter, funcid, startdate, enddate)
  90. if data_fjd:
  91. for i in data_fjd:
  92. send_kafka(topic,i)
  93. producer.close()