zyx 2 жил өмнө
parent
commit
5c4d54a5a7

+ 1 - 1
apply.sh

@@ -1,5 +1,5 @@
 
-version="1.9"
+version="1.10"
 docker build -t registry.cn-chengdu.aliyuncs.com/cxzx-spider/ali-kafka:$version .
 docker push registry.cn-chengdu.aliyuncs.com/cxzx-spider/ali-kafka:$version
 sed -i "s!{ { image } }!registry\.cn-chengdu\.aliyuncs\.com/cxzx-spider/ali-kafka:$version!g" *.yml

+ 8 - 3
src/main/java/com/cxzx/Main.java

@@ -1,5 +1,7 @@
 package com.cxzx;
 
+import cn.hutool.core.collection.CollStreamUtil;
+import cn.hutool.core.collection.CollUtil;
 import com.cxzx.config.KafkaConfig;
 import com.cxzx.db.KafkaData;
 import com.cxzx.db.Tidb;
@@ -22,13 +24,16 @@ public class Main {
         var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaProperties());
 
         try (consumer) {
-            consumer.subscribe(List.of("article"));
+            consumer.subscribe(List.of("article","media_center_test"));
             while (true) {
                 var kafkaDataList = new ArrayList<KafkaData>();
                 for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(2000))) {
-                    kafkaDataList.add(new KafkaData(record.offset(), record.value(), new Timestamp(record.timestamp())));
+                    kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp())));
                 }
-                Tidb.insert(kafkaDataList);
+                for (Map.Entry<String, List<KafkaData>> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) {
+                    Tidb.insert(entry.getValue(),entry.getKey());
+                }
+
                 consumer.commitSync();
             }
         } catch (Exception e) {

+ 1 - 0
src/main/java/com/cxzx/db/KafkaData.java

@@ -8,6 +8,7 @@ import java.sql.Timestamp;
 @Data
 @AllArgsConstructor
 public class KafkaData {
+    private String topic;
     private Long offset;
     private String data;
     private Timestamp createTime;

+ 3 - 3
src/main/java/com/cxzx/db/Tidb.java

@@ -11,7 +11,7 @@ import java.util.List;
 
 public class Tidb {
 
-    private static final Connection connection ;
+    private static final Connection connection;
 
     static {
         try {
@@ -25,12 +25,12 @@ public class Tidb {
     }
 
     @SneakyThrows
-    public static void insert(List<KafkaData> kafkaDataList) {
+    public static void insert(List<KafkaData> kafkaDataList, String topic) {
         if (CollUtil.isEmpty(kafkaDataList)) {
             return;
         }
         @Cleanup
-        var preparedStatement = connection.prepareStatement("insert ignore into article (offset,create_time,data) values (?,?,?)");
+        var preparedStatement = connection.prepareStatement("insert ignore into " + topic + " (offset,create_time,data) values (?,?,?)");
         for (KafkaData kafkaData : kafkaDataList) {
             preparedStatement.setLong(1, kafkaData.getOffset());
             preparedStatement.setTimestamp(2, kafkaData.getCreateTime());