package com.cxzx; import cn.hutool.core.collection.CollStreamUtil; import com.cxzx.config.KafkaConfig; import com.cxzx.db.KafkaData; import com.cxzx.db.Tidb; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.sql.Timestamp; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @Slf4j public class Main { public static void main(String[] args) { var consumer = new KafkaConsumer(KafkaConfig.getKafkaConsumerProperties()); try (consumer) { consumer.subscribe(List.of("article_843ea558d5ef41a1877584c62762632d_app_35","material", "article_843ea558d5ef41a1877584c62762632d_app","article_843ea558d5ef41a1877584c62762632d_app_84")); while (true) { var kafkaDataList = new ArrayList(); for (ConsumerRecord record : consumer.poll(Duration.ofMillis(60000))) { kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp()))); } for (Map.Entry> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) { Tidb.insert(entry.getValue(),entry.getKey()); } consumer.commitSync(); } } catch (Exception e) { log.error(e.getMessage(), e); } } }