zyx 2 лет назад
Родитель
Сommit
c2be9eaf97
2 измененных файлов с 46 добавлено и 37 удалено
  1. 4 37
      src/main/java/com/cxzx/Main.java
  2. 42 0
      src/main/java/com/cxzx/config/KafkaConfig.java

+ 4 - 37
src/main/java/com/cxzx/Main.java

@@ -1,58 +1,25 @@
 package com.cxzx;
 
+import com.cxzx.config.KafkaConfig;
+import com.cxzx.db.KafkaData;
 import com.cxzx.db.Tidb;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import com.cxzx.db.KafkaData;
 
 import java.sql.Timestamp;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 
 @Slf4j
 public class Main {
     public static void main(String[] args) {
 
-        var userDir = SystemUtils.getUserDir();
-        var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
-        var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
-        var props = new Properties();
-        //根证书store的密码,保持不变。
-        //接入协议,目前支持使用SASL_SSL协议接入。
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
-        props.put(CommonClientConfigs.GROUP_ID_CONFIG, "cxzx");
-        //SASL鉴权方式,保持不变。
-        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
-        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);
-        //根证书store的密码,保持不变。
-        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
-        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
-        System.setProperty("java.security.auth.login.config", jaasPath);
-        props.put("bootstrap.servers",
-                "alikafka-pre-cn-tl32wxejg003-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-3.alikafka.aliyuncs.com:9093");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
-        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
-        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600 * 1000);
-
-
-        var consumer = new KafkaConsumer<String, String>(props);
+        var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaProperties());
 
         try (consumer) {
             consumer.subscribe(List.of("article"));
@@ -69,6 +36,6 @@ public class Main {
             log.error(e.getMessage(), e);
         }
 
-
     }
+
 }

+ 42 - 0
src/main/java/com/cxzx/config/KafkaConfig.java

@@ -0,0 +1,42 @@
+package com.cxzx.config;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+
+import java.util.Properties;
+
+public class KafkaConfig {
+
+    public static Properties getKafkaProperties() {
+        var userDir = SystemUtils.getUserDir();
+        var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
+        var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
+        var props = new Properties();
+        //根证书store的密码,保持不变。
+        //接入协议,目前支持使用SASL_SSL协议接入。
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+        props.put(CommonClientConfigs.GROUP_ID_CONFIG, "cxzx");
+        //SASL鉴权方式,保持不变。
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);
+        //根证书store的密码,保持不变。
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+        System.setProperty("java.security.auth.login.config", jaasPath);
+        props.put("bootstrap.servers",
+                "alikafka-pre-cn-tl32wxejg003-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-3.alikafka.aliyuncs.com:9093");
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
+        return props;
+    }
+}