start.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. from MyUtils.MysqlUtils import MysqlUtils
  2. from MyUtils.DateUtils import get_day
  3. from MyUtils.Dingtalk import send_message_mobiles
  4. import datetime
  5. import pytz
  6. import json
  7. import time
  8. import os
  9. SELETE_SQL = "SELECT " \
  10. "t.project_id, " \
  11. "t.object_id," \
  12. "t.source_type," \
  13. "t.user_id," \
  14. "t.user_phone," \
  15. "t.user_name," \
  16. "t.value_type," \
  17. "t.item_id," \
  18. "t.create_time," \
  19. "COUNT(*) " \
  20. "FROM %s.feedback t WHERE " \
  21. "t.create_time >= '%s' AND t.create_time < '%s' GROUP BY " \
  22. "t.project_id, " \
  23. "t.object_id," \
  24. "t.source_type," \
  25. "t.user_id," \
  26. "t.user_phone," \
  27. "t.user_name," \
  28. "t.value_type," \
  29. "t.item_id," \
  30. "t.create_time" \
  31. " HAVING COUNT(*) > 1"
  32. SELETE_FEEDBACK_COUNT_SQL = "select count(*) from %s.feedback where create_time >= '%s' and create_time < '%s'"
  33. SELECT_ID_SQL = "SELECT t.id FROM %s.feedback t WHERE " \
  34. "t.project_id='%s' and t.object_id %s and t.source_type %s and t.user_id %s and t.user_phone %s and t.user_name %s and t.value_type='%s' and t.item_id %s and t.create_time='%s'"
  35. INSERT_ID_DATA_SQL = "INSERT INTO %s.feedback_history (" \
  36. "id,project_id,object_id,source_type,user_id,user_phone,user_name," \
  37. "value_type,item_id,`value`,create_time,next_open_time,model,duration_type," \
  38. "custom_plan,curr_temp,nick_name, `result`,exe_result,fb_temp,`remark`) " \
  39. "SELECT * FROM %s.feedback WHERE id = '%s'"
  40. UPDATE_ID_SQL = "update %s.feedback_history set duplicate_data = '%s' where id = '%s'"
  41. SELECT_DATA_SQL = "SELECT t.project_id,t.object_id,t.source_type,t.user_id,t.user_phone,t.user_name,t.value_type,t.item_id,t.create_time FROM %s.feedback t WHERE " \
  42. "t.create_time >= '%s' AND t.create_time < '%s' " \
  43. "GROUP BY t.project_id,t.object_id,t.source_type,t.user_id,t.user_phone,t.user_name,t.value_type,t.item_id,t.create_time " \
  44. "HAVING COUNT(*) =1"
  45. INSERT_DADA_SQL = "INSERT INTO %s.feedback_history (" \
  46. "id,project_id,object_id,source_type,user_id,user_phone,user_name," \
  47. "value_type,item_id,`value`,create_time,next_open_time,model,duration_type," \
  48. "custom_plan,curr_temp,nick_name, `result`,exe_result,fb_temp,`remark`,duplicate_data) values "
  49. DELETE_DUPLICATE_SQL = "delete from %s.feedback where id ='%s'"
  50. DELETE_SQL = "delete from %s.feedback where create_time >= '%s' and create_time < '%s'"
  51. DELETE_DUPLICATE_MULT_SQL = "delete from %s.feedback where id in (select * from (SELECT id FROM %s.feedback t WHERE t.create_time >= '%s' AND t.create_time < '%s' GROUP BY t.project_id,t.object_id,t.source_type,t.user_id,t.user_phone,t.user_name,t.`value`,t.item_id,t.create_time HAVING COUNT(*) = 1) s)"
  52. def datetime_now():
  53. # datetime_now = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
  54. #容器时间
  55. # tz = pytz.timezone('Asia/Shanghai') # 东八区
  56. datetime_now = datetime.datetime.fromtimestamp(int(time.time()),
  57. pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S')
  58. return datetime_now
  59. def None_data(data):
  60. if data is None:
  61. data = "is NULL"
  62. else:
  63. data = "= '%s'" % data
  64. return data
  65. def data_sql(SQL,tag):
  66. data_infos = MysqlUtil.query(SQL % (my_database, create_time_start, create_time_end))
  67. if data_infos:
  68. print("%s 时间段%s至%s,tag为%s,开始插入数据到feedback_history表" % (datetime_now(), create_time_start, create_time_end,tag))
  69. for data in data_infos:
  70. project_id = data[0]
  71. object_id = None_data(data[1])
  72. source_type = None_data(data[2])
  73. user_id = None_data(data[3])
  74. user_phone = None_data(data[4])
  75. user_name = None_data(data[5])
  76. value_type = data[6]
  77. item_id = None_data(data[7])
  78. create_time = data[8]
  79. create_time = datetime.datetime.strftime(create_time, "%Y-%m-%d %H:%M:%S")
  80. # 获取数据的id
  81. data_infos_id = MysqlUtil.query(SELECT_ID_SQL % (
  82. my_database, project_id, object_id, source_type, user_id, user_phone, user_name, value_type, item_id,
  83. create_time))
  84. if data_infos_id:
  85. result = None
  86. for data_id in data_infos_id:
  87. # print("%s 开始插入数据%s" % (datetime_now(), data_id[0]))
  88. # #将数据插入到history表中
  89. sql1 = INSERT_ID_DATA_SQL % (my_database, my_database, data_id[0])
  90. # #打标签”Y,N“
  91. sql2 = UPDATE_ID_SQL % (my_database, tag, data_id[0])
  92. # # 删除feedback数据
  93. # sql3 = DELETE_DUPLICATE_SQL % (my_database, data_id[0])
  94. result = MysqlUtil.update_mult(sql1, sql2)
  95. if result == True:
  96. duplicate_data_sum.append(data_id[0])
  97. if tag == "Y" and result == True:
  98. MysqlUtil.update(UPDATE_ID_SQL % (my_database, "N", data_infos_id[0][0]))
  99. else:
  100. print("%s tag为%s,时间段%s至%s没有数据" % (datetime_now(),tag, create_time_start, create_time_end))
  101. with open("config.json", "r") as f:
  102. data = json.load(f)
  103. mysql = data["mysql"]
  104. my_database = mysql["database"]
  105. dingding = data["dingding"]
  106. at_mobiles = data["at_mobiles"]
  107. # mysql = {
  108. # "host": os.getenv("host"),
  109. # "port": int(os.getenv("port")),
  110. # "user": os.getenv("user"),
  111. # "passwd": os.getenv("passwd"),
  112. # "database": os.getenv("database")
  113. # }
  114. # my_database = os.getenv("database")
  115. # dingding = os.getenv("dingding")
  116. # at_mobiles = os.getenv("at_mobiles")
  117. while True:
  118. time_now = datetime.datetime.fromtimestamp(int(time.time()),
  119. pytz.timezone('Asia/Shanghai')).strftime('%H:%M:%S')
  120. # time_now = time.strftime("%H:%M:%S", time.localtime()) # 刷新
  121. am_set_time = "06:00:00"
  122. if time_now == am_set_time: # 此处设置每天定时的时间
  123. # #连接mysql
  124. MysqlUtil = MysqlUtils(**mysql)
  125. # starttime = '20230301000000'
  126. # endtime = '20230319000000'
  127. # yesterday = (datetime.datetime.today() - datetime.timedelta(days=1)).strftime("%Y%m%d")+"000000"
  128. # today = datetime.datetime.today().strftime("%Y%m%d")+"000000"
  129. today = datetime.datetime.fromtimestamp(int(time.time()),
  130. pytz.timezone('Asia/Shanghai')).strftime("%Y%m%d" ) +"000000"
  131. yesterday = (datetime.datetime.fromtimestamp(int(time.time()),
  132. pytz.timezone('Asia/Shanghai')) - datetime.timedelta(days=1)).strftime("%Y%m%d" ) +"000000"
  133. first_14day = (datetime.datetime.fromtimestamp(int(time.time()),
  134. pytz.timezone('Asia/Shanghai')) - datetime.timedelta(days=14)).strftime("%Y%m%d" ) +"000000"
  135. first_15day = (datetime.datetime.fromtimestamp(int(time.time()),
  136. pytz.timezone('Asia/Shanghai')) - datetime.timedelta(days=15)).strftime("%Y%m%d" ) +"000000"
  137. day_range = get_day(yesterday,today)
  138. for day in day_range:
  139. duplicate_data_sum = []
  140. create_time_start,create_time_end = day[0],day[1]
  141. feedback_sum = MysqlUtil.query(SELETE_FEEDBACK_COUNT_SQL%(my_database,create_time_start,create_time_end))[0][0]
  142. #获取重复数据
  143. data_sql(SELETE_SQL,"Y")
  144. #获取不重复数据
  145. data_sql(SELECT_DATA_SQL,"N")
  146. #对比查询记录和插入记录是否一致
  147. if feedback_sum == len(duplicate_data_sum):
  148. print("%s 时间段%s至%s,处理数据%s条,成功!"%(datetime_now(),create_time_start,create_time_end,feedback_sum))
  149. # message = ("【feedback表打唯一数据标签】 %s 时间段%s至%s,处理数据%s条,成功 \n") % (datetime_now(),create_time_start,create_time_end,feedback_sum)
  150. # send_message_mobiles(message,dingding,at_mobiles)
  151. #删除14天之前的数据,保留最新2周的数据
  152. del_sql = DELETE_SQL % (my_database,first_15day,first_14day)
  153. MysqlUtil.update(del_sql)
  154. else:
  155. print("%s 时间段%s至%s,查询数据%s条,插入数据%s条,失败!"%(datetime_now(),create_time_start,create_time_end,feedback_sum,len(duplicate_data_sum)))
  156. message = ("【feedback表打唯一数据标签】 %s 时间段%s至%s,查询数据%s条,插入数据%s条,失败 \n")%(datetime_now(),create_time_start,create_time_end,feedback_sum,len(duplicate_data_sum))
  157. send_message_mobiles(message,dingding,at_mobiles)
  158. MysqlUtil.close()
  159. time.sleep(2)