zyx 2 år sedan
förälder
incheckning
cbd98df2f0

+ 1 - 2
src/main/java/com/cxzx/Main.java

@@ -1,7 +1,6 @@
 package com.cxzx;
 
 import cn.hutool.core.collection.CollStreamUtil;
-import cn.hutool.core.collection.CollUtil;
 import com.cxzx.config.KafkaConfig;
 import com.cxzx.db.KafkaData;
 import com.cxzx.db.Tidb;
@@ -19,7 +18,7 @@ import java.util.Map;
 public class Main {
     public static void main(String[] args) {
 
-        var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaProperties());
+        var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaConsumerProperties());
 
         try (consumer) {
             consumer.subscribe(List.of("article","material"));

+ 29 - 1
src/main/java/com/cxzx/config/KafkaConfig.java

@@ -4,14 +4,16 @@ import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
 
 public class KafkaConfig {
 
-    public static Properties getKafkaProperties() {
+    public static Properties getKafkaConsumerProperties() {
         var userDir = SystemUtils.getUserDir();
         var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
         var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
@@ -39,4 +41,30 @@ public class KafkaConfig {
         props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300 * 1000);
         return props;
     }
+
+    public static Properties getKafkaProducerProperties() {
+        var userDir = SystemUtils.getUserDir();
+        var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
+        var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
+        var props = new Properties();
+        //根证书store的密码,保持不变。
+        //接入协议,目前支持使用SASL_SSL协议接入。
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+        props.put(CommonClientConfigs.GROUP_ID_CONFIG, "cxzx");
+        //SASL鉴权方式,保持不变。
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);
+        //根证书store的密码,保持不变。
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+        System.setProperty("java.security.auth.login.config", jaasPath);
+        props.put("bootstrap.servers",
+                "alikafka-pre-cn-tl32wxejg003-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-3.alikafka.aliyuncs.com:9093");
+
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+
+        return props;
+    }
 }

+ 17 - 3
src/main/java/com/cxzx/db/Tidb.java

@@ -4,9 +4,8 @@ import cn.hutool.core.collection.CollUtil;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
+import java.sql.*;
+import java.util.ArrayList;
 import java.util.List;
 
 public class Tidb {
@@ -40,4 +39,19 @@ public class Tidb {
         preparedStatement.executeBatch();
 
     }
+
+    @SneakyThrows
+    public static ArrayList<String> articleData(int limit) {
+        @Cleanup
+        var statement = connection.createStatement();
+        @Cleanup
+        var resultSet = statement.executeQuery("select data from article limit " + limit);
+
+        var result = new ArrayList<String>();
+        while (resultSet.next()) {
+            result.add(resultSet.getString(1));
+        }
+        return result;
+    }
+
 }

+ 20 - 0
src/main/java/com/cxzx/taide/PushTestData.java

@@ -0,0 +1,20 @@
+package com.cxzx.taide;
+
+import com.cxzx.config.KafkaConfig;
+import lombok.Cleanup;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Date;
+
+import static com.cxzx.db.Tidb.articleData;
+
+public class PushTestData {
+    public static void main(String[] args) {
+        @Cleanup
+        var producer = new KafkaProducer<String, String>(KafkaConfig.getKafkaProducerProperties());
+        for (String data : articleData(1000)) {
+            producer.send(new ProducerRecord<>("article_test", 0, new Date().getTime(), "", data));
+        }
+    }
+}