zyx 1 year ago
parent
commit
8589ea79a1

+ 1 - 1
src/main/java/com/cxzx/Main.java

@@ -24,7 +24,7 @@ public class Main {
             consumer.subscribe(List.of("article_843ea558d5ef41a1877584c62762632d_app_35","material","article_843ea558d5ef41a1877584c62762632d_app"));
             while (true) {
                 var kafkaDataList = new ArrayList<KafkaData>();
-                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(2000))) {
+                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(60000))) {
                     kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp())));
                 }
                 for (Map.Entry<String, List<KafkaData>> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) {

+ 1 - 1
src/main/java/com/cxzx/config/KafkaConfig.java

@@ -37,7 +37,7 @@ public class KafkaConfig {
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
 
         //注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
         props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300 * 1000);
         return props;
     }