# temp_main.py (3.9 KB)
import datetime
import json
import time

import pytz

from MyUtils.DateUtils import get_day
from MyUtils.MysqlUtils import MysqlUtils
from MyUtils.ZillionUtil import ZillionUtil
  8. INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values "
  9. DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' "
  10. def datetime_now():
  11. # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  12. #容器时间
  13. # tz = pytz.timezone('Asia/Shanghai') # 东八区
  14. datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
  15. pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
  16. return datetime_now
  17. # 获取能耗数据
  18. def get_data_time(zillionUtil,hbase_database, hbase_table, building,meters, from_time, to_time):
  19. Criteria = {
  20. "building": building,
  21. "meter": {
  22. "in":meters
  23. },
  24. "funcid": 10101,
  25. "data_time": {
  26. "$gte": from_time,
  27. "$lt": to_time
  28. }
  29. }
  30. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  31. return datas
  32. # 获取点位表
  33. def get_pointlists(zillionUtil,hbase_database, hbase_table, building):
  34. Criteria = {
  35. "building": building,
  36. }
  37. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  38. return datas
  39. with open("config.json", "r") as f:
  40. data = json.load(f)
  41. hbase_database = data["metadata"]["database"]
  42. url = data["metadata"]["url"]
  43. buildings = data["building"]
  44. set_time = data["set_time"]
  45. mysql = data["mysql"]
  46. my_database = mysql["database"]
  47. print("同步项目列表:%s"%buildings)
  48. time_now = datetime.datetime.fromtimestamp(int(time.time()),
  49. pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
  50. # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
  51. start_time = "20221201000000"
  52. end_time = "20230110000000"
  53. range_days = get_day(start_time,end_time)
  54. # #连接hbase
  55. zillionUtil = ZillionUtil(url)
  56. # #连接hbase
  57. MysqlUtil = MysqlUtils(**mysql)
  58. for i in range_days:
  59. yesterday, today = i[0], i[1]
  60. yesterday = yesterday[0:8]
  61. today = today[0:8]
  62. for building in buildings:
  63. pointlists = get_pointlists(zillionUtil,hbase_database,"dy_pointlist",building)
  64. meters_10101 = []
  65. for i in pointlists:
  66. if i["funcid"] == 10101:
  67. meters_10101.append(i["meter"])
  68. print("%s,%s项目点位列表%s"%(datetime_now(),building,meters_10101))
  69. project_id = "Pj" + building
  70. # yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y%m%d")
  71. # today = datetime.date.today().strftime("%Y%m%d")
  72. datas = get_data_time(zillionUtil,hbase_database,"data_servicedata_1d",building,meters_10101,yesterday,today)
  73. if datas:
  74. data_values = [float(i["data_value"]) for i in datas]
  75. total_energy = sum(data_values)
  76. sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
  77. project_id, yesterday,"0","0","0","0",total_energy, datetime_now(), datetime_now())
  78. print("%s,开始删除项目%s在%s的数据..." % (datetime_now(), building,yesterday))
  79. delete_sql = DELETE_SQL % (project_id, yesterday, today)
  80. MysqlUtil.update(delete_sql)
  81. inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql
  82. print("%s,开始插入项目%s在%s的数据..." % (datetime_now(),building,yesterday))
  83. MysqlUtil.update(inser_sql)
  84. print("%s,%s项目的%s数据插入成功..."%(datetime_now(),building,yesterday))
  85. else:
  86. print("%s,未查询到项目%s在%s的数据..."%(datetime_now(),building,yesterday))
  87. else:
  88. time.sleep(2)
  89. print("%s,等待下次程序执行" % (datetime_now()))
  90. MysqlUtil.close()