"""Export project data with the Java migration tool and load it into HBase.

Flow (paths and the HBase endpoint come from ``config.xml``):

1. Run ``data-migration.jar`` inside ``cmd/sourcepath``.
2. For every entry in ``cmd/targetpath``: strip leading backslashes from the
   file name, skip files whose mtime is not within
   ``FRESHNESS_WINDOW_SECONDS`` of the start of this run, and load each
   JSON-lines file into the table named after the file (text before the
   first ``.``).
3. Call the downstream cache-clear / sync endpoints.
"""
import datetime
import json
import os
import subprocess
import time
import urllib.request

from MyUtils.ConfigUtils import ConfigUtils
from MyUtils.ZillionUtil import ZillionUtil

# Project whose rows are wiped from the relation table and re-synced downstream.
PROJECT_ID = "3301100002"
# Rows sent per ZillionUtil.insert call.
BATCH_SIZE = 1000
# An exported file counts as fresh when its mtime lies within this many
# seconds (either direction) of the start of this run.
FRESHNESS_WINDOW_SECONDS = 10000
# This file is fully replaced (delete by project, then insert); every other
# file is upserted row by row (remove by table key, then insert).
REL_TABLE_FILE = "rel_btw_objs.json"
# Exporter invocation; argument order kept exactly as in the original command.
EXPORT_CMD = ["java", "-jar", "-Dfile.encoding=UTF-8", "data-migration.jar"]
# NOTE(review): the clear_cache secret is hard-coded; consider moving it to config.xml.
REFRESH_URLS = (
    "http://data-platform-3:80/mng/clear_cache?secret=A123456",
    "http://sagacloud-object-service:80/object/sync/AllData?projectId=Pj%s" % PROJECT_ID,
)

# HBase client; created in main() and used by the helper functions below.
zillionUtil = None


def put_hbase(hbasedatabase, hbasetable, datas, batch_size=BATCH_SIZE):
    """Insert the row dicts in *datas* into *hbasetable*, *batch_size* rows per call."""
    for start in range(0, len(datas), batch_size):
        zillionUtil.insert(hbasedatabase, hbasetable, datas[start:start + batch_size])


def remove_hbase(hbasedatabase, hbasetable, key):
    """Pass the key dict *key* to ZillionUtil.remove (presumably deletes the matching row)."""
    zillionUtil.remove(hbasedatabase, hbasetable, key)


def delete_hbase(hbasedatabase, hbasetable, project_id=PROJECT_ID):
    """Call ZillionUtil.delete with the criteria ``{"project_id": project_id}``."""
    criteria = {"project_id": project_id}
    zillionUtil.delete(hbasedatabase, hbasetable, criteria)


def get_hbase_key(hbasedatabase, hbasetable):
    """Return the key field names that ZillionUtil reports for *hbasetable*."""
    return zillionUtil.table_key(hbasedatabase, hbasetable)


def _read_json_lines(fp):
    """Parse one JSON document per non-blank line of the open text file *fp*."""
    return [json.loads(line) for line in fp if line.strip()]


def _load_rel_file(hbasedatabase, hbasetable, fp):
    """Replace the project's rows in *hbasetable* with the contents of *fp*."""
    # Parse everything first so a malformed line aborts before anything is deleted.
    datas_rel = _read_json_lines(fp)
    print("删除%s" % fp.name)
    delete_hbase(hbasedatabase, hbasetable)
    put_hbase(hbasedatabase, hbasetable, datas_rel)


def _load_keyed_file(hbasedatabase, hbasetable, fp):
    """Upsert the rows of *fp*: remove each row by its table key, then insert all rows."""
    keys = get_hbase_key(hbasedatabase, hbasetable)
    # Parse the whole file before touching HBase: previously a bad line midway
    # left the earlier rows removed and nothing re-inserted.
    datas = _read_json_lines(fp)
    for data in datas:
        hbase_key = {key: data[key] for key in keys if key in data}
        if not hbase_key:
            # What ZillionUtil.remove does with an empty key dict is not visible
            # here; skip it rather than risk a table-wide delete.
            continue
        remove_hbase(hbasedatabase, hbasetable, hbase_key)
        time.sleep(0.1)  # kept from the original; presumably throttles remove calls
    put_hbase(hbasedatabase, hbasetable, datas)


def _import_file(hbasedatabase, name, run_start):
    """Normalize the name of directory entry *name* and load it if it is a fresh file."""
    print(name)
    # Exported names may carry leading backslashes; strip them from the file on disk.
    file = name.lstrip("\\")
    if not file:
        return
    if file != name:
        print("mv %s %s" % (name, file))
        os.rename(name, file)  # no shell: safe for spaces/metacharacters in names
    if not os.path.isfile(file):
        return

    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(file))
    delta = (mtime - run_start).total_seconds()
    print("%s文件最新修改时间:%s" % (file, mtime.strftime("%Y-%m-%d %H:%M:%S")))
    hbasetable = file.split(".")[0]

    # The old check `delta < 10000` accepted every file older than the run
    # (negative delta), so stale exports were always re-imported.
    if abs(delta) >= FRESHNESS_WINDOW_SECONDS:
        print("检查%s导出文件数据,可能不是最新数据" % file)
        return

    with open(file, "r", encoding="utf-8") as fp:
        if file == REL_TABLE_FILE:
            _load_rel_file(hbasedatabase, hbasetable, fp)
        else:
            _load_keyed_file(hbasedatabase, hbasetable, fp)
    print("%s写入完成" % hbasetable)


def _run_export(sourcepath):
    """Run the Java exporter inside *sourcepath*; return True when it exits with 0."""
    os.chdir(sourcepath)
    print("进入%s目录,开始执行java程序" % os.getcwd())
    try:
        return subprocess.run(EXPORT_CMD).returncode == 0
    except OSError as err:  # e.g. `java` is not on PATH
        print(err)
        return False


def _refresh_caches():
    """GET each refresh endpoint and echo its response body (replaces apt-get + curl)."""
    for url in REFRESH_URLS:
        try:
            with urllib.request.urlopen(url) as resp:
                print(resp.read().decode("utf-8", errors="replace"))
        except OSError as err:  # URLError / HTTPError are OSError subclasses
            print("%s: %s" % (url, err))


def main():
    """Run the export, load fresh output files into HBase, then refresh downstream caches."""
    global zillionUtil
    # Taken before the exporter runs; files written by this run are compared to it.
    run_start = datetime.datetime.now()
    print(run_start.strftime("%Y-%m-%d %H:%M:%S"))

    config = ConfigUtils("config.xml")
    url, hbasedatabase = config.readTop("metadata", ["url", "database"])
    sourcepath, targetpath = config.readTop("cmd", ["sourcepath", "targetpath"])
    zillionUtil = ZillionUtil(url)

    if not _run_export(sourcepath):
        print("执行java程序失败")
        return
    print("导出程序执行成功")

    os.chdir(targetpath)
    print("进入%s目录,检查文件并写入hbase" % os.getcwd())
    for name in sorted(os.listdir(".")):
        _import_file(hbasedatabase, name, run_start)
    print("数据写入完成")

    _refresh_caches()
    print("数据刷新完成")


if __name__ == "__main__":
    main()