RepairTable.java 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package com.sagacloud;
  2. /*
  3. * Author: Jxing
  4. * Create Time: 2019/3/12
  5. */
  6. import com.alibaba.fastjson.*;
  7. import com.google.common.base.Strings;
  8. import com.sagacloud.hbase.HbaseScanIndex;
  9. import com.sagacloud.hbase.ZillionOpUtil;
  10. import com.sagacloud.pojo.HBaseTable;
  11. import com.sagacloud.hbase.HBaseScanUtil;
  12. import com.sagacloud.pojo.InfosObj;
  13. import com.zillion.database.agent.ZillionAgent;
  14. import com.zillion.util.table.HTableUtil;
  15. import com.zillion.util.table.ZillionTableIndex;
  16. import com.zillion.util.table.ZillionTableSchema;
  17. import org.apache.hadoop.hbase.client.Result;
  18. import org.apache.hadoop.hbase.client.ResultScanner;
  19. import java.io.IOException;
  20. import java.util.*;
  21. public class RepairTable {
  22. public static void main(String[] args) {
  23. args = new String[]{"-b", "physical_world_v3_226", "-p", "1101080001"};
  24. HBaseTable tableInfo = new HBaseTable();
  25. boolean complete = HBaseTable.getInfo(args, tableInfo);
  26. if(!complete){
  27. System.out.println("参数错误, -b 数据库名, -p 项目号(不带Pj)");
  28. return;
  29. }
  30. ZillionAgent agent = ZillionAgentProvider.GetAgent();
  31. ResultScanner scanner = null;
  32. JSONObject criteria = new JSONObject();
  33. if(tableInfo.getProjectId() != null && tableInfo.getProjectId().length() > 0){
  34. criteria.put("project_id", tableInfo.getProjectId());
  35. }
  36. ZillionTableSchema schema = agent.container.repositoryMap.get(tableInfo.getDatabase()).GetTable(tableInfo.getDatatable());
  37. ZillionTableIndex index = HbaseScanIndex.zillionTableIndex(schema, criteria, null);
  38. try {
  39. scanner = getScanner(agent, tableInfo, criteria);
  40. } catch (Exception e) {
  41. System.out.println(e.getMessage());
  42. }
  43. fixData(scanner, index, schema, tableInfo);
  44. agent.Stop();
  45. }
  46. private static void fixData(ResultScanner scanner, ZillionTableIndex index, ZillionTableSchema schema, HBaseTable tableInfo) {
  47. if(scanner == null)
  48. return;
  49. // key --> objId + infoId , value --> [ info_value, ArrayList<JSONObjectRow> ]
  50. Map<String, ArrayList<InfosObj>> fixMap = new LinkedHashMap<>();
  51. ArrayList<InfosObj> deleteArr = new ArrayList<>();
  52. try {
  53. String curProjectId = "aaa";
  54. while(true) {
  55. Result[] rs = scanner.next(2000);
  56. if (rs == null || rs.length == 0) {
  57. break;
  58. }
  59. for (int i = 0; i < rs.length; ++i) {
  60. if (HTableUtil.valid(schema, rs[i].listCells())) {
  61. JSONObject resultObject = HBaseScanUtil.Convert(schema, index, rs[i]);
  62. if (!resultObject.containsKey("obj_type") || !resultObject.containsKey("obj_id")
  63. || !resultObject.containsKey("info_id") || !resultObject.containsKey("time")) {
  64. throw new Exception("该表不是数据平台3的obj_infos表");
  65. }
  66. InfosObj infosObj = resultObject.toJavaObject(InfosObj.class);
  67. String idKey = String.join("", infosObj.getProject_id(), infosObj.getObj_type(), infosObj.getObj_id(), infosObj.getInfo_id());
  68. putToFixMap(idKey, infosObj, fixMap);
  69. if(!(curProjectId == null && infosObj.getProject_id() == null) &&
  70. (curProjectId == null || infosObj.getProject_id() == null || !curProjectId.equals(infosObj.getProject_id()))){
  71. curProjectId = infosObj.getProject_id();
  72. System.out.println("正在处理项目: " + curProjectId);
  73. continue;
  74. }
  75. }
  76. }
  77. putToDeleteArr(fixMap, deleteArr);
  78. if(deleteArr.size() > 2000){
  79. ZillionOpUtil.delete(deleteArr, tableInfo);
  80. System.out.println("正在修复 " + curProjectId);
  81. }
  82. }
  83. ZillionOpUtil.delete(deleteArr, tableInfo);
  84. } catch (Exception e) {
  85. System.out.println(e.getMessage());
  86. return;
  87. }finally {
  88. scanner.close();
  89. }
  90. System.out.println("已完成");
  91. }
  92. private static void putToDeleteArr(Map<String, ArrayList<InfosObj>> fixMap, ArrayList<InfosObj> deleteArr) {
  93. String lastKey = null;
  94. for(String idKey : fixMap.keySet()){
  95. lastKey = idKey;
  96. ArrayList<InfosObj> arr = fixMap.get(idKey);
  97. boolean isFirst = true;
  98. Object lastValue = null;
  99. for(int i = 0; i < arr.size(); ++i){
  100. InfosObj single = arr.get(i);
  101. if(isFirst){
  102. lastValue = single.getObjInfoValue();
  103. isFirst = false;
  104. continue;
  105. }
  106. if(single.getObjInfoValue() == null){
  107. if(lastValue == null) {
  108. // remove
  109. deleteArr.add(single);
  110. arr.remove(i);
  111. --i;
  112. continue;
  113. }else{
  114. lastValue = single.getObjInfoValue();
  115. }
  116. }else if(single.getObjInfoValue().equals(lastValue)){
  117. // remove
  118. deleteArr.add(single);
  119. arr.remove(i);
  120. --i;
  121. continue;
  122. }else{
  123. lastValue = single.getObjInfoValue();
  124. }
  125. }
  126. }
  127. if(lastKey == null ){
  128. return;
  129. }
  130. Iterator<String> iter = fixMap.keySet().iterator();
  131. while(iter.hasNext()) {
  132. String key = iter.next();
  133. if (!key.equals(lastKey)) {
  134. iter.remove();
  135. }
  136. }
  137. }
  138. private static void putToFixMap(String idKey, InfosObj resultObject, Map<String, ArrayList<InfosObj>> fixMap) {
  139. if(fixMap.containsKey(idKey)){
  140. fixMap.get(idKey).add(resultObject);
  141. }else{
  142. ArrayList<InfosObj> arr = new ArrayList<>();
  143. fixMap.put(idKey, arr);
  144. arr.add(resultObject);
  145. }
  146. }
  147. private static ResultScanner getScanner(ZillionAgent agent, HBaseTable tableInfo, JSONObject jsonObject) throws Exception {
  148. return HBaseScanUtil.getScan(agent, tableInfo.getDatabase(), tableInfo.getDatatable(), tableInfo.getDatatable(), jsonObject, null);
  149. }
  150. }