123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- package com.cxzx.canal;
- import com.alibaba.fastjson2.JSONObject;
- import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
- import com.alibaba.otter.canal.protocol.CanalEntry;
- import com.alibaba.otter.canal.protocol.Message;
- import com.aliyun.openservices.log.common.LogItem;
- import com.cxzx.sls.AliyunLogger;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.util.Assert;
- import java.util.List;
- import java.util.concurrent.TimeUnit;
- public class CanalClientMain {
- protected final static Logger logger = LoggerFactory.getLogger(CanalClientMain.class);
- private final KafkaCanalConnector connector;
- private final AliyunLogger sls;
- private static volatile boolean running = false;
- private Thread thread = null;
- private final Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
- public CanalClientMain(String servers, String topic, Integer partition, String groupId, String logStore) {
- connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
- sls = new AliyunLogger(logStore);
- }
- public static void main(String[] args) {
- // canal taide-binlog # huaqiyun huaqiyun-binlog
- var topic = args[0];
- var logStore = args[1];
- try {
- final CanalClientMain canalClient = new CanalClientMain(
- "baidu06:9092,baidu07:9092,baidu08:9092,baidu09:9092",
- topic,
- 0,
- "zyx-canal",
- logStore);
- canalClient.start();
- logger.info("## the canal kafka consumer is running now ......");
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- try {
- logger.info("## stop the kafka consumer");
- canalClient.stop();
- } catch (Throwable e) {
- logger.warn("##something goes wrong when stopping kafka consumer:", e);
- } finally {
- logger.info("## kafka consumer is down.");
- }
- }));
- while (running)
- Thread.onSpinWait();
- } catch (Throwable e) {
- logger.error("## Something goes wrong when starting up the kafka consumer:", e);
- System.exit(0);
- }
- }
- public void start() {
- Assert.notNull(connector, "connector is null");
- thread = new Thread(this::process);
- thread.setUncaughtExceptionHandler(handler);
- thread.start();
- running = true;
- }
- public void stop() {
- if (!running) {
- return;
- }
- running = false;
- if (thread != null) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
- private void process() {
- while (!running) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
- }
- while (running) {
- try {
- connector.connect();
- connector.subscribe();
- while (running) {
- try {
- List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
- if (messages == null) {
- continue;
- }
- for (Message message : messages) {
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
- } else {
- // printSummary(message, batchId, size);
- printEntry(message.getEntries());
- // logger.info(message.toString());
- }
- }
- connector.ack(); // 提交确认
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
- }
- connector.unsubscribe();
- connector.disconnect();
- sls.close();
- }
- private void printEntry(List<CanalEntry.Entry> entrys) {
- for (CanalEntry.Entry entry : entrys) {
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
- //开启/关闭事务的实体类型,跳过
- continue;
- }
- //RowChange对象,包含了一行数据变化的所有特征
- //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
- CanalEntry.RowChange rowChage;
- try {
- rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
- }
- //打印Header信息
- // System.out.printf("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s%n\n",
- // entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- // entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- // eventType);
- //判断是否是DDL语句
- if (rowChage.getIsDdl()) {
- continue;
- }
- var tableName = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
- var updateTime = entry.getHeader().getExecuteTime() / 1000;
- //获取操作类型:insert/update/delete类型
- CanalEntry.EventType eventType = rowChage.getEventType();
- //获取RowChange对象里的每一行数据,打印出来
- for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
- var logItem = new LogItem();
- logItem.SetTime((int) updateTime);
- logItem.PushBack("table_name", tableName);
- //如果是删除语句
- if (eventType == CanalEntry.EventType.DELETE) {
- logItem.PushBack("event", "delete");
- setColumn(rowData.getBeforeColumnsList(), logItem);
- //如果是新增语句
- } else if (eventType == CanalEntry.EventType.INSERT) {
- logItem.PushBack("event", "insert");
- setColumn(rowData.getAfterColumnsList(), logItem);
- //如果是更新的语句
- } else if (eventType == CanalEntry.EventType.UPDATE) {
- logItem.PushBack("event", "update");
- setColumnUpdate(rowData, logItem);
- } else {
- continue;
- }
- sls.info(logItem);
- }
- }
- }
- private void setColumn(List<CanalEntry.Column> columns, LogItem logItem) {
- var data = new JSONObject();
- for (CanalEntry.Column column : columns) {
- data.put(column.getName(), column.getValue());
- }
- logItem.PushBack("data", data.toString());
- }
- private void setColumnUpdate(CanalEntry.RowData rowData, LogItem logItem) {
- var root = new JSONObject();
- var oldData = new JSONObject();
- for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
- oldData.put(column.getName(), column.getValue());
- }
- var newData = new JSONObject();
- for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
- if (column.getUpdated()) {
- newData.put(column.getName(), column.getValue());
- }
- }
- root.put("old", oldData);
- root.put("new", newData);
- logItem.PushBack("data", root.toString());
- }
- }
|