//package com.persagy.cn.binlog; // //import com.alibaba.fastjson.JSON; //import com.alibaba.fastjson.JSONObject; //import com.github.shyiko.mysql.binlog.BinaryLogClient; //import com.github.shyiko.mysql.binlog.event.*; //import com.persagy.cn.business.HandlerData; //import lombok.extern.slf4j.Slf4j; //import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.beans.factory.annotation.Value; //import org.springframework.boot.CommandLineRunner; //import org.springframework.context.annotation.Configuration; //import org.springframework.core.annotation.Order; //import org.springframework.scheduling.annotation.Async; // //import java.io.Serializable; //import java.util.Arrays; //import java.util.HashMap; //import java.util.List; //import java.util.Map; // ///** // * @Author : weiyizhong // * @Description: binlog监听 // * @Date : 2021/9/15 10:37 // * @Modified By : // */ //@Slf4j //@Configuration //@Order(1000) //public class BinlogClientRunner implements CommandLineRunner { // // @Autowired // private HandlerData handlerData; // // @Value("${binlog.host}") // private String host; // // @Value("${binlog.port}") // private int port; // // @Value("${binlog.user}") // private String user; // // @Value("${binlog.password}") // private String password; // // // binlog server_id // @Value("${server.id}") // private long serverId; // // // 指定监听的数据表 // @Value("${binlog.database.table}") // private String database_table; // // // 指定监听的数据表 // @Value("${binlog.database.table-format}") // private String tableFormat; // // // @Async // @Override // public void run(String... args) throws Exception { // // 获取监听数据表数组 // List databaseList = Arrays.asList(database_table.split(",")); // HashMap tableMap = new HashMap(); // // 创建binlog监听客户端 // BinaryLogClient client = new BinaryLogClient(host, port, user, password); // client.setServerId(serverId); //// client.setBinlogFilename("mysql-bin.000053"); //// client.setBinlogPosition(0); // client.registerEventListener((event -> { // // binlog事件 // EventData data = event.getData(); // if (data != null) { // if (data instanceof TableMapEventData) { // TableMapEventData tableMapEventData = (TableMapEventData) data; // tableMap.put(tableMapEventData.getTableId(), tableMapEventData.getDatabase() + "." + tableMapEventData.getTable()); // } // // update数据 // if (data instanceof UpdateRowsEventData) { // UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data; // String tableName = tableMap.get(updateRowsEventData.getTableId()); // if (tableName != null && databaseList.contains(tableName)) { // String eventKey = tableName + ".update"; // log.info("监听数据库更新数据:{}",eventKey); // for (Map.Entry row : updateRowsEventData.getRows()) { // List entries = Arrays.asList(row.getValue()); // handlerData.handlerDataToWd(tableFormat,entries,tableName,"update"); // } // } // } // // insert数据 // else if (data instanceof WriteRowsEventData) { // WriteRowsEventData writeRowsEventData = (WriteRowsEventData) data; // String tableName = tableMap.get(writeRowsEventData.getTableId()); // if (tableName != null && databaseList.contains(tableName)) { // String eventKey = tableName + ".insert"; // log.info("监听数据库插入数据:{}",eventKey); // for (Serializable[] row : writeRowsEventData.getRows()) { // List entries = Arrays.asList(row); // handlerData.handlerDataToWd(tableFormat,entries,tableName,"insert"); // } // } // } // // delete数据 // else if (data instanceof DeleteRowsEventData) { // DeleteRowsEventData deleteRowsEventData = (DeleteRowsEventData) data; // String tableName = tableMap.get(deleteRowsEventData.getTableId()); // if (tableName != null && databaseList.contains(tableName)) { // String eventKey = tableName + ".delete"; // for (Serializable[] row : deleteRowsEventData.getRows()) { // System.out.println("delete======"); // } // } // } // } // })); // client.connect(); // client.disconnect(); // } // //}