123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import json
- from MyUtils.ConfigUtils import ConfigUtils
- from MyUtils.ZillionUtil import ZillionUtil
- import os,time
- import datetime
# Insert records into HBase.
def put_hbase(hbasedatabase, hbasetable, datas):
    """Insert ``datas`` into an HBase table in slices of at most 1000 items.

    Each slice is handed to ``zillionUtil.insert`` (the module-level client)
    in its original order. Nothing is sent when ``datas`` is empty.

    Args:
        hbasedatabase: Database identifier forwarded to ``zillionUtil.insert``.
        hbasetable: Table identifier forwarded to ``zillionUtil.insert``.
        datas: Sliceable sequence of records to insert.
    """
    chunk_size = 1000
    total = len(datas)
    offset = 0
    while offset < total:
        chunk = datas[offset:offset + chunk_size]
        zillionUtil.insert(hbasedatabase, hbasetable, chunk)
        offset += chunk_size
# Remove data from HBase by key.
def remove_hbase(hbasedatabase, hbasetable, key):
    """Forward a removal request for ``key`` to ``zillionUtil.remove``.

    Args:
        hbasedatabase: Database identifier passed through unchanged.
        hbasetable: Table identifier passed through unchanged.
        key: Key of the data to remove; the loader in this script passes a
            dict mapping key-column names to the record's values.
    """
    zillionUtil.remove(
        hbasedatabase,
        hbasetable,
        key,
    )
# Delete data from HBase by criteria.
def delete_hbase(hbasedatabase, hbasetable):
    """Call ``zillionUtil.delete`` with a fixed ``project_id`` criterion.

    The criterion is hard-coded to project id ``"3301100002"``.

    Args:
        hbasedatabase: Database identifier passed through unchanged.
        hbasetable: Table identifier passed through unchanged.
    """
    zillionUtil.delete(
        hbasedatabase,
        hbasetable,
        {"project_id": "3301100002"},
    )
# Get the table's primary key.
def get_hbase_key(hbasedatabase, hbasetable):
    """Return whatever ``zillionUtil.table_key`` reports for the table.

    Args:
        hbasedatabase: Database identifier passed through unchanged.
        hbasetable: Table identifier passed through unchanged.

    Returns:
        The result of ``zillionUtil.table_key``; the loader in this script
        iterates over it as a collection of key-column names.
    """
    return zillionUtil.table_key(hbasedatabase, hbasetable)
# Script start time, truncated to whole seconds by a format/parse round trip.
# It is kept both as text and as a datetime so that file modification times
# can be compared against it later.
_TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"
datetimenow = datetime.datetime.now().strftime(_TIMESTAMP_FORMAT)
datetimenow_strp = datetime.datetime.strptime(datetimenow, _TIMESTAMP_FORMAT)
print(type(datetimenow_strp), datetimenow_strp)

# Read the metadata URL and database name, plus the source/target paths of the
# export command, from config.xml.
config = ConfigUtils("config.xml")
url, hbasedatabase = config.readTop("metadata", ["url", "database"])
sourcepath, targetpath = config.readTop("cmd", ["sourcepath", "targetpath"])

# Connect to HBase.
zillionUtil = ZillionUtil(url)
# Disabled step: switch to the output directory and empty it.
# os.chdir(targetpath)
# os.system("rm -rf *")

# Switch to the Java program's working directory and run the data export.
os.chdir(sourcepath)
print("进入%s目录,开始执行java程序" % os.getcwd())
cmd = "java -jar -Dfile.encoding=UTF-8 data-migration.jar"
# os.system returns the command's exit status; 0 is treated as success below.
status = os.system(cmd)
# If the export program succeeded, load its output files into HBase and then
# ask the downstream services to refresh; otherwise just report the failure.
if status == 0:
    print("导出程序执行成功")
    # Switch to the output directory. Every entry in it is treated as a
    # JSON-lines file whose base name (text before the first ".") is the
    # target table name.
    os.chdir(targetpath)
    print("进入%s目录,检查文件并写入hbase"%os.getcwd())
    # A file counts as fresh when it was modified at or after the script start
    # time and less than this many seconds after it.
    max_delta_seconds = 10000
    for entry in os.listdir(os.getcwd()):
        print(entry)
        # Strip leading backslashes from the on-disk name. The rename is done
        # in-process (no shell), so unusual characters in a file name cannot
        # be misinterpreted, and it only happens when the name changes.
        file = entry.lstrip("\\")
        if file != entry:
            print("mv %s %s" % (entry, file))
            os.replace(entry, file)
        # File modification time in local time, truncated to whole seconds so
        # it is comparable with datetimenow_strp.
        updatetime = os.path.getmtime(file)
        modified_at = datetime.datetime.fromtimestamp(updatetime).replace(microsecond=0)
        otherStyleTime = modified_at.strftime("%Y-%m-%d %H:%M:%S")
        detal_time = (modified_at - datetimenow_strp).total_seconds()
        print("%s文件最新修改时间:%s"%(file,otherStyleTime))
        hbasetable = file.split(".")[0]
        # Requiring a non-negative delta rejects files left over from earlier
        # runs; an upper bound alone would accept every older file.
        if 0 <= detal_time < max_delta_seconds:
            # Parse the whole file before touching HBase, so a malformed line
            # cannot leave rows deleted but never re-inserted. Blank lines are
            # skipped instead of being fed to the JSON parser.
            with open(file, "r", encoding='utf-8') as fp:
                records = [json.loads(line) for line in fp if line.strip()]
            if file == "rel_btw_objs.json":
                # Relation table: delete by the fixed criteria, then re-insert.
                print("删除%s"%file)
                delete_hbase(hbasedatabase, hbasetable)
                put_hbase(hbasedatabase, hbasetable, records)
            else:
                # Other tables: remove each record by its key columns, then
                # insert the new data.
                keys = get_hbase_key(hbasedatabase, hbasetable)
                for record in records:
                    hbase_key = {key: record[key] for key in keys if key in record}
                    remove_hbase(hbasedatabase, hbasetable, hbase_key)
                    # NOTE(review): the pause is applied after every removal;
                    # confirm it was not meant as a single pause before the
                    # insert.
                    time.sleep(0.1)
                put_hbase(hbasedatabase, hbasetable, records)
            # NOTE(review): printed for both branches; confirm it was not
            # meant for the keyed branch only.
            print("%s写入完成"%hbasetable)
        else:
            print("检查%s导出文件数据,可能不是最新数据"%file)
    print("数据写入完成")
    # Make sure curl is available, then call the cache-clear and full-sync
    # endpoints of the downstream services.
    os.system("apt-get update&&apt -y install curl")
    os.system("curl http://data-platform-3:80/mng/clear_cache?secret=A123456")
    os.system("curl http://sagacloud-object-service:80/object/sync/AllData?projectId=Pj3301100002")
    print("数据刷新完成")
else:
    print("执行java程序失败")
|