Kaynağa Gözat

第一次启动项目,全量同步后续只会增量同

weiyizhong 3 yıl önce
ebeveyn
işleme
6377c7ce9f

+ 2 - 2
src/main/java/com/persagy/binlog/BinlogClientRunner.java

@@ -77,7 +77,7 @@ public class BinlogClientRunner implements CommandLineRunner {
         //先全量把数据同步一下再,增量同步数据
         //查询数据
         //表示需要同步到中台去的数据,先将监听表的infos字段信息查询出来
-        if(binlogPosition.getPosition()==null){
+        if(binlogPosition==null){
             /**开始全量同步数据=======================================*/
             log.info("开始全量同步数据=======================================");
             for (String databaseTableName:databaseList
@@ -117,7 +117,7 @@ public class BinlogClientRunner implements CommandLineRunner {
                 binlogPosition = new BinlogPosition();
             }
             binlogPosition.setServerId(serverId);
-
+            binlogPositionService.saveOrUpdate(binlogPosition);
         }
 
 

+ 0 - 123
src/main/java/com/persagy/binlog/BinlogClientRunnerCopy.java

@@ -1,123 +0,0 @@
-//package com.persagy.cn.binlog;
-//
-//import com.alibaba.fastjson.JSON;
-//import com.alibaba.fastjson.JSONObject;
-//import com.github.shyiko.mysql.binlog.BinaryLogClient;
-//import com.github.shyiko.mysql.binlog.event.*;
-//import com.persagy.cn.business.HandlerData;
-//import lombok.extern.slf4j.Slf4j;
-//import org.springframework.beans.factory.annotation.Autowired;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.boot.CommandLineRunner;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.core.annotation.Order;
-//import org.springframework.scheduling.annotation.Async;
-//
-//import java.io.Serializable;
-//import java.util.Arrays;
-//import java.util.HashMap;
-//import java.util.List;
-//import java.util.Map;
-//
-///**
-// * @Author : weiyizhong
-// * @Description: binlog监听
-// * @Date : 2021/9/15  10:37
-// * @Modified By :
-// */
-//@Slf4j
-//@Configuration
-//@Order(1000)
-//public class BinlogClientRunner  implements CommandLineRunner {
-//
-//    @Autowired
-//    private HandlerData handlerData;
-//
-//    @Value("${binlog.host}")
-//    private String host;
-//
-//    @Value("${binlog.port}")
-//    private int port;
-//
-//    @Value("${binlog.user}")
-//    private String user;
-//
-//    @Value("${binlog.password}")
-//    private String password;
-//
-//    // binlog server_id
-//    @Value("${server.id}")
-//    private long serverId;
-//
-//    // 指定监听的数据表
-//    @Value("${binlog.database.table}")
-//    private String database_table;
-//
-//    // 指定监听的数据表
-//    @Value("${binlog.database.table-format}")
-//    private String tableFormat;
-//
-//
-//    @Async
-//    @Override
-//    public void run(String... args) throws Exception {
-//        // 获取监听数据表数组
-//        List<String> databaseList = Arrays.asList(database_table.split(","));
-//        HashMap<Long, String> tableMap = new HashMap<Long, String>();
-//        // 创建binlog监听客户端
-//        BinaryLogClient client = new BinaryLogClient(host, port, user, password);
-//        client.setServerId(serverId);
-////        client.setBinlogFilename("mysql-bin.000053");
-////        client.setBinlogPosition(0);
-//        client.registerEventListener((event -> {
-//            // binlog事件
-//            EventData data = event.getData();
-//            if (data != null) {
-//                if (data instanceof TableMapEventData) {
-//                    TableMapEventData tableMapEventData = (TableMapEventData) data;
-//                    tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getDatabase() + "." + tableMapEventData.getTable());
-//                }
-//                // update数据
-//                if (data instanceof UpdateRowsEventData) {
-//                    UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
-//                    String tableName = tableMap.get(updateRowsEventData.getTableId());
-//                    if (tableName != null && databaseList.contains(tableName)) {
-//                        String eventKey = tableName + ".update";
-//                        log.info("监听数据库更新数据:{}",eventKey);
-//                        for (Map.Entry<Serializable[], Serializable[]> row : updateRowsEventData.getRows()) {
-//                            List<Serializable> entries = Arrays.asList(row.getValue());
-//                            handlerData.handlerDataToWd(tableFormat,entries,tableName,"update");
-//                        }
-//                    }
-//                }
-//                // insert数据
-//                else if (data instanceof WriteRowsEventData) {
-//                    WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data;
-//                    String tableName = tableMap.get(writeRowsEventData.getTableId());
-//                    if (tableName != null && databaseList.contains(tableName)) {
-//                        String eventKey = tableName + ".insert";
-//                        log.info("监听数据库插入数据:{}",eventKey);
-//                        for (Serializable[] row : writeRowsEventData.getRows()) {
-//                            List<Serializable> entries = Arrays.asList(row);
-//                            handlerData.handlerDataToWd(tableFormat,entries,tableName,"insert");
-//                        }
-//                    }
-//                }
-//                // delete数据
-//                else if (data instanceof DeleteRowsEventData) {
-//                    DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data;
-//                    String tableName = tableMap.get(deleteRowsEventData.getTableId());
-//                    if (tableName != null && databaseList.contains(tableName)) {
-//                        String eventKey = tableName + ".delete";
-//                        for (Serializable[] row : deleteRowsEventData.getRows()) {
-//                            System.out.println("delete======");
-//                        }
-//                    }
-//                }
-//            }
-//        }));
-//        client.connect();
-//        client.disconnect();
-//    }
-//
-//}