孙永军 1 rok temu
rodzic
commit
c60c0dcaee

+ 69 - 0
src/main/java/com/cxzx/config/KafkaSXTTConfig.java

@@ -0,0 +1,69 @@
+package com.cxzx.config;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+
+public class KafkaSXTTConfig {
+
+    public static Properties getKafkaConsumerProperties() {
+        var userDir = SystemUtils.getUserDir();
+        var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
+        var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
+        var props = new Properties();
+        //根证书store的密码,保持不变。
+        //接入协议,目前支持使用SASL_SSL协议接入。
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+        props.put(CommonClientConfigs.GROUP_ID_CONFIG, "sxtt");
+        //SASL鉴权方式,保持不变。
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);
+        //根证书store的密码,保持不变。
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+        System.setProperty("java.security.auth.login.config", jaasPath);
+        props.put("bootstrap.servers",
+                "alikafka-pre-cn-tl32wxejg003-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-3.alikafka.aliyuncs.com:9093");
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300 * 1000);
+        return props;
+    }
+
+    public static Properties getKafkaProducerProperties() {
+        var userDir = SystemUtils.getUserDir();
+        var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
+        var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
+        var props = new Properties();
+        //根证书store的密码,保持不变。
+        //接入协议,目前支持使用SASL_SSL协议接入。
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+        //SASL鉴权方式,保持不变。
+        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
+        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);
+        //根证书store的密码,保持不变。
+        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
+        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+        System.setProperty("java.security.auth.login.config", jaasPath);
+        props.put("bootstrap.servers",
+                "alikafka-pre-cn-tl32wxejg003-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-tl32wxejg003-3.alikafka.aliyuncs.com:9093");
+
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+
+        return props;
+    }
+}

+ 2 - 6
src/main/java/com/cxzx/sxtt/SyncMain.java

@@ -1,17 +1,13 @@
 package com.cxzx.sxtt;
 
 import cn.hutool.crypto.SecureUtil;
-import cn.hutool.http.Header;
 import cn.hutool.http.HttpRequest;
 import cn.hutool.http.HttpResponse;
-import cn.hutool.http.HttpUtil;
-import com.cxzx.config.KafkaConfig;
-import com.fasterxml.jackson.core.JsonGenerator;
+import com.cxzx.config.KafkaSXTTConfig;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.json.JsonWriteFeature;
 import com.fasterxml.jackson.databind.MapperFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationConfig;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -34,7 +30,7 @@ public class SyncMain {
 
     public static void main(String[] args) {
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.getKafkaConsumerProperties());
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaSXTTConfig.getKafkaConsumerProperties());
         ObjectMapper om = new ObjectMapper();
         om.setConfig(om.getSerializationConfig()
                 .with(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)