main.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. import datetime
  2. import json,pymysql
  3. import os
  4. import time
  5. from MyUtils.ZillionUtil import ZillionUtil
  6. from MyUtils.MysqlUtils import MysqlUtils
  7. from MyUtils.Dingtalk import send_message
  8. import pytz
  9. import multiprocessing
  # SQL templates for the `energy_week_day` table. Callers complete each one
  # with the %-operator before handing the statement to MySQL.

  # Upsert prefix; placeholders are (database, table). The caller appends the
  # "(...)" VALUES tuple(s) directly after the trailing space.
  INSERT_SQL = (
      "replace into %s.%s("
      "project_id,date,energy_cooling,energy_heating,energy_ac_terminal,"
      "energy_light,energy_others,create_time,update_time"
      ") values "
  )

  # Remove one project's rows in the half-open range [from_date, to_date).
  # Placeholders are (project_id, from_date, to_date).
  DELETE_SQL = (
      "DELETE FROM `energy_week_day` "
      "WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' "
  )

  # Count one project's rows in [from_date, to_date); same placeholders.
  SELETE_COUNTLASTDATA_SQL = (
      "SELECT count(*) FROM `energy_week_day` "
      "WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
  )

  # Total energy over all five categories for one project in
  # [from_date, to_date); same placeholders.
  SELETE_SUMLASTDATA_SQL = (
      "SELECT SUM(energy_ac_terminal)+SUM(energy_heating)+SUM(energy_cooling)"
      "+SUM(energy_light)+sum(energy_others) as last_data "
      "FROM `energy_week_day` "
      "WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
  )
  def datetime_now():
      """Return the current time in Asia/Shanghai as 'YYYY-MM-DD HH:MM:SS'.

      The zone is passed explicitly, so the result does not depend on the
      local time zone of the host (the original note refers to running
      inside a container).
      """
      shanghai = pytz.timezone('Asia/Shanghai')
      # Whole seconds only; the output format carries no sub-second field.
      epoch_seconds = int(time.time())
      moment = datetime.datetime.fromtimestamp(epoch_seconds, shanghai)
      return moment.strftime('%Y-%m-%d %H:%M:%S')
  def get_data_time(zillionUtil, hbase_database, hbase_table, building, from_time, to_time):
      """Fetch energy-consumption records for one building in a time range.

      Builds a criteria mapping and passes it to ``zillionUtil.select``.

      Args:
          zillionUtil: client object exposing ``select(database, table, criteria)``.
          hbase_database: database name forwarded to ``select``.
          hbase_table: table name forwarded to ``select``.
          building: building identifier; the energy-model sign is derived
              from it by appending ``"EM01"``.
          from_time: inclusive lower bound for ``data_time`` (``$gte``).
          to_time: exclusive upper bound for ``data_time`` (``$lt``).

      Returns:
          Whatever ``zillionUtil.select`` returns for the criteria.
      """
      # The four energy-model nodes requested from the store.
      node_signs = [
          "EI10102010101001",
          "EI10102010102001",
          "EI1010202001",
          "EI1001",
      ]
      time_window = {"$gte": from_time, "$lt": to_time}
      query = {
          "building": building,
          "energyModelSign": building + "EM01",
          "energyModelNodeSign": {"in": node_signs},
          "data_time": time_window,
      }
      return zillionUtil.select(hbase_database, hbase_table, query)
  41. # # mysql插入数据
  42. # def insert_mysql(sqls,building,my_table):
  43. # print("%s,开始往mysql插入%s的%s数据..."%(datetime_now(),building,my_table))
  44. # for i in range(0, len(sqls), 1000):
  45. # sqlranges = sqls[i:i + 1000]
  46. # sqlranges = INSERT_SQL % (my_database,my_table) + ",".join(sqlranges)
  47. # MysqlUtil.update(sqlranges)
  48. # print("%s,mysql数据%s,项目%s插入成功,合计%s条..." % (datetime_now(),my_table,building,len(sqls)))
  49. # with open("config.json", "r") as f:
  50. # data = json.load(f)
  51. # hbase_database = data["metadata"]["database"]
  52. # url = data["metadata"]["url"]
  53. # building = data["building"]
  54. # mysql = data["mysql"]
  55. # my_database = mysql["database"]
  56. # dingding = data["dingding"]
  57. # at_mobiles = data["at_mobiles"]
  # Runtime configuration, read once at import time from environment variables.
  hbase_database = os.getenv("hbase_database")
  url = os.getenv("url")
  building = os.getenv("building")
  dingding = os.getenv("dingding")
  at_mobiles = os.getenv("at_mobiles")
  # Keyword arguments for the MySQL connection (unpacked with ** by callers).
  mysql = {
      "host": os.getenv("host"),
      # Environment values are always strings, but PyMySQL rejects a port
      # that is not an int ("port should be of type int"). Convert here and
      # fall back to the MySQL default when the variable is unset or empty.
      "port": int(os.getenv("port") or 3306),
      "user": os.getenv("user"),
      "passwd": os.getenv("passwd"),
      "database": os.getenv("database"),
  }
  # Database name used when formatting the INSERT statement.
  my_database = mysql["database"]
  def _report_last_week(project_id, today):
      """Send the weekly energy summary for ``project_id`` to DingTalk.

      ``today`` is the (Monday) datetime the report runs on. The previous
      week is the half-open date range [today - 7 days, today) and the week
      before it is [today - 14 days, today - 7 days). If the previous week
      does not have exactly 7 rows, a "data missing" warning is sent instead
      of the totals.
      """
      week_end = today.strftime("%Y%m%d")
      last_monday = today - datetime.timedelta(days=7)
      week_start = last_monday.strftime("%Y%m%d")
      prev_week_start = (last_monday - datetime.timedelta(days=7)).strftime("%Y%m%d")

      conn = pymysql.connect(**mysql)
      try:
          cursor = conn.cursor()
          try:
              cursor.execute(SELETE_COUNTLASTDATA_SQL % (project_id, week_start, week_end))
              day_count = cursor.fetchone()[0]
              print("上周能耗天数%s"%day_count)
              if day_count == 7:
                  cursor.execute(SELETE_SUMLASTDATA_SQL % (project_id, week_start, week_end))
                  last_week_total = cursor.fetchone()[0]
                  cursor.execute(SELETE_SUMLASTDATA_SQL % (project_id, prev_week_start, week_start))
                  prev_week_total = cursor.fetchone()[0]
                  message = "【博锐周报数据】 %s,上上周能耗数据:%s,上周能耗数据:%s %s" % (
                      datetime_now(), prev_week_total, last_week_total, at_mobiles)
              else:
                  message = "【博锐周报数据】 %s,上周能耗数据有缺失,请及时核对!!!%s" % (datetime_now(), at_mobiles)
          finally:
              # Release the cursor even when a query fails.
              cursor.close()
      finally:
          # Likewise never leave the connection open on an error path.
          conn.close()

      print(message)
      send_message(message, dingding, at_mobiles)


  def check_energy():
      """Every Monday at 09:00 (Asia/Shanghai) send the weekly energy report.

      Runs forever. The clock is polled every 2 seconds and the report fires
      once during the 09:00 minute. The previous trigger compared the time
      against the exact string "09:00:00", which a polling loop can step over
      (or, without a sleep on every pass, spin on at full CPU); a one-minute
      window plus a once-per-day guard avoids both problems.
      """
      project_id = "Pj" + building
      shanghai = pytz.timezone('Asia/Shanghai')
      last_report_date = None  # date of the most recent report, to fire once per Monday
      while True:
          now = datetime.datetime.fromtimestamp(int(time.time()), shanghai)
          # weekday() == 0 means Monday.
          is_due = (
              now.weekday() == 0
              and now.strftime('%H:%M') == "09:00"
              and now.date() != last_report_date
          )
          if is_due:
              last_report_date = now.date()
              _report_last_week(project_id, now)
          time.sleep(2)
  def _sync_yesterday(project_id, yesterday, today):
      """Copy one day's energy figures into the MySQL `energy_week_day` table.

      ``yesterday`` and ``today`` are 'YYYYMMDD' strings bounding the half-open
      range [yesterday, today) that is read from the "data_energydata_1d"
      table and rewritten in MySQL. When the source returns nothing, MySQL is
      left untouched.
      """
      zillionUtil = ZillionUtil(url)
      MysqlUtil = MysqlUtils(**mysql)
      datas = get_data_time(zillionUtil, hbase_database, "data_energydata_1d", building, yesterday, today)
      if not datas:
          # The timestamp argument was missing here, so a literal "%s" was printed.
          print("%s,没有查询到数据..." % datetime_now())
          return

      # Remove yesterday's existing row(s) before writing the fresh values.
      print("%s,开始删除%s的数据..."%(datetime_now(),yesterday))
      MysqlUtil.update(DELETE_SQL % (project_id, yesterday, today))

      # Latest value per energy-model node; nodes absent from the result stay "0".
      readings = {
          "EI1001": "0",            # value the "others" remainder is derived from
          "EI10102010101001": "0",  # cooling/heating source
          "EI10102010102001": "0",  # air-conditioning terminal
          "EI1010202001": "0",      # lighting
      }
      for row in datas:
          sign = row["energyModelNodeSign"]
          if sign in readings:
              readings[sign] = row["data_value"]

      sum_data_value = readings["EI1001"]
      energy_cooling = readings["EI10102010101001"]
      energy_heating = "0"  # no heating node is queried, so this is always zero
      energy_ac_terminal = readings["EI10102010102001"]
      energy_light = readings["EI1010202001"]
      # "Others" is whatever remains of the total after the known categories.
      energy_other = (
          float(sum_data_value) - float(energy_cooling) - float(energy_heating)
          - float(energy_light) - float(energy_ac_terminal)
      )

      stamp = datetime_now()  # one timestamp for both create_time and update_time
      values = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
          project_id, yesterday, energy_cooling, energy_heating,
          energy_ac_terminal, energy_light, energy_other, stamp, stamp)
      print("%s,开始插入数据..."%datetime_now())
      MysqlUtil.update(INSERT_SQL % (my_database, "energy_week_day") + values)


  def transfer_energy():
      """Sync yesterday's energy data into MySQL at 08:00 and 22:00 (Asia/Shanghai).

      Runs forever. The clock is polled every 2 seconds and each scheduled
      slot fires once during its minute. The previous trigger compared
      against the exact strings "08:00:00"/"22:00:00", which a polling loop
      can step over (or, without a sleep on every pass, spin on at full CPU).
      The dates are taken from the same Asia/Shanghai clock as the trigger
      rather than from the host's local date.
      """
      print("同步%s项目数据"%(building))
      project_id = "Pj" + building
      shanghai = pytz.timezone('Asia/Shanghai')
      run_times = ("08:00", "22:00")  # daily schedule, HH:MM
      last_slot = None  # (date, HH:MM) of the most recent run, to fire once per slot
      while True:
          now = datetime.datetime.fromtimestamp(int(time.time()), shanghai)
          slot = (now.date(), now.strftime('%H:%M'))
          if slot[1] in run_times and slot != last_slot:
              last_slot = slot
              today = now.date().strftime("%Y%m%d")
              yesterday = (now.date() - datetime.timedelta(days=1)).strftime("%Y%m%d")
              _sync_yesterday(project_id, yesterday, today)
              print("%s,等待下次程序执行" % (datetime_now()))
          time.sleep(2)
  if __name__ == '__main__':
      # Run the weekly report check and the daily sync side by side, each in
      # its own process. Both Process objects are created first, then started
      # in the same order.
      workers = [
          multiprocessing.Process(target=job)
          for job in (check_energy, transfer_energy)
      ]
      for worker in workers:
          worker.start()
  168. transfer_energy_process.start()