Bladeren bron

Merge remote-tracking branch 'origin/master'

孙永军 1 jaar geleden
bovenliggende
commit
35f89414eb
2 gewijzigde bestanden met toevoegingen van 5 en 3 verwijderingen
  1. 4 2
      src/main/java/com/cxzx/Main.java
  2. 1 1
      src/main/java/com/cxzx/config/KafkaConfig.java

+ 4 - 2
src/main/java/com/cxzx/Main.java

@@ -21,10 +21,12 @@ public class Main {
         var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaConsumerProperties());
 
         try (consumer) {
-            consumer.subscribe(List.of("article","material"));
+            consumer.subscribe(List.of("article_843ea558d5ef41a1877584c62762632d_app_35","material",
+                    "article_843ea558d5ef41a1877584c62762632d_app","article_843ea558d5ef41a1877584c62762632d_app_84",
+                    "test_article_843ea558d5ef41a1877584c62762632d_app_79"));
             while (true) {
                 var kafkaDataList = new ArrayList<KafkaData>();
-                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(2000))) {
+                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(60000))) {
                     kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp())));
                 }
                 for (Map.Entry<String, List<KafkaData>> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) {

+ 1 - 1
src/main/java/com/cxzx/config/KafkaConfig.java

@@ -37,7 +37,7 @@ public class KafkaConfig {
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
         props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300 * 1000);
         return props;
     }