123456789101112131415161718192021222324252627282930313233343536373839404142 |
- package com.cxzx;
- import cn.hutool.core.collection.CollStreamUtil;
- import com.cxzx.config.KafkaConfig;
- import com.cxzx.db.KafkaData;
- import com.cxzx.db.Tidb;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.sql.Timestamp;
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Map;
- @Slf4j
- public class Main {
- public static void main(String[] args) {
- var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaConsumerProperties());
- try (consumer) {
- consumer.subscribe(List.of("article_843ea558d5ef41a1877584c62762632d_app_35","material",
- "article_843ea558d5ef41a1877584c62762632d_app","article_843ea558d5ef41a1877584c62762632d_app_84"));
- while (true) {
- var kafkaDataList = new ArrayList<KafkaData>();
- for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(60000))) {
- kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp())));
- }
- for (Map.Entry<String, List<KafkaData>> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) {
- Tidb.insert(entry.getValue(),entry.getKey());
- }
- consumer.commitSync();
- }
- } catch (Exception e) {
- log.error(e.getMessage(), e);
- }
- }
- }
|