|
@@ -27,11 +27,9 @@ public class Main {
|
|
|
var kafkaDataList = new ArrayList<KafkaData>();
|
|
|
for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(2000))) {
|
|
|
kafkaDataList.add(new KafkaData(record.offset(), record.value(), new Timestamp(record.timestamp())));
|
|
|
- var offsetMap = Map.of(new TopicPartition(record.topic(), record.partition()),
|
|
|
- new OffsetAndMetadata(record.offset() + 1));
|
|
|
- consumer.commitSync(offsetMap);
|
|
|
}
|
|
|
Tidb.insert(kafkaDataList);
|
|
|
+ consumer.commitSync();
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
log.error(e.getMessage(), e);
|