|
@@ -48,8 +48,8 @@ public class Main {
|
|
|
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
|
|
|
|
|
|
//注意该值不要改得太大,如果Poll太多数据,而不能在下次Poll之前消费完,则会触发一次负载均衡,产生卡顿。
|
|
|
- props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 300);
|
|
|
- props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
|
|
|
+ props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
|
|
|
+ props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600 * 1000);
|
|
|
|
|
|
|
|
|
var consumer = new KafkaConsumer<String, String>(props);
|
|
@@ -58,7 +58,7 @@ public class Main {
|
|
|
consumer.subscribe(List.of("article"));
|
|
|
while (true) {
|
|
|
var kafkaDataList = new ArrayList<KafkaData>();
|
|
|
- for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(20 * 1000))) {
|
|
|
+ for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(2000))) {
|
|
|
kafkaDataList.add(new KafkaData(record.offset(), record.value(), new Timestamp(record.timestamp())));
|
|
|
var offsetMap = Map.of(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()));
|
|
|
consumer.commitSync(offsetMap);
|