孙永军 1 year ago
parent
commit
8677ffbf42

+ 10 - 2
src/main/java/com/cxzx/config/KafkaConfig.java

@@ -13,7 +13,15 @@ import java.util.Properties;
 
 public class KafkaConfig {
 
-    public static Properties getKafkaConsumerProperties() {
+    public static Properties getSXTTKafkaConsumerProperties(){
+        return getKafkaConsumerPropertiesByGroup("sxtt");
+    }
+
+    public static Properties getKafkaConsumerProperties(){
+        return getKafkaConsumerPropertiesByGroup("cxzx");
+    }
+
+    public static Properties getKafkaConsumerPropertiesByGroup(String  group) {
         var userDir = SystemUtils.getUserDir();
         var jksPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/only.4096.client.truststore.jks");
         var jaasPath = FilenameUtils.concat(userDir.getAbsolutePath(), "conf/kafka_client_jaas.conf");
@@ -21,7 +29,7 @@ public class KafkaConfig {
         //根证书store的密码,保持不变。
         //接入协议,目前支持使用SASL_SSL协议接入。
         props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
-        props.put(CommonClientConfigs.GROUP_ID_CONFIG, "cxzx");
+        props.put(CommonClientConfigs.GROUP_ID_CONFIG, group);
         //SASL鉴权方式,保持不变。
         props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
         props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, jksPath);

+ 2 - 2
src/main/java/com/cxzx/sxtt/SyncMain.java

@@ -3,7 +3,7 @@ package com.cxzx.sxtt;
 import cn.hutool.crypto.SecureUtil;
 import cn.hutool.http.HttpRequest;
 import cn.hutool.http.HttpResponse;
-import com.cxzx.config.KafkaSXTTConfig;
+import com.cxzx.config.KafkaConfig;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.json.JsonWriteFeature;
 import com.fasterxml.jackson.databind.MapperFeature;
@@ -30,7 +30,7 @@ public class SyncMain {
 
     public static void main(String[] args) {
 
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaSXTTConfig.getKafkaConsumerProperties());
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(KafkaConfig.getSXTTKafkaConsumerProperties());
         ObjectMapper om = new ObjectMapper();
         om.setConfig(om.getSerializationConfig()
                 .with(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY)