Browse Source

fix adm 数据迁移 异常出来 关系日志分批量入库

lvxianyun 2 years ago
parent
commit
be5de96343

+ 1 - 1
adm-business/adm-middleware/src/main/java/com/persagy/proxy/migration/controller/DataMigrationController.java

@@ -74,7 +74,7 @@ public class DataMigrationController {
         try {
 
             if (migrationInfo == null || StrUtil.isBlank(migrationInfo.getTargetUrl())) {
-                throw new BusinessException(ResponseCode.A0402.getCode(), ResponseCode.A0402.getDesc());
+                throw new BusinessException(ResponseCode.A0402.toString());
             }
             InstanceUrlParam context = AdmContextUtil.toDmpContext();
             //确定集团编码 和 项目id

+ 5 - 5
adm-business/adm-middleware/src/main/java/com/persagy/proxy/migration/service/Impl/MigrationAbstractServiceImpl.java

@@ -385,7 +385,7 @@ public class MigrationAbstractServiceImpl<T> implements IMigrationAbstractServic
             syncData.setTableName(tableName);
             syncData.setTargetId("error");
             syncData.setError(msg);
-            syncData.setSign(1);
+            syncData.setSign(3);//异常
             syncDataList.add(syncData);
         }
         return syncDataList;
@@ -432,12 +432,12 @@ public class MigrationAbstractServiceImpl<T> implements IMigrationAbstractServic
                 }
                 return DataMigrationResponse.success(JsonNodeUtils.toEntity(arrayNode, clazz, null));
             }else {
-                log.error(commonResult.getMessage());
-                return DataMigrationResponse.error(commonResult.getMessage());
+                log.error(StrUtil.format("获取中台数据失败{}",commonResult.getMessage()));
+                return DataMigrationResponse.error(StrUtil.format("获取中台数据失败{}",commonResult.getMessage()));
             }
         }catch (Exception e){
-            log.error(e.getMessage());
-            return DataMigrationResponse.error(e.getMessage());
+            log.error(StrUtil.format("获取中台数据异常{}",e.getMessage()));
+            return DataMigrationResponse.error(StrUtil.format("获取中台数据异常{}",e.getMessage()));
         }
 
     }

+ 16 - 12
adm-business/adm-middleware/src/main/java/com/persagy/proxy/migration/service/Impl/ObjectDigitalMigration.java

@@ -84,9 +84,9 @@ public class ObjectDigitalMigration extends MigrationAbstractServiceImpl<ObjectD
             log.info("######################### dt_object "+objType.getCode()+"同步开始 #########################");
 
             QueryCriteria queryCriteria = getQueryCriteria(objType.getCode());
-            List<SyncData> syncDataList = startMigrateForLog(context,migrationInfo,queryCriteria);
+            startMigrateForLog(context,migrationInfo,queryCriteria);
 
-            log.info("######################### dt_object "+objType.getCode()+"同步结束 共计:"+syncDataList.size()+"条 #########################");
+            log.info("######################### dt_object "+objType.getCode()+"同步结束 #########################");
         }
         long end = System.currentTimeMillis();
         log.info("######################### dt_object 数据迁移已结束 时间:"+(end-start)+" #########################");
@@ -190,8 +190,7 @@ public class ObjectDigitalMigration extends MigrationAbstractServiceImpl<ObjectD
      * 迁移数据,记录日志
      *
      */
-    private List<SyncData> startMigrateForLog(InstanceUrlParam context, MigrationInfo migrationInfo, QueryCriteria queryCriteria) {
-        List<SyncData> syncDataList = new ArrayList<>();
+    private void startMigrateForLog(InstanceUrlParam context, MigrationInfo migrationInfo, QueryCriteria queryCriteria) {
         List<ObjectNode> admData = getAdmData(context,queryCriteria);
 
         String queryUrl = requestUrl(context, migrationInfo, MigrationType.QUERY.getCode());
@@ -205,7 +204,9 @@ public class ObjectDigitalMigration extends MigrationAbstractServiceImpl<ObjectD
         String insertUrl = requestUrl(context, migrationInfo, MigrationType.CREATE.getCode());
         if(CollUtil.isEmpty(projObjectNodeList)){
             DataMigrationResponse dataMigrationResponse = insertBatch(admData, ObjectDigital.class, insertUrl);
-            syncDataList = processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode());
+            List<SyncData> syncDataList = processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode());
+            //处理并保存日志
+            super.addSynLog(context, syncDataList);
         }
         Map<String,Object> projectMap = toEntityMap(projObjectNodeList, ObjectDigital.class);
         Map<String,Object> admMap = toEntityMap(admData, ObjectDigital.class);
@@ -216,11 +217,13 @@ public class ObjectDigitalMigration extends MigrationAbstractServiceImpl<ObjectD
             //调用中台验证待新增的数据
             List<ObjectDigital> digitalList = validateInfoCode(insertData, context);
             //将验证结果放到日志
-            syncDataList.addAll(processDataForLog(DataMigrationResponse.success(digitalList), MigrationType.CREATE.getCode()));
+            //处理并保存日志
+            super.addSynLog(context,processDataForLog(DataMigrationResponse.success(digitalList), MigrationType.CREATE.getCode()));
 
             //插入数据
             DataMigrationResponse dataMigrationResponse = insertBatch(insertData, ObjectDigital.class, insertUrl);
-            syncDataList.addAll(processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode()));
+            //处理并保存日志
+            super.addSynLog(context,processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode()));
         }
 
         //差集 删除
@@ -233,7 +236,8 @@ public class ObjectDigitalMigration extends MigrationAbstractServiceImpl<ObjectD
             List<String> successIds = (List<String>) dataMigrationResponse.getData();
             List<ObjectNode> delObjs = toListByIds(successIds, projObjectNodeList);
             dataMigrationResponse.setData(delObjs);
-            syncDataList.addAll(processDataForLog(dataMigrationResponse, MigrationType.DELETE.getCode()));
+            //处理并保存日志
+            super.addSynLog(context,processDataForLog(dataMigrationResponse, MigrationType.DELETE.getCode()));
         }
 
         //交集更新
@@ -246,16 +250,16 @@ public class ObjectDigitalMigration extends MigrationAbstractServiceImpl<ObjectD
                 List<ObjectNode> updateData = toList(compareData, admData);
                 //调用中台验证待更新的数据
                 List<ObjectDigital> digitalList = validateInfoCode(updateData, context);
-                //将验证结果放到日志
-                syncDataList.addAll(processDataForLog(DataMigrationResponse.success(digitalList), MigrationType.UPDATE.getCode()));
+                //将验证结果放到日志并保存
+                super.addSynLog(context,processDataForLog(DataMigrationResponse.success(digitalList), MigrationType.UPDATE.getCode()));
 
                 //更新
                 String updateUrl = requestUrl(context, migrationInfo, MigrationType.UPDATE.getCode());
                 DataMigrationResponse dataMigrationResponse = updateBatch(updateData, ObjectDigital.class, updateUrl);
-                syncDataList.addAll(processDataForLog(dataMigrationResponse, MigrationType.UPDATE.getCode()));
+                //处理并保存日志
+                super.addSynLog(context,processDataForLog(dataMigrationResponse, MigrationType.UPDATE.getCode()));
             }
         }
-       return super.addSynLog(context, syncDataList);
     }
 
     /**

+ 13 - 37
adm-business/adm-middleware/src/main/java/com/persagy/proxy/migration/service/Impl/ObjectRelationMigration.java

@@ -101,36 +101,18 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
         log.info("######################### dt_relation 同步开始 #########################");
         long start = System.currentTimeMillis();
         ExecutorService service = getExcecutor();
-        List<SyncData> syncDataList = new ArrayList<>();
-        List<Future<List<SyncData>>> list = new ArrayList<>();
 
         for (MiGrationRelCode miGrationRelCode: MiGrationRelCode.values()){
             log.info("######################### dt_relation "+miGrationRelCode.getCode()+"数据迁移开始 #########################");
 
-            Future<List<SyncData>> future = service.submit(new Callable<List<SyncData>>(){
+            service.submit(new Runnable() {
                 @Override
-                public List<SyncData> call() throws Exception {
-                    return startMigrateForLog(context, migrationInfo, miGrationRelCode.getCode());
+                public void run() {
+                    startMigrateForLog(context, migrationInfo, miGrationRelCode.getCode());
                 }
             });
-            list.add(future);
         }
         service.shutdown();
-        for (Future<List<SyncData>> future : list) {
-            try {
-                List<SyncData> syncDatas = future.get();
-                if(CollUtil.isNotEmpty(syncDatas)){
-                    syncDataList.addAll(syncDatas);
-                }
-
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-                log.error("######################### dt_relation 数据迁移失败"+e.getMessage());
-            } catch (ExecutionException e) {
-                e.printStackTrace();
-                log.error("######################### dt_relation 数据迁移失败"+e.getMessage());
-            }
-        }
         long end = System.currentTimeMillis();
         log.info("######################### dt_relation 同步结束 "+(end-start)+"#########################");
         return AdmResponse.success();
@@ -146,15 +128,6 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
     public String migrateForSql(InstanceUrlParam context) {
         StringBuffer sqls = new StringBuffer(" \n -- 数据迁移  dt_relation ------ \n");
         //获取已经采集的数据
-        /* StringBuffer sqls_temp = new StringBuffer();
-        for (MiGrationRelCode miGrationRelCode: MiGrationRelCode.values()){
-            QueryCriteria queryCriteria = getQueryCriteria(miGrationRelCode.getCode());
-            List<ObjectRelation> admData = getAdmData(context,queryCriteria);
-            if(CollUtil.isEmpty(admData)){
-                continue;
-            }
-        }*/
-
         String sqls_temp = "";
         ExecutorService service = getExcecutor();
         List<Future<StringBuffer>> futureList = new ArrayList<>();
@@ -345,9 +318,8 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
         return dataMigrationExcels;
     }
 
-    private List<SyncData> startMigrateForLog(InstanceUrlParam context, MigrationInfo migrationInfo, String code){
+    private void startMigrateForLog(InstanceUrlParam context, MigrationInfo migrationInfo, String code){
         QueryCriteria queryCriteria = getQueryCriteria(code);
-        List<SyncData> syncDataList = new ArrayList<>();
         List<ObjectRelation> admRelations = getAdmData(context, queryCriteria);
 
         String queryUrl = requestUrl(context, migrationInfo, MigrationType.QUERY.getCode());
@@ -359,7 +331,9 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
         String insertUrl = requestUrl(context, migrationInfo, MigrationType.CREATE.getCode());
         if(CollUtil.isEmpty(projectRelations)){
             DataMigrationResponse dataMigrationResponse = insertBatch(admRelations, ObjectRelation.class, insertUrl);
-            syncDataList = processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode());
+            List<SyncData> syncDataList = processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode());
+            //处理并保存日志
+            super.addSynLog(context,syncDataList);
         }
         List<ObjectNode> objectNodeListPro = JsonNodeUtils.toListNode(projectRelations, null, null);
         Map<String,Object> projectDefineMap = toEntityMap(objectNodeListPro, ObjectRelation.class);
@@ -371,7 +345,8 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
         if(!CollUtil.isEmpty(doSubtractFromInsert)){
             List<ObjectRelation> insertData = toList(doSubtractFromInsert, admRelations);
             DataMigrationResponse dataMigrationResponse = insertBatch(insertData, ObjectRelation.class, insertUrl);
-            syncDataList.addAll(processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode()));
+            //处理并保存日志
+            super.addSynLog(context,processDataForLog(dataMigrationResponse, MigrationType.CREATE.getCode()));
         }
 
         //差集 删除
@@ -381,7 +356,8 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
             String delUrl = requestUrl(context, migrationInfo, MigrationType.DELETE.getCode());
             //处理删除的数据
             DataMigrationResponse dataMigrationResponse = deleteBatch(deleteIds, delUrl);
-            syncDataList.addAll(processDataForLog(dataMigrationResponse, MigrationType.DELETE.getCode()));
+            //处理并保存日志
+            super.addSynLog(context,processDataForLog(dataMigrationResponse, MigrationType.DELETE.getCode()));
         }
 
         //交集更新
@@ -393,10 +369,10 @@ public class ObjectRelationMigration extends MigrationAbstractServiceImpl<Object
                 List<ObjectRelation> updateData = toList(compareData, admRelations);
                 String updateUrl = requestUrl(context, migrationInfo, MigrationType.UPDATE.getCode());
                 DataMigrationResponse dataMigrationResponse = updateBatch(updateData, ObjectRelation.class, updateUrl);
-                syncDataList.addAll(processDataForLog(dataMigrationResponse, MigrationType.UPDATE.getCode()));
+                //处理并保存日志
+                super.addSynLog(context,processDataForLog(dataMigrationResponse, MigrationType.UPDATE.getCode()));
             }
         }
-        return super.addSynLog(context, syncDataList);
     }