Ver código fonte

去掉binlog文件查询,增加断点续传

weiyizhong 3 anos atrás
pai
commit
177ac6d464

+ 28 - 21
src/main/java/com/persagy/binlog/BinlogClientRunner.java

@@ -25,10 +25,7 @@ import org.springframework.scheduling.annotation.Async;
 import java.io.Serializable;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * binlog监听类
@@ -46,9 +43,6 @@ public class BinlogClientRunner implements CommandLineRunner {
     @Autowired
     private RwdObjectWdMapper rwdObjectWdMapper;
 
-    @Autowired
-    private DBUtil dbUtil;
-
     @Value("${binlog.host}")
     private String host;
 
@@ -119,14 +113,6 @@ public class BinlogClientRunner implements CommandLineRunner {
                     log.info(tableName+"同步第"+i+"页,数量"+rwdObjectWds.size());
                 }
             }
-            if(binlogPosition==null){
-                binlogPosition = new BinlogPosition();
-            }
-            binlogPosition.setServerId(serverId);
-            //查询binlogName
-            String binlogName = dbUtil.queryBinlogName("show binary logs");
-            binlogPosition.setBinlogName(binlogName);
-            binlogPositionService.saveOrUpdate(binlogPosition);
         }
 
 
@@ -199,30 +185,51 @@ public class BinlogClientRunner implements CommandLineRunner {
                         }
                     }
                 }
+                //处理rotate事件
+                saveRotatePosition( event);
             }
         }));
         client.connect();
     }
 
+    private void saveRotatePosition(Event event){
+        List<EventType> excludePositionEventType = new ArrayList<>();
+        excludePositionEventType.add(EventType.FORMAT_DESCRIPTION);
+        excludePositionEventType.add(EventType.HEARTBEAT);
+        if (!excludePositionEventType.contains(event.getHeader().getEventType())) {
+            //处理rotate事件,这里会替换调binlog fileName
+            if (event.getHeader().getEventType().equals(EventType.ROTATE)) {
+                BinlogPosition binlogPositionSave = binlogPositionService.getPosition();
+                if(null == binlogPositionSave){
+                    binlogPositionSave = new BinlogPosition();
+                }
+                RotateEventData rotateEventData = (RotateEventData) event.getData();
+                binlogPositionSave.setBinlogName(rotateEventData.getBinlogFilename());
+                binlogPositionSave.setPosition(rotateEventData.getBinlogPosition());
+                binlogPositionSave.setServerId(event.getHeader().getServerId());
+                //将最新的配置保存到mysql中
+                log.info("保存的数据{}", JSON.toJSONString(binlogPositionSave));
+                binlogPositionService.saveOrUpdate(binlogPositionSave);
+            }
+        }
+    }
+
     private void saveBinlogPosition(Event event) {
         BinlogPosition binlogPositionSave = new BinlogPosition();
         //处理rotate事件,这里会替换调binlog fileName
         if (event.getHeader().getEventType().equals(EventType.ROTATE)) {
-            RotateEventData rotateEventData = (RotateEventData) event.getData();
-            binlogPositionSave.setBinlogName(rotateEventData.getBinlogFilename());
-            binlogPositionSave.setPosition(rotateEventData.getBinlogPosition());
-            binlogPositionSave.setServerId(event.getHeader().getServerId());
+            saveRotatePosition(event);
         } else {
             //统一处理事件对应的binlog position
             binlogPositionSave = binlogPositionService.getPosition();
-            if(binlogPositionSave==null){
+            if(null == binlogPositionSave){
                 binlogPositionSave = new BinlogPosition();
             }
             EventHeaderV4 eventHeaderV4 = (EventHeaderV4) event.getHeader();
             binlogPositionSave.setPosition(eventHeaderV4.getPosition());
             binlogPositionSave.setServerId(event.getHeader().getServerId());
         }
-        //将最新的配置保存到Redis中
+        //将最新的配置保存到mysql
         log.info("保存的数据{}", JSON.toJSONString(binlogPositionSave));
         binlogPositionService.saveOrUpdate(binlogPositionSave);
     }

+ 1 - 0
src/main/java/com/persagy/utils/DBUtil.java

@@ -59,6 +59,7 @@ public class DBUtil {
         //提取查询结果
         String binlogName = "";
         while(rs.next()){
+            //取最后一个binlog文件
             binlogName = rs.getString("log_name");
         }
         close(conn);