Main.java 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. package com.cxzx;
  2. import cn.hutool.core.collection.CollStreamUtil;
  3. import com.cxzx.config.KafkaConfig;
  4. import com.cxzx.db.KafkaData;
  5. import com.cxzx.db.Tidb;
  6. import lombok.extern.slf4j.Slf4j;
  7. import org.apache.kafka.clients.consumer.ConsumerRecord;
  8. import org.apache.kafka.clients.consumer.KafkaConsumer;
  9. import java.sql.Timestamp;
  10. import java.time.Duration;
  11. import java.util.ArrayList;
  12. import java.util.List;
  13. import java.util.Map;
  14. @Slf4j
  15. public class Main {
  16. public static void main(String[] args) {
  17. var consumer = new KafkaConsumer<String, String>(KafkaConfig.getKafkaConsumerProperties());
  18. try (consumer) {
  19. consumer.subscribe(List.of("article_843ea558d5ef41a1877584c62762632d_app_35","material"));
  20. while (true) {
  21. var kafkaDataList = new ArrayList<KafkaData>();
  22. for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(2000))) {
  23. kafkaDataList.add(new KafkaData(record.topic(),record.offset(), record.value(), new Timestamp(record.timestamp())));
  24. }
  25. for (Map.Entry<String, List<KafkaData>> entry : CollStreamUtil.groupByKey(kafkaDataList, KafkaData::getTopic).entrySet()) {
  26. Tidb.insert(entry.getValue(),entry.getKey());
  27. }
  28. consumer.commitSync();
  29. }
  30. } catch (Exception e) {
  31. log.error(e.getMessage(), e);
  32. }
  33. }
  34. }