temp_main.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import datetime
  2. import json,pymysql
  3. import os
  4. import time
  5. from MyUtils.ZillionUtil import ZillionUtil
  6. from MyUtils.MysqlUtils import MysqlUtils
  7. from MyUtils.DateUtils import get_day
  8. from MyUtils.Dingtalk import send_message
  9. import pytz
  10. import multiprocessing
  11. INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values "
  12. DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' "
  13. SELETE_COUNTLASTDATA_SQL = "SELECT count(*) FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
  14. SELETE_SUMLASTDATA_SQL = "SELECT SUM(energy_ac_terminal)+SUM(energy_heating)+SUM(energy_cooling)+SUM(energy_light)+sum(energy_others) as last_data FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
  15. def datetime_now():
  16. # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  17. #容器时间
  18. # tz = pytz.timezone('Asia/Shanghai') # 东八区
  19. datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
  20. pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
  21. return datetime_now
  22. # 获取能耗数据
  23. def get_data_time(zillionUtil,hbase_database, hbase_table, building, from_time, to_time):
  24. Criteria = {
  25. "building": building,
  26. "energyModelSign": building + "EM01",
  27. "energyModelNodeSign": {
  28. "in":[
  29. "EI10102010101001",
  30. "EI10102010102001",
  31. "EI1010202001",
  32. "EI1001"
  33. ]
  34. },
  35. "data_time": {
  36. "$gte": from_time,
  37. "$lt": to_time
  38. }
  39. }
  40. datas = zillionUtil.select(hbase_database, hbase_table, Criteria)
  41. return datas
  42. with open("config.json", "r") as f:
  43. data = json.load(f)
  44. hbase_database = data["metadata"]["database"]
  45. url = data["metadata"]["url"]
  46. building = data["building"]
  47. mysql = data["mysql"]
  48. my_database = mysql["database"]
  49. dingding = data["dingding"]
  50. at_mobiles = data["at_mobiles"]
  51. print("同步%s项目数据 " %(building))
  52. project_id = "Pj" + building
  53. time_now = datetime.datetime.fromtimestamp(int(time.time()),
  54. pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
  55. # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
  56. start_time = "20221213000000"
  57. end_time = "20221220000000"
  58. range_days = get_day(start_time,end_time)
  59. for i in range_days:
  60. yesterday,today = i[0],i[1]
  61. yesterday = yesterday[0:8]
  62. today = today[0:8]
  63. # #连接hbase
  64. zillionUtil = ZillionUtil(url)
  65. # #连接hbase
  66. MysqlUtil = MysqlUtils(**mysql)
  67. datas = get_data_time(zillionUtil ,hbase_database, "data_energydata_1d", building, yesterday, today)
  68. if datas:
  69. # 删除上月数据
  70. print("%s,开始删除%s的数据... " %(datetime_now() ,yesterday))
  71. delete_sql = DELETE_SQL% (project_id ,yesterday ,today)
  72. MysqlUtil.update(delete_sql)
  73. # sqls = []
  74. energy_cooling = "0"
  75. energy_heating = "0"
  76. energy_ac_terminal = "0"
  77. energy_light = "0"
  78. sum_data_value = "0"
  79. for i in datas:
  80. if i["energyModelNodeSign"] == "EI1001":
  81. sum_data_value = i["data_value"]
  82. # 冷热源
  83. if i["energyModelNodeSign"] == "EI10102010101001":
  84. energy_cooling = i["data_value"]
  85. # #热
  86. # if i["energyModelNodeSign"] == "EI1010201010101001":
  87. # energy_heating = i["data_value"]
  88. # 空调末端
  89. if i["energyModelNodeSign"] == "EI10102010102001":
  90. energy_ac_terminal = i["data_value"]
  91. # 照明
  92. if i["energyModelNodeSign"] == "EI1010202001":
  93. energy_light = i["data_value"]
  94. energy_other = float(sum_data_value) - float(energy_cooling) - float(energy_heating) -float(
  95. energy_light) - float(energy_ac_terminal)
  96. sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
  97. project_id, yesterday, energy_cooling, energy_heating, energy_ac_terminal, energy_light, energy_other,
  98. datetime_now(), datetime_now())
  99. inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql
  100. print("%s,开始插入数据..." % datetime_now())
  101. MysqlUtil.update(inser_sql)
  102. else:
  103. print("%s,没有查询到数据...")
  104. time.sleep(2)
  105. print("%s,等待下次程序执行" % (datetime_now()))