|
@@ -1,26 +1,30 @@
|
|
|
import datetime
|
|
|
-import json
|
|
|
+import json,pymysql
|
|
|
import os
|
|
|
import time
|
|
|
from MyUtils.ZillionUtil import ZillionUtil
|
|
|
from MyUtils.MysqlUtils import MysqlUtils
|
|
|
+from MyUtils.Dingtalk import send_message
|
|
|
import pytz
|
|
|
+import multiprocessing
|
|
|
|
|
|
INSERT_SQL = "replace into %s.%s(project_id,date,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_others,create_time,update_time) values "
|
|
|
DELETE_SQL = "DELETE FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s' "
|
|
|
+SELETE_COUNTLASTDATA_SQL = "SELECT count(*) FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
|
|
|
+SELETE_SUMLASTDATA_SQL = "SELECT SUM(energy_ac_terminal)+SUM(energy_heating)+SUM(energy_cooling)+SUM(energy_light)+sum(energy_others) as last_data FROM `energy_week_day` WHERE `project_id` = '%s' AND `date` >= '%s' AND `date` < '%s'"
|
|
|
|
|
|
def datetime_now():
|
|
|
|
|
|
|
|
|
|
|
|
datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
|
|
|
- pytz.timezone('Asia/Shanghai')).strftime('%Y%m%d%H%M%S')
|
|
|
+ pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
|
|
|
return datetime_now
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-def get_data_time(hbase_database, hbase_table, building, from_time, to_time):
|
|
|
+def get_data_time(zillionUtil,hbase_database, hbase_table, building, from_time, to_time):
|
|
|
Criteria = {
|
|
|
"building": building,
|
|
|
"energyModelSign": building + "EM01",
|
|
@@ -49,89 +53,141 @@ def get_data_time(hbase_database, hbase_table, building, from_time, to_time):
|
|
|
|
|
|
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-hbase_database = os.getenv("hbase_database")
|
|
|
-url = os.getenv("url")
|
|
|
-building = os.getenv("building")
|
|
|
-mysql = {
|
|
|
- "host": os.getenv("host"),
|
|
|
- "port": os.getenv("port"),
|
|
|
- "user": os.getenv("user"),
|
|
|
- "passwd": os.getenv("passwd"),
|
|
|
- "database": os.getenv("database")
|
|
|
-}
|
|
|
-my_database = os.getenv("database")
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-print("同步%s项目数据"%(building))
|
|
|
-while True:
|
|
|
-
|
|
|
-
|
|
|
- time_now = datetime.datetime.fromtimestamp(int(time.time()),
|
|
|
- pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
|
|
|
-
|
|
|
- am_set_time = "08:00:00"
|
|
|
- pm_set_time = "22:00:00"
|
|
|
- if time_now == am_set_time or time_now ==pm_set_time:
|
|
|
- today = datetime.date.today().strftime("%Y%m%d")
|
|
|
- yesterday = (datetime.date.today()-datetime.timedelta(days=1)).strftime("%Y%m%d")
|
|
|
-
|
|
|
-
|
|
|
- zillionUtil = ZillionUtil(url)
|
|
|
-
|
|
|
- MysqlUtil = MysqlUtils(**mysql)
|
|
|
- datas = get_data_time(hbase_database, "data_energydata_1d", building, yesterday, today)
|
|
|
- if datas:
|
|
|
- project_id = "Pj" + building
|
|
|
-
|
|
|
- print("%s,开始删除%s的数据..."%(datetime_now(),yesterday))
|
|
|
- delete_sql = DELETE_SQL% (project_id,yesterday,today)
|
|
|
- MysqlUtil.update(delete_sql)
|
|
|
-
|
|
|
- energy_cooling = "0"
|
|
|
- energy_heating = "0"
|
|
|
- energy_ac_terminal = "0"
|
|
|
- energy_light = "0"
|
|
|
- sum_data_value = "0"
|
|
|
- for i in datas:
|
|
|
-
|
|
|
- if i["energyModelNodeSign"] == "EI1001":
|
|
|
- sum_data_value = i["data_value"]
|
|
|
-
|
|
|
-
|
|
|
- if i["energyModelNodeSign"] == "EI10102010101001":
|
|
|
- energy_cooling = i["data_value"]
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if i["energyModelNodeSign"] == "EI10102010102001":
|
|
|
- energy_ac_terminal = i["data_value"]
|
|
|
-
|
|
|
- if i["energyModelNodeSign"] == "EI1010202001":
|
|
|
- energy_light = i["data_value"]
|
|
|
- energy_other = float(sum_data_value) - float(energy_cooling) - float(energy_heating) -float(energy_light) - float(energy_ac_terminal)
|
|
|
- sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
|
|
|
- project_id, yesterday,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_other, datetime_now(), datetime_now())
|
|
|
- inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql
|
|
|
- print("%s,开始插入数据..."%datetime_now())
|
|
|
- MysqlUtil.update(inser_sql)
|
|
|
-
|
|
|
-
|
|
|
- else:
|
|
|
- print("%s,没有查询到数据...")
|
|
|
- time.sleep(2)
|
|
|
- print("%s,等待下次程序执行" % (datetime_now()))
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
+with open("config.json", "r") as f:
|
|
|
+ data = json.load(f)
|
|
|
+ hbase_database = data["metadata"]["database"]
|
|
|
+ url = data["metadata"]["url"]
|
|
|
+ building = data["building"]
|
|
|
+ mysql = data["mysql"]
|
|
|
+ my_database = mysql["database"]
|
|
|
+ dingding = data["dingding"]
|
|
|
+ at_mobiles = data["at_mobiles"]
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def check_energy():
|
|
|
+ while True:
|
|
|
+ project_id = "Pj" + building
|
|
|
+
|
|
|
+ today = datetime.datetime.fromtimestamp(int(time.time()),
|
|
|
+ pytz.timezone('Asia/Shanghai'))
|
|
|
+
|
|
|
+ weekday = today.weekday()
|
|
|
+ if weekday == 0:
|
|
|
+ time_now = datetime.datetime.fromtimestamp(int(time.time()),
|
|
|
+ pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
|
|
|
+ if time_now == "08:00:00":
|
|
|
+
|
|
|
+ lastmonday = today - datetime.timedelta(days=weekday + 7)
|
|
|
+ conn = pymysql.connect(**mysql)
|
|
|
+ mysql_cur = conn.cursor()
|
|
|
+
|
|
|
+ count_sql = SELETE_COUNTLASTDATA_SQL % (project_id, lastmonday.strftime("%Y%m%d"), today.strftime("%Y%m%d"))
|
|
|
+ mysql_cur.execute(count_sql)
|
|
|
+ lastdatacount = mysql_cur.fetchall()[0][0]
|
|
|
+ if lastdatacount == 7:
|
|
|
+ lastdata_sql = SELETE_SUMLASTDATA_SQL % (
|
|
|
+ project_id, lastmonday.strftime("%Y%m%d"), today.strftime("%Y%m%d"))
|
|
|
+ mysql_cur.execute(lastdata_sql)
|
|
|
+ lastdata = mysql_cur.fetchall()[0][0]
|
|
|
+ last_lastmonday = lastmonday - datetime.timedelta(days=weekday + 7)
|
|
|
+ last_lastdata_sql = SELETE_SUMLASTDATA_SQL % (
|
|
|
+ project_id, last_lastmonday.strftime("%Y%m%d"), lastmonday.strftime("%Y%m%d"))
|
|
|
+ mysql_cur.execute(last_lastdata_sql)
|
|
|
+ last_lastdata = mysql_cur.fetchall()[0][0]
|
|
|
+ message = "【博锐周报数据】 %s,上上周能耗数据:%s,上周能耗数据:%s %s" % (datetime_now(), last_lastdata, lastdata, at_mobiles)
|
|
|
+ print(message)
|
|
|
+
|
|
|
+
|
|
|
+ else:
|
|
|
+ message = "【博锐周报数据】 %s,上周能耗数据有缺失,请及时核对!!!%s" % (datetime_now(), at_mobiles)
|
|
|
+ print(message)
|
|
|
+
|
|
|
+
|
|
|
+ mysql_cur.close()
|
|
|
+ conn.close()
|
|
|
+ time.sleep(2)
|
|
|
+
|
|
|
+def transfer_energy():
|
|
|
+ print("同步%s项目数据"%(building))
|
|
|
+ while True:
|
|
|
+ project_id = "Pj" + building
|
|
|
+ time_now = datetime.datetime.fromtimestamp(int(time.time()),
|
|
|
+ pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
|
|
|
+
|
|
|
+ am_set_time = "08:00:00"
|
|
|
+ pm_set_time = "14:50:00"
|
|
|
+ if time_now == am_set_time or time_now ==pm_set_time:
|
|
|
+ today = datetime.date.today().strftime("%Y%m%d")
|
|
|
+ yesterday = (datetime.date.today()-datetime.timedelta(days=1)).strftime("%Y%m%d")
|
|
|
+
|
|
|
+
|
|
|
+ zillionUtil = ZillionUtil(url)
|
|
|
+
|
|
|
+ MysqlUtil = MysqlUtils(**mysql)
|
|
|
+ datas = get_data_time(zillionUtil,hbase_database, "data_energydata_1d", building, yesterday, today)
|
|
|
+ if datas:
|
|
|
+
|
|
|
+
|
|
|
+ print("%s,开始删除%s的数据..."%(datetime_now(),yesterday))
|
|
|
+ delete_sql = DELETE_SQL% (project_id,yesterday,today)
|
|
|
+ MysqlUtil.update(delete_sql)
|
|
|
+
|
|
|
+ energy_cooling = "0"
|
|
|
+ energy_heating = "0"
|
|
|
+ energy_ac_terminal = "0"
|
|
|
+ energy_light = "0"
|
|
|
+ sum_data_value = "0"
|
|
|
+ for i in datas:
|
|
|
+
|
|
|
+ if i["energyModelNodeSign"] == "EI1001":
|
|
|
+ sum_data_value = i["data_value"]
|
|
|
+
|
|
|
+
|
|
|
+ if i["energyModelNodeSign"] == "EI10102010101001":
|
|
|
+ energy_cooling = i["data_value"]
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ if i["energyModelNodeSign"] == "EI10102010102001":
|
|
|
+ energy_ac_terminal = i["data_value"]
|
|
|
+
|
|
|
+ if i["energyModelNodeSign"] == "EI1010202001":
|
|
|
+ energy_light = i["data_value"]
|
|
|
+ energy_other = float(sum_data_value) - float(energy_cooling) - float(energy_heating) -float(energy_light) - float(energy_ac_terminal)
|
|
|
+ sql = "('%s','%s','%s','%s','%s','%s','%s','%s','%s')" % (
|
|
|
+ project_id, yesterday,energy_cooling,energy_heating,energy_ac_terminal,energy_light,energy_other, datetime_now(), datetime_now())
|
|
|
+ inser_sql = INSERT_SQL % (my_database, "energy_week_day") + sql
|
|
|
+ print("%s,开始插入数据..."%datetime_now())
|
|
|
+ MysqlUtil.update(inser_sql)
|
|
|
+
|
|
|
+
|
|
|
+ else:
|
|
|
+ print("%s,没有查询到数据...")
|
|
|
+ time.sleep(2)
|
|
|
+ print("%s,等待下次程序执行" % (datetime_now()))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ check_energy_process = multiprocessing.Process(target=check_energy)
|
|
|
+ transfer_energy_process = multiprocessing.Process(target=transfer_energy)
|
|
|
+ check_energy_process.start()
|
|
|
+ transfer_energy_process.start()
|