zyx vor 2 Jahren
Ursprung
Commit
65234dfee8
2 geänderte Dateien mit 23 neuen und 9 gelöschten Zeilen
  1. 11 7
      src/main/java/com/cxzx/db/Tidb.java
  2. 12 2
      src/main/java/com/cxzx/taide/PushTestData.java

+ 11 - 7
src/main/java/com/cxzx/db/Tidb.java

@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 
-import java.sql.*;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,15 +43,17 @@ public class Tidb {
     }
 
     @SneakyThrows
-    public static ArrayList<String> articleData(int limit) {
+    public static List<KafkaData> articleData(long offset, int limit) {
         @Cleanup
-        var statement = connection.createStatement();
+        var statement = connection.prepareStatement("select offset,data from article where offset > ? order by offset limit ?");
+        statement.setLong(1, offset);
+        statement.setInt(2, limit);
         @Cleanup
-        var resultSet = statement.executeQuery("select data from article limit " + limit);
-
-        var result = new ArrayList<String>();
+        var resultSet = statement.executeQuery();
+        var result = new ArrayList<KafkaData>();
         while (resultSet.next()) {
-            result.add(resultSet.getString(1));
+            var kafkaData = new KafkaData(null, resultSet.getLong(1), resultSet.getString(2), null);
+            result.add(kafkaData);
         }
         return result;
     }

+ 12 - 2
src/main/java/com/cxzx/taide/PushTestData.java

@@ -1,6 +1,7 @@
 package com.cxzx.taide;
 
 import com.cxzx.config.KafkaConfig;
+import com.cxzx.db.KafkaData;
 import lombok.Cleanup;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -13,8 +14,17 @@ public class PushTestData {
     public static void main(String[] args) {
         @Cleanup
         var producer = new KafkaProducer<String, String>(KafkaConfig.getKafkaProducerProperties());
-        for (String data : articleData(1000)) {
-            producer.send(new ProducerRecord<>("article_test", 0, new Date().getTime(), "", data));
+        long offset = 0;
+        while (true) {
+            var articleDataList = articleData(offset, 1000);
+            if (articleDataList.size() == 0) {
+                break;
+            }
+            for (KafkaData data : articleDataList) {
+                producer.send(new ProducerRecord<>("article_test", 0, new Date().getTime(), "", data.getData()));
+                offset = data.getOffset();
+            }
         }
+
     }
 }