zyx 2 years ago
parent
commit
f2c0dc3c22

+ 11 - 1
build.gradle

@@ -13,7 +13,7 @@ dependencies {
     implementation 'commons-io:commons-io:2.11.0'
     implementation 'org.apache.commons:commons-lang3:3.12.0'
     implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.0'
-    implementation 'cn.hutool:hutool-all:5.8.9'
+    implementation 'cn.hutool:hutool-all:5.8.11'
 
     implementation 'ch.qos.logback:logback-classic:1.2.11'
     implementation 'org.slf4j:slf4j-api:1.7.36'
@@ -23,6 +23,16 @@ dependencies {
 
 
     implementation 'com.mysql:mysql-connector-j:8.0.31'
+    implementation('com.alibaba.otter:canal.client:1.1.6') {
+        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
+
+
+    implementation('com.alibaba.otter:canal.protocol:1.1.6') {
+        exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
+    implementation 'com.aliyun.openservices:aliyun-log:0.6.77:jar-with-dependencies'
+    implementation 'com.aliyun.openservices:aliyun-log-producer:0.3.10'
 
     testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
     testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
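
The two canal artifacts exclude slf4j-log4j12 so that logback-classic (already declared above) remains the only SLF4J binding on the classpath. A minimal sketch to confirm at runtime which binding is active (the class name here is hypothetical, nothing project-specific is assumed beyond the dependencies above):

    import org.slf4j.LoggerFactory;

    public class Slf4jBindingCheck {
        public static void main(String[] args) {
            // With slf4j-log4j12 excluded, this should print logback's factory class.
            System.out.println(LoggerFactory.getILoggerFactory().getClass().getName());
            // expected: ch.qos.logback.classic.LoggerContext
        }
    }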

+ 218 - 0
src/main/java/com/cxzx/canal/CanalClientMain.java

@@ -0,0 +1,218 @@
+package com.cxzx.canal;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.alibaba.fastjson2.JSONArray;
+import com.alibaba.fastjson2.JSONObject;
+import com.alibaba.otter.canal.protocol.CanalEntry;
+import com.aliyun.openservices.log.common.LogItem;
+import com.cxzx.sls.AliyunLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
+
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
+import com.alibaba.otter.canal.protocol.Message;
+
+public class CanalClientMain {
+    protected final static Logger logger = LoggerFactory.getLogger(CanalClientMain.class);
+
+    private final KafkaCanalConnector connector;
+
+    private final AliyunLogger sls;
+
+    private static volatile boolean running = false;
+
+    private Thread thread = null;
+
+    private final Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
+
+    public CanalClientMain(String servers, String topic, Integer partition, String groupId) {
+        connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
+        sls = new AliyunLogger("taide-binlog");
+    }
+
+    public static void main(String[] args) {
+        try {
+            final CanalClientMain canalClient = new CanalClientMain(
+                    "baidu06:9092,baidu07:9092,baidu08:9092,baidu09:9092",
+                    "canal",
+                    0,
+                    "zyx-canal");
+            canalClient.start();
+            logger.info("## the canal kafka consumer is running now ......");
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                try {
+                    logger.info("## stop the kafka consumer");
+                    canalClient.stop();
+                } catch (Throwable e) {
+                    logger.warn("##something goes wrong when stopping kafka consumer:", e);
+                } finally {
+                    logger.info("## kafka consumer is down.");
+                }
+            }));
+            while (running)
+                Thread.onSpinWait();
+        } catch (Throwable e) {
+            logger.error("## Something goes wrong when starting up the kafka consumer:", e);
+            System.exit(0);
+        }
+    }
+
+    public void start() {
+        Assert.notNull(connector, "connector is null");
+        thread = new Thread(this::process);
+        thread.setUncaughtExceptionHandler(handler);
+        thread.start();
+        running = true;
+    }
+
+    public void stop() {
+        if (!running) {
+            return;
+        }
+        running = false;
+        if (thread != null) {
+            try {
+                thread.join();
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    private void process() {
+        while (!running) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException ignored) {
+            }
+        }
+
+        while (running) {
+            try {
+                connector.connect();
+                connector.subscribe();
+                while (running) {
+                    try {
+                        List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // fetch a batch of messages without auto-ack
+                        if (messages == null) {
+                            continue;
+                        }
+                        for (Message message : messages) {
+                            long batchId = message.getId();
+                            int size = message.getEntries().size();
+                            if (batchId == -1 || size == 0) {
+                                try {
+                                    Thread.sleep(1000);
+                                } catch (InterruptedException ignored) {
+                                }
+                            } else {
+                                // printSummary(message, batchId, size);
+                                printEntry(message.getEntries());
+//                                logger.info(message.toString());
+                            }
+                        }
+
+                        connector.ack(); // acknowledge the whole batch
+                    } catch (Exception e) {
+                        logger.error(e.getMessage(), e);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
+        connector.unsubscribe();
+        connector.disconnect();
+        sls.close();
+    }
+
+    private void printEntry(List<CanalEntry.Entry> entries) {
+        for (CanalEntry.Entry entry : entries) {
+            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
+                // transaction begin/end entries carry no row data, skip them
+                continue;
+            }
+            // RowChange describes everything about a row change: the isDdl flag and DDL SQL,
+            // plus beforeColumns/afterColumns holding the data before and after the change
+            CanalEntry.RowChange rowChange;
+            try {
+                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
+            } catch (Exception e) {
+                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
+            }
+
+            // print header info (kept for debugging)
+//            System.out.printf("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s%n\n",
+//                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
+//                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
+//                    eventType);
+            // skip DDL statements
+            if (rowChange.getIsDdl()) {
+                continue;
+            }
+            var tableName = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
+            // event type: insert/update/delete
+            CanalEntry.EventType eventType = rowChange.getEventType();
+
+
+            // build one LogItem per changed row and ship it to SLS
+            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
+                var logItem = new LogItem();
+                logItem.PushBack("table_name", tableName);
+                if (eventType == CanalEntry.EventType.DELETE) {
+                    // delete: log the before image
+                    logItem.PushBack("event", "delete");
+                    setColumn(rowData.getBeforeColumnsList(), logItem);
+                } else if (eventType == CanalEntry.EventType.INSERT) {
+                    // insert: log the after image
+                    logItem.PushBack("event", "insert");
+                    setColumn(rowData.getAfterColumnsList(), logItem);
+                } else if (eventType == CanalEntry.EventType.UPDATE) {
+                    // update: log old/new values and the list of changed columns
+                    logItem.PushBack("event", "update");
+                    setColumnUpdate(rowData, logItem);
+                } else {
+                    continue;
+                }
+                sls.info(logItem);
+            }
+        }
+    }
+
+    private void setColumn(List<CanalEntry.Column> columns, LogItem logItem) {
+        var data = new JSONObject();
+        for (CanalEntry.Column column : columns) {
+            data.put(column.getName(), column.getValue());
+        }
+        logItem.PushBack("data", data.toString());
+    }
+
+    private void setColumnUpdate(CanalEntry.RowData rowData, LogItem logItem) {
+        var root = new JSONObject();
+
+        var oldData = new JSONObject();
+        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
+            oldData.put(column.getName(), column.getValue());
+        }
+
+        var newData = new JSONObject();
+        var changeColumn = new JSONArray();
+        for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
+            newData.put(column.getName(), column.getValue());
+            if (column.getUpdated()) {
+                changeColumn.add(column.getName());
+            }
+        }
+
+        root.put("old", oldData);
+        root.put("new", newData);
+        root.put("change", changeColumn);
+
+
+        logItem.PushBack("data", root.toString());
+    }
+}
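
For reference, each row change reaches SLS as a LogItem carrying table_name, event and data keys, plus the level key added by AliyunLogger. A small sketch of reading back an update payload in the shape produced by setColumnUpdate, using the fastjson2 classes already imported above (the column name and values are hypothetical):

    import com.alibaba.fastjson2.JSONObject;

    public class UpdatePayloadExample {
        public static void main(String[] args) {
            // hypothetical "data" value for an update that flipped a status column from 0 to 1
            String data = "{\"old\":{\"status\":\"0\"},\"new\":{\"status\":\"1\"},\"change\":[\"status\"]}";
            JSONObject root = JSONObject.parseObject(data);
            System.out.println(root.getJSONArray("change"));                    // ["status"]
            System.out.println(root.getJSONObject("new").getString("status"));  // 1
        }
    }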

+ 54 - 0
src/main/java/com/cxzx/sls/AliyunLogger.java

@@ -0,0 +1,54 @@
+package com.cxzx.sls;
+
+import com.aliyun.openservices.aliyun.log.producer.LogProducer;
+import com.aliyun.openservices.aliyun.log.producer.Producer;
+import com.aliyun.openservices.aliyun.log.producer.ProducerConfig;
+import com.aliyun.openservices.aliyun.log.producer.ProjectConfig;
+import com.aliyun.openservices.aliyun.log.producer.errors.ProducerException;
+import com.aliyun.openservices.log.common.LogItem;
+
+public class AliyunLogger {
+    private final Producer producer;
+    private final String logStore;
+    private final String project = "k8s-log-c4d23a31b8c6c4ed49e7fc1473dfd0c57";
+
+    public AliyunLogger(String logStore) {
+
+        this.logStore = logStore;
+
+        ProducerConfig producerConfig = new ProducerConfig();
+        producerConfig.setBatchSizeThresholdInBytes(3 * 1024 * 1024);
+        producerConfig.setBatchCountThreshold(40960);
+
+        producer = new LogProducer(producerConfig);
+        producer.putProjectConfig(new ProjectConfig(project,
+                "cn-chengdu.log.aliyuncs.com",
+                "LTAI5tCb5r8efPipWaKSeCkE", "BsKtHRePUFBrfF9aA8b8sDTrox23cU"));
+
+        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+    }
+
+    public void info(LogItem item) {
+        try {
+            item.PushBack("level", "info");
+            producer.send(project, logStore, item);
+        } catch (InterruptedException | ProducerException ignored) {
+        }
+    }
+
+    public void error(LogItem item) {
+        try {
+            item.PushBack("level", "error");
+            producer.send(project, logStore, item);
+        } catch (InterruptedException | ProducerException ignored) {
+        }
+    }
+
+    public void close() {
+        try {
+            producer.close();
+        } catch (InterruptedException | ProducerException ignored) {
+        }
+    }
+
+}
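
A minimal usage sketch for this logger (the log store name and field values are hypothetical; project, endpoint and access keys are hardcoded in the constructor):

    import com.aliyun.openservices.log.common.LogItem;
    import com.cxzx.sls.AliyunLogger;

    public class AliyunLoggerExample {
        public static void main(String[] args) {
            AliyunLogger sls = new AliyunLogger("demo-binlog"); // hypothetical log store
            LogItem item = new LogItem();
            item.PushBack("table_name", "test.demo");
            item.PushBack("event", "insert");
            item.PushBack("data", "{\"id\":\"1\"}");
            sls.info(item);  // queued and sent asynchronously by the SLS producer
            sls.close();     // flush pending batches and shut the producer down
        }
    }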

+ 1 - 1
src/main/resources/logback.xml

@@ -9,7 +9,7 @@
 
 
 
-    <root level="warn">
+    <root level="info">
         <appender-ref ref="CONSOLE-LOG"/>
     </root>
 </configuration>