Main.java 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. package com.cxzx;
  2. import cn.hutool.core.collection.CollStreamUtil;
  3. import com.cxzx.config.KafkaConfig;
  4. import com.cxzx.db.KafkaData;
  5. import com.cxzx.db.Tidb;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.kafka.clients.consumer.ConsumerRecord;
  8. import org.apache.kafka.clients.consumer.KafkaConsumer;
  9. import java.sql.Timestamp;
  10. import java.time.Duration;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Map;
  14. @Slf4j
  15. public class Main {
  16. public static void main(String[] args) {
  17. var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaConsumerProperties());
  18. try (consumer) {
  19. consumer.subscribe(List.of("article_843ea558d5ef41a1877584c62762632d_app_35","material",
  20. "article_843ea558d5ef41a1877584c62762632d_app","article_843ea558d5ef41a1877584c62762632d_app_84"));
  21. while (true) {
  22. var kafkaDataList = new ArrayList<KafkaData>();
  23. for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(60000))) {
  24. kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp())));
  25. }
  26. for (Map.Entry<String, List<KafkaData>> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) {
  27. Tidb.insert(entry.getValue(),entry.getKey());
  28. }
  29. consumer.commitSync();
  30. }
  31. } catch (Exception e) {
  32. log.error(e.getMessage(), e);
  33. }
  34. }
  35. }