start.py 3.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import json
  4. from physical_world_exporter.MyUtils.ConfigUtils import ConfigUtils
  5. from physical_world_exporter.MyUtils.ZillionUtil import ZillionUtil
  6. import os,time,datetime
  7. #插入hbase
  8. def put_hbase(hbasedatabase,hbasetable,datas):
  9. for i in range(0, len(datas), 1000):
  10. dataranges = datas[i:i + 1000]
  11. zillionUtil.put(hbasedatabase, hbasetable, dataranges)
  12. #删除hbase数据
  13. def remove_hbase(hbasedatabase,hbasetable,key):
  14. zillionUtil.remove(hbasedatabase, hbasetable, key)
  15. #删除hbase数据
  16. def delete_hbase(hbasedatabase,hbasetable):
  17. Criteria = {"project_id":"3301100002"}
  18. zillionUtil.delete(hbasedatabase, hbasetable, Criteria)
  19. #获取表主键
  20. def get_hbase_key(hbasedatabase, hbasetable):
  21. keys = zillionUtil.table_key(hbasedatabase, hbasetable)
  22. return keys
  23. datetimenow = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  24. datetimenow_strp = datetime.datetime.strptime(datetimenow,"%Y-%m-%d %H:%M:%S")
  25. print(type(datetimenow_strp),datetimenow_strp)
  26. config = ConfigUtils("physical_world_exporter/config.xml")
  27. url, hbasedatabase = config.readTop("metadata", ["url", "database"])
  28. sourcepath, targetpath = config.readTop("cmd", ["sourcepath", "targetpath"])
  29. #连接hbase
  30. zillionUtil = ZillionUtil(url)
  31. # #切换到输出目录,清空文件
  32. # os.chdir(targetpath)
  33. # os.system("rm -rf *")
  34. #切换到java程序工作目录,执行导出数据程序
  35. os.chdir(sourcepath)
  36. print("进入%s目录,开始执行java程序"%os.getcwd())
  37. cmd = "java -jar -Dfile.encoding=UTF-8 data-migration.jar"
  38. status = os.system(cmd)
  39. #如果程序执行成功
  40. if status == 0:
  41. print("导出程序执行成功")
  42. #切换到输出目录,判断文件是否是最新文件
  43. os.chdir(targetpath)
  44. print("进入%s目录,检查文件并写入hbase"%os.getcwd())
  45. list = os.listdir(os.getcwd())
  46. for file in list:
  47. #linux 导出程序bug,需要处理下文件名称
  48. # print(i)
  49. # file = i.lstrip("\\")
  50. # cmd_mv = "mv \%s %s"%(i,file)
  51. # print(cmd_mv)
  52. # os.system(cmd_mv)
  53. updatetime = os.path.getmtime(file) #查询文件修改时间
  54. timeArray = time.localtime(updatetime)
  55. otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
  56. otherStyleTime_strp = datetime.datetime.strptime(otherStyleTime,"%Y-%m-%d %H:%M:%S")
  57. detal_time = (otherStyleTime_strp - datetimenow_strp).total_seconds()
  58. print("%s文件最新修改时间:%s"%(file,otherStyleTime))
  59. hbasetable = file.split(".")[0]
  60. if int(detal_time) < 10000:
  61. with open(file,"r",encoding = 'utf-8') as fp:
  62. print(fp.name)
  63. if file == "rel_btw_objs.json":
  64. datas_rel = []
  65. for line in fp.readlines():
  66. data = json.loads(line)
  67. datas_rel.append(data)
  68. print("删除%s"%fp.name)
  69. delete_hbase(hbasedatabase,hbasetable)
  70. put_hbase(hbasedatabase, hbasetable, datas_rel)
  71. else:
  72. keys = get_hbase_key(hbasedatabase, hbasetable)
  73. datas = []
  74. for line in fp.readlines():
  75. data = json.loads(line)
  76. hbase_key = {}
  77. for key in keys:
  78. if key in data:
  79. hbase_key[key] = data[key]
  80. datas.append(data)
  81. remove_hbase(hbasedatabase,hbasetable,hbase_key)
  82. time.sleep(0.1)
  83. put_hbase(hbasedatabase,hbasetable,datas)
  84. print("%s写入完成"%hbasetable)
  85. else:
  86. print("检查%s导出文件数据,可能不是最新数据"%file)
  87. else:
  88. print("执行java程序失败")