|
@@ -1,19 +1,17 @@
|
|
package com.cxzx.canal;
|
|
package com.cxzx.canal;
|
|
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
|
-
|
|
|
|
-import com.alibaba.fastjson2.JSONArray;
|
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
|
+import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
import com.alibaba.otter.canal.protocol.CanalEntry;
|
|
|
|
+import com.alibaba.otter.canal.protocol.Message;
|
|
import com.aliyun.openservices.log.common.LogItem;
|
|
import com.aliyun.openservices.log.common.LogItem;
|
|
import com.cxzx.sls.AliyunLogger;
|
|
import com.cxzx.sls.AliyunLogger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.util.Assert;
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
-import com.alibaba.otter.canal.client.kafka.KafkaCanalConnector;
|
|
|
|
-import com.alibaba.otter.canal.protocol.Message;
|
|
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
public class CanalClientMain {
|
|
public class CanalClientMain {
|
|
protected final static Logger logger = LoggerFactory.getLogger(CanalClientMain.class);
|
|
protected final static Logger logger = LoggerFactory.getLogger(CanalClientMain.class);
|
|
@@ -28,18 +26,22 @@ public class CanalClientMain {
|
|
|
|
|
|
private final Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
|
|
private final Thread.UncaughtExceptionHandler handler = (t, e) -> logger.error("parse events has an error", e);
|
|
|
|
|
|
- public CanalClientMain(String servers, String topic, Integer partition, String groupId) {
|
|
|
|
|
|
+ public CanalClientMain(String servers, String topic, Integer partition, String groupId, String logStore) {
|
|
connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
|
|
connector = new KafkaCanalConnector(servers, topic, partition, groupId, null, false);
|
|
- sls = new AliyunLogger("taide-binlog");
|
|
|
|
|
|
+ sls = new AliyunLogger(logStore);
|
|
}
|
|
}
|
|
|
|
|
|
public static void main(String[] args) {
|
|
public static void main(String[] args) {
|
|
|
|
+ // canal taide-binlog # huaqiyun huaqiyun-binlog
|
|
|
|
+ var topic = args[0];
|
|
|
|
+ var logStore = args[1];
|
|
try {
|
|
try {
|
|
final CanalClientMain canalClient = new CanalClientMain(
|
|
final CanalClientMain canalClient = new CanalClientMain(
|
|
"baidu06:9092,baidu07:9092,baidu08:9092,baidu09:9092",
|
|
"baidu06:9092,baidu07:9092,baidu08:9092,baidu09:9092",
|
|
- "canal",
|
|
|
|
|
|
+ topic,
|
|
0,
|
|
0,
|
|
- "zyx-canal");
|
|
|
|
|
|
+ "zyx-canal",
|
|
|
|
+ logStore);
|
|
canalClient.start();
|
|
canalClient.start();
|
|
logger.info("## the canal kafka consumer is running now ......");
|
|
logger.info("## the canal kafka consumer is running now ......");
|
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|