"""Daily archival job for the ``feedback`` table.

Once a day (06:00:00 Asia/Shanghai) the script looks at yesterday's rows of
``<database>.feedback``, copies every row into ``<database>.feedback_history``
and fills the ``duplicate_data`` column with "Y" or "N":

* rows whose grouping columns are unique within the window get "N";
* rows belonging to a group with more than one member get "Y", except the
  first id returned for the group, which is re-tagged "N".

When the number of copied rows equals the number of rows in the window, the
one-day slice of ``feedback`` between 15 and 14 days ago is deleted; otherwise
a DingTalk alert is sent.

Configuration is read from ``config.json`` in the working directory (keys
``mysql``, ``dingding``, ``at_mobiles``).
"""
from MyUtils.MysqlUtils import MysqlUtils
from MyUtils.DateUtils import get_day
from MyUtils.Dingtalk import send_message_mobiles
import datetime
import pytz
import json
import time
import os

# Groups of feedback rows (within a time window) that occur more than once.
SELETE_SQL = (
    "SELECT "
    "t.project_id, "
    "t.object_id,"
    "t.source_type,"
    "t.user_id,"
    "t.user_phone,"
    "t.user_name,"
    "t.value_type,"
    "t.item_id,"
    "t.create_time,"
    "COUNT(*) "
    "FROM %s.feedback t WHERE "
    "t.create_time >= '%s' AND t.create_time < '%s' GROUP BY "
    "t.project_id, "
    "t.object_id,"
    "t.source_type,"
    "t.user_id,"
    "t.user_phone,"
    "t.user_name,"
    "t.value_type,"
    "t.item_id,"
    "t.create_time"
    " HAVING COUNT(*) > 1"
)

# Total number of feedback rows in a time window.
SELETE_FEEDBACK_COUNT_SQL = "select count(*) from %s.feedback where create_time >= '%s' and create_time < '%s'"

# Ids of the rows belonging to one group. The nullable columns receive a
# complete comparison fragment produced by None_data().
SELECT_ID_SQL = (
    "SELECT t.id FROM %s.feedback t WHERE "
    "t.project_id='%s' and t.object_id %s and t.source_type %s and t.user_id %s and t.user_phone %s and t.user_name %s and t.value_type='%s' and t.item_id %s and t.create_time='%s'"
)

# Copy one feedback row into feedback_history.
# NOTE(review): relies on `SELECT *` yielding the columns in exactly the
# listed order - confirm against the table definition.
INSERT_ID_DATA_SQL = (
    "INSERT INTO %s.feedback_history ("
    "id,project_id,object_id,source_type,user_id,user_phone,user_name,"
    "value_type,item_id,`value`,create_time,next_open_time,model,duration_type,"
    "custom_plan,curr_temp,nick_name, `result`,exe_result,fb_temp,`remark`) "
    "SELECT * FROM %s.feedback WHERE id = '%s'"
)

# Set the duplicate_data tag of one feedback_history row.
UPDATE_ID_SQL = "update %s.feedback_history set duplicate_data = '%s' where id = '%s'"

# Groups of feedback rows (within a time window) that occur exactly once.
SELECT_DATA_SQL = (
    "SELECT t.project_id,t.object_id,t.source_type,t.user_id,t.user_phone,t.user_name,t.value_type,t.item_id,t.create_time FROM %s.feedback t WHERE "
    "t.create_time >= '%s' AND t.create_time < '%s' "
    "GROUP BY t.project_id,t.object_id,t.source_type,t.user_id,t.user_phone,t.user_name,t.value_type,t.item_id,t.create_time "
    "HAVING COUNT(*) =1"
)

# The three templates below are not referenced in this file; they are kept
# because other modules may import them.
INSERT_DADA_SQL = (
    "INSERT INTO %s.feedback_history ("
    "id,project_id,object_id,source_type,user_id,user_phone,user_name,"
    "value_type,item_id,`value`,create_time,next_open_time,model,duration_type,"
    "custom_plan,curr_temp,nick_name, `result`,exe_result,fb_temp,`remark`,duplicate_data) values "
)

DELETE_DUPLICATE_SQL = "delete from %s.feedback where id ='%s'"

# Delete every feedback row inside a time window.
DELETE_SQL = "delete from %s.feedback where create_time >= '%s' and create_time < '%s'"

DELETE_DUPLICATE_MULT_SQL = "delete from %s.feedback where id in (select * from (SELECT id FROM %s.feedback t WHERE t.create_time >= '%s' AND t.create_time < '%s' GROUP BY t.project_id,t.object_id,t.source_type,t.user_id,t.user_phone,t.user_name,t.`value`,t.item_id,t.create_time HAVING COUNT(*) = 1) s)"


def _shanghai_now():
    """Return the current time as an aware datetime in Asia/Shanghai (UTC+8)."""
    return datetime.datetime.fromtimestamp(int(time.time()), pytz.timezone('Asia/Shanghai'))


def _next_run_at_or_after(moment):
    """Return the first 06:00:00 (same zone as *moment*) that is not before it."""
    # Plain replace()/timedelta arithmetic is safe here because Asia/Shanghai
    # currently observes no daylight-saving transitions.
    candidate = moment.replace(hour=6, minute=0, second=0, microsecond=0)
    if candidate < moment:
        candidate += datetime.timedelta(days=1)
    return candidate


def _midnight_stamp(moment, days_back=0):
    """Format midnight of the day *days_back* days before *moment* as YYYYmmdd000000."""
    return (moment - datetime.timedelta(days=days_back)).strftime("%Y%m%d") + "000000"


def datetime_now():
    """Return the current Asia/Shanghai time formatted as 'YYYY-mm-dd HH:MM:SS'."""
    return _shanghai_now().strftime('%Y-%m-%d %H:%M:%S')


def None_data(data):
    """Turn a column value into the SQL comparison fragment used by SELECT_ID_SQL.

    Returns ``"is NULL"`` for ``None`` and ``"= '<value>'"`` otherwise.
    """
    # NOTE(review): the value is interpolated without escaping, so a quote
    # inside e.g. user_name breaks the statement. The MysqlUtils API is not
    # visible here; switch to bound parameters if it supports them.
    if data is None:
        return "is NULL"
    return "= '%s'" % data


def data_sql(SQL, tag):
    """Copy the rows selected by a grouping query into feedback_history.

    Relies on the module-level globals ``MysqlUtil``, ``my_database``,
    ``create_time_start``, ``create_time_end`` and ``duplicate_data_sum``
    that the scheduler loop sets before calling.

    :param SQL: grouping query template (``SELETE_SQL`` or ``SELECT_DATA_SQL``)
        taking database name, window start and window end.
    :param tag: value written to ``duplicate_data`` for every copied row
        ("Y" for duplicated groups, "N" for unique rows).

    The id of every row whose copy+tag succeeded is appended to
    ``duplicate_data_sum``. For tag "Y" the first id of each group is
    re-tagged "N" afterwards.
    """
    data_infos = MysqlUtil.query(SQL % (my_database, create_time_start, create_time_end))
    if not data_infos:
        print("%s tag为%s,时间段%s至%s没有数据" % (datetime_now(), tag, create_time_start, create_time_end))
        return

    print("%s 时间段%s至%s,tag为%s,开始插入数据到feedback_history表" % (datetime_now(), create_time_start, create_time_end, tag))
    for data in data_infos:
        project_id = data[0]
        object_id = None_data(data[1])
        source_type = None_data(data[2])
        user_id = None_data(data[3])
        user_phone = None_data(data[4])
        user_name = None_data(data[5])
        value_type = data[6]
        item_id = None_data(data[7])
        create_time = data[8].strftime("%Y-%m-%d %H:%M:%S")

        # Fetch the ids of all rows belonging to this group.
        data_infos_id = MysqlUtil.query(SELECT_ID_SQL % (
            my_database, project_id, object_id, source_type, user_id,
            user_phone, user_name, value_type, item_id, create_time))
        if not data_infos_id:
            continue

        result = None
        for data_id in data_infos_id:
            # Copy the row into the history table, then tag it ("Y"/"N").
            sql1 = INSERT_ID_DATA_SQL % (my_database, my_database, data_id[0])
            sql2 = UPDATE_ID_SQL % (my_database, tag, data_id[0])
            result = MysqlUtil.update_mult(sql1, sql2)
            # The return type of update_mult is not visible from this file,
            # so the original `== True` comparison is kept as-is.
            if result == True:  # noqa: E712
                duplicate_data_sum.append(data_id[0])

        # Within a duplicated group, re-tag the first row as "N".
        if tag == "Y" and result == True:  # noqa: E712
            MysqlUtil.update(UPDATE_ID_SQL % (my_database, "N", data_infos_id[0][0]))


with open("config.json", "r", encoding="utf-8") as f:
    data = json.load(f)
mysql = data["mysql"]
my_database = mysql["database"]
dingding = data["dingding"]
at_mobiles = data["at_mobiles"]

# Comparing the wall clock for equality with "06:00:00" while polling every two
# seconds can step over the target second and silently skip a day. Track the
# next due time instead and fire as soon as it has been reached.
next_run = _next_run_at_or_after(_shanghai_now())

while True:
    now = _shanghai_now()
    if now >= next_run:
        next_run += datetime.timedelta(days=1)

        # Connect to MySQL.
        MysqlUtil = MysqlUtils(**mysql)
        try:
            today = _midnight_stamp(now)
            yesterday = _midnight_stamp(now, 1)
            first_14day = _midnight_stamp(now, 14)
            first_15day = _midnight_stamp(now, 15)

            day_range = get_day(yesterday, today)
            for day in day_range:
                duplicate_data_sum = []
                create_time_start, create_time_end = day[0], day[1]
                feedback_sum = MysqlUtil.query(
                    SELETE_FEEDBACK_COUNT_SQL % (my_database, create_time_start, create_time_end))[0][0]
                # Duplicated rows.
                data_sql(SELETE_SQL, "Y")
                # Unique rows.
                data_sql(SELECT_DATA_SQL, "N")
                # Check that every queried row was copied.
                if feedback_sum == len(duplicate_data_sum):
                    print("%s 时间段%s至%s,处理数据%s条,成功!" % (datetime_now(), create_time_start, create_time_end, feedback_sum))
                    # Drop the slice that is now older than 14 days so that
                    # only the most recent two weeks stay in feedback.
                    del_sql = DELETE_SQL % (my_database, first_15day, first_14day)
                    MysqlUtil.update(del_sql)
                else:
                    print("%s 时间段%s至%s,查询数据%s条,插入数据%s条,失败!" % (datetime_now(), create_time_start, create_time_end, feedback_sum, len(duplicate_data_sum)))
                    message = ("【feedback表打唯一数据标签】 %s 时间段%s至%s,查询数据%s条,插入数据%s条,失败 \n") % (datetime_now(), create_time_start, create_time_end, feedback_sum, len(duplicate_data_sum))
                    send_message_mobiles(message, dingding, at_mobiles)
        finally:
            # Release the connection even when a query raised.
            MysqlUtil.close()
    time.sleep(2)