// CanalClientMain.java
  1. package com.cxzx.canal;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
  4. import com.alibaba.otter.canal.protocol.CanalEntry;
  5. import com.alibaba.otter.canal.protocol.Message;
  6. import com.aliyun.openservices.log.common.LogItem;
  7. import com.cxzx.sls.AliyunLogger;
  8. import org.slf4j.Logger;
  9. import org.slf4j.LoggerFactory;
  10. import org.springframework.util.Assert;
  11. import java.util.List;
  12. import java.util.concurrent.TimeUnit;
  13. public class CanalClientMain {
  14. protected final static Logger logger = LoggerFactory.getLogger(CanalClientMain.class);
  15. private final KafkaCanalConnector connector;
  16. private final AliyunLogger sls;
  17. private static volatile boolean running = false;
  18. private Thread thread = null;
  19. private final Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
  20. public CanalClientMain(String servers, String topic, Integer partition, String groupId, String logStore) {
  21. connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
  22. sls = new AliyunLogger(logStore);
  23. }
  24. public static void main(String[] args) {
  25. // canal taide-binlog # huaqiyun huaqiyun-binlog
  26. var topic = args[0];
  27. var logStore = args[1];
  28. try {
  29. final CanalClientMain canalClient = new CanalClientMain(
  30. "baidu06:9092,baidu07:9092,baidu08:9092,baidu09:9092",
  31. topic,
  32. 0,
  33. "zyx-canal",
  34. logStore);
  35. canalClient.start();
  36. logger.info("## the canal kafka consumer is running now ......");
  37. Runtime.getRuntime().addShutdownHook(new Thread(() -> {
  38. try {
  39. logger.info("## stop the kafka consumer");
  40. canalClient.stop();
  41. } catch (Throwable e) {
  42. logger.warn("##something goes wrong when stopping kafka consumer:", e);
  43. } finally {
  44. logger.info("## kafka consumer is down.");
  45. }
  46. }));
  47. while (running)
  48. Thread.onSpinWait();
  49. } catch (Throwable e) {
  50. logger.error("## Something goes wrong when starting up the kafka consumer:", e);
  51. System.exit(0);
  52. }
  53. }
  54. public void start() {
  55. Assert.notNull(connector, "connector is null");
  56. thread = new Thread(this::process);
  57. thread.setUncaughtExceptionHandler(handler);
  58. thread.start();
  59. running = true;
  60. }
  61. public void stop() {
  62. if (!running) {
  63. return;
  64. }
  65. running = false;
  66. if (thread != null) {
  67. try {
  68. thread.join();
  69. } catch (InterruptedException e) {
  70. // ignore
  71. }
  72. }
  73. }
  74. private void process() {
  75. while (!running) {
  76. try {
  77. Thread.sleep(1000);
  78. } catch (InterruptedException ignored) {
  79. }
  80. }
  81. while (running) {
  82. try {
  83. connector.connect();
  84. connector.subscribe();
  85. while (running) {
  86. try {
  87. List<Message> messages = connector.getListWithoutAck(100L, TimeUnit.MILLISECONDS); // 获取message
  88. if (messages == null) {
  89. continue;
  90. }
  91. for (Message message : messages) {
  92. long batchId = message.getId();
  93. int size = message.getEntries().size();
  94. if (batchId == -1 || size == 0) {
  95. try {
  96. Thread.sleep(1000);
  97. } catch (InterruptedException ignored) {
  98. }
  99. } else {
  100. // printSummary(message, batchId, size);
  101. printEntry(message.getEntries());
  102. // logger.info(message.toString());
  103. }
  104. }
  105. connector.ack(); // 提交确认
  106. } catch (Exception e) {
  107. logger.error(e.getMessage(), e);
  108. }
  109. }
  110. } catch (Exception e) {
  111. logger.error(e.getMessage(), e);
  112. }
  113. }
  114. connector.unsubscribe();
  115. connector.disconnect();
  116. sls.close();
  117. }
  118. private void printEntry(List<CanalEntry.Entry> entrys) {
  119. for (CanalEntry.Entry entry : entrys) {
  120. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
  121. //开启/关闭事务的实体类型,跳过
  122. continue;
  123. }
  124. //RowChange对象,包含了一行数据变化的所有特征
  125. //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
  126. CanalEntry.RowChange rowChage;
  127. try {
  128. rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  129. } catch (Exception e) {
  130. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry, e);
  131. }
  132. //打印Header信息
  133. // System.out.printf("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s%n\n",
  134. // entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  135. // entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  136. // eventType);
  137. //判断是否是DDL语句
  138. if (rowChage.getIsDdl()) {
  139. continue;
  140. }
  141. var tableName = entry.getHeader().getSchemaName() + "." + entry.getHeader().getTableName();
  142. var updateTime = entry.getHeader().getExecuteTime() / 1000;
  143. //获取操作类型:insert/update/delete类型
  144. CanalEntry.EventType eventType = rowChage.getEventType();
  145. //获取RowChange对象里的每一行数据,打印出来
  146. for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
  147. var logItem = new LogItem();
  148. logItem.SetTime((int) updateTime);
  149. logItem.PushBack("table_name", tableName);
  150. //如果是删除语句
  151. if (eventType == CanalEntry.EventType.DELETE) {
  152. logItem.PushBack("event", "delete");
  153. setColumn(rowData.getBeforeColumnsList(), logItem);
  154. //如果是新增语句
  155. } else if (eventType == CanalEntry.EventType.INSERT) {
  156. logItem.PushBack("event", "insert");
  157. setColumn(rowData.getAfterColumnsList(), logItem);
  158. //如果是更新的语句
  159. } else if (eventType == CanalEntry.EventType.UPDATE) {
  160. logItem.PushBack("event", "update");
  161. setColumnUpdate(rowData, logItem);
  162. } else {
  163. continue;
  164. }
  165. sls.info(logItem);
  166. }
  167. }
  168. }
  169. private void setColumn(List<CanalEntry.Column> columns, LogItem logItem) {
  170. var data = new JSONObject();
  171. for (CanalEntry.Column column : columns) {
  172. data.put(column.getName(), column.getValue());
  173. }
  174. logItem.PushBack("data", data.toString());
  175. }
  176. private void setColumnUpdate(CanalEntry.RowData rowData, LogItem logItem) {
  177. var root = new JSONObject();
  178. var oldData = new JSONObject();
  179. for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
  180. oldData.put(column.getName(), column.getValue());
  181. }
  182. var newData = new JSONObject();
  183. for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
  184. if (column.getUpdated()) {
  185. newData.put(column.getName(), column.getValue());
  186. }
  187. }
  188. root.put("old", oldData);
  189. root.put("new", newData);
  190. logItem.PushBack("data", root.toString());
  191. }
  192. }