12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- import json
- from physical_world_exporter.MyUtils.ConfigUtils import ConfigUtils
- from physical_world_exporter.MyUtils.ZillionUtil import ZillionUtil
- import os,time,datetime
# Insert records into HBase.
def put_hbase(hbasedatabase, hbasetable, datas, batch_size=1000):
    """Write records to an HBase table through the Zillion client, in batches.

    Args:
        hbasedatabase: database name, passed through to ``zillionUtil.put``.
        hbasetable: table name, passed through to ``zillionUtil.put``.
        datas: sliceable sequence of records (the importer passes a list of
            dicts parsed from JSON lines). An empty sequence sends nothing.
        batch_size: maximum number of records per ``put`` call. The default
            of 1000 is the previously hard-coded value.

    Raises:
        ValueError: if ``batch_size`` is not positive. A zero step would make
            ``range`` raise anyway; a negative one would silently write nothing.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive, got %r" % (batch_size,))
    # Uses the module-level ``zillionUtil`` client created after the config is read.
    for start in range(0, len(datas), batch_size):
        zillionUtil.put(hbasedatabase, hbasetable, datas[start:start + batch_size])
# Remove HBase rows by key.
def remove_hbase(hbasedatabase, hbasetable, key):
    """Forward a single removal request to the Zillion client.

    Thin wrapper over ``zillionUtil.remove``. In this script the caller passes
    ``key`` as a dict that maps the table's key field names to the values
    taken from one exported record.
    """
    client = zillionUtil  # module-level connection built from the config URL
    client.remove(hbasedatabase, hbasetable, key)
# Delete all HBase rows that belong to one project.
def delete_hbase(hbasedatabase, hbasetable, project_id="3301100002"):
    """Delete the rows of an HBase table that match a project id.

    Sends the criteria ``{"project_id": project_id}`` to
    ``zillionUtil.delete``. The importer uses this to clear a table before
    re-inserting a full export of it.

    Args:
        hbasedatabase: database name, passed through to the client.
        hbasetable: table name, passed through to the client.
        project_id: value matched against the ``project_id`` field. The
            default is the id that was previously hard-coded here.
    """
    criteria = {"project_id": project_id}
    zillionUtil.delete(hbasedatabase, hbasetable, criteria)
# Look up a table's primary key.
def get_hbase_key(hbasedatabase, hbasetable):
    """Return whatever ``zillionUtil.table_key`` reports for the table.

    The importer iterates the result as a collection of field names and
    uses each one to pick the key values out of a record.
    """
    return zillionUtil.table_key(hbasedatabase, hbasetable)
# Start time of this run: local time, truncated to whole seconds. It is taken
# before the Java export runs and later compared with each exported file's
# modification time.
datetimenow_strp = datetime.datetime.now().replace(microsecond=0)
datetimenow = datetimenow_strp.strftime("%Y-%m-%d %H:%M:%S")
print(type(datetimenow_strp), datetimenow_strp)

# Connection and path settings from the exporter's XML config.
config = ConfigUtils("physical_world_exporter/config.xml")
# "metadata": service URL for the HBase client and the target database name.
url, hbasedatabase = config.readTop("metadata", ["url", "database"])
# "cmd": working directory of the exporter jar, and the directory it writes to.
sourcepath, targetpath = config.readTop("cmd", ["sourcepath", "targetpath"])

# Connect to HBase. The helper functions above use this module-level client.
zillionUtil = ZillionUtil(url)
# NOTE: clearing the output directory before exporting is disabled, so files
# from earlier runs may still be present in targetpath. Disabled steps:
#     os.chdir(targetpath)
#     os.system("rm -rf *")

# Run the Java exporter from its own working directory.
os.chdir(sourcepath)
print("进入{}目录,开始执行java程序".format(os.getcwd()))
cmd = " ".join(("java", "-jar", "-Dfile.encoding=UTF-8", "data-migration.jar"))
# os.system returns the command's wait status; 0 means it exited successfully.
status = os.system(cmd)
# Import the exported files into HBase, but only if the Java export succeeded.
if status == 0:
    print("导出程序执行成功")
    # Switch to the output directory. Each exported file there is checked for
    # freshness and, if fresh, written into the HBase table named after it.
    os.chdir(targetpath)
    print("进入%s目录,检查文件并写入hbase" % os.getcwd())
    for filename in os.listdir(os.getcwd()):
        # Only regular files can be exports; skip sub-directories and the like.
        if not os.path.isfile(filename):
            continue
        # Local modification time truncated to whole seconds, the same
        # resolution as datetimenow_strp (the run start time).
        modified_at = datetime.datetime.fromtimestamp(
            os.path.getmtime(filename)).replace(microsecond=0)
        detal_time = (modified_at - datetimenow_strp).total_seconds()
        print("%s文件最新修改时间:%s"
              % (filename, modified_at.strftime("%Y-%m-%d %H:%M:%S")))
        hbasetable = filename.split(".")[0]
        # A file is fresh only if it was written at or after the start of this
        # run, i.e. by the export just executed. The former check
        # `detal_time < 10000` had no lower bound: every older file (negative
        # delta) passed too, so stale exports left in the output directory
        # (which is no longer cleared) were re-imported. The upper bound is
        # kept from the original.
        if not (0 <= detal_time < 10000):
            print("检查%s导出文件数据,可能不是最新数据" % filename)
            continue
        with open(filename, "r", encoding="utf-8") as fp:
            print(fp.name)
            # One JSON document per line. Blank lines (e.g. a trailing empty
            # line) are skipped. The whole file is parsed before HBase is
            # touched, so a malformed line aborts before any rows are removed.
            records = [json.loads(line) for line in fp if line.strip()]
        if filename == "rel_btw_objs.json":
            # Relationship data is replaced wholesale: delete the table's rows
            # for the project (via delete_hbase), then insert the export.
            print("删除%s" % filename)
            delete_hbase(hbasedatabase, hbasetable)
        else:
            keys = get_hbase_key(hbasedatabase, hbasetable)
            for data in records:
                # Values of the table's key fields present in this record.
                hbase_key = {key: data[key] for key in keys if key in data}
                # NOTE(review): a record with no key fields yields {}. What
                # zillionUtil.remove does with an empty key is not visible
                # here, so skip it rather than risk an unscoped removal.
                if not hbase_key:
                    continue
                remove_hbase(hbasedatabase, hbasetable, hbase_key)
                # NOTE(review): presumably throttles the per-record removes;
                # confirm the 0.1 s delay is still required.
                time.sleep(0.1)
        put_hbase(hbasedatabase, hbasetable, records)
        print("%s写入完成" % hbasetable)
else:
    print("执行java程序失败")
|