Kafka2Db.java 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. package com.smcic.api.operate.job;
  2. import cn.hutool.core.lang.generator.SnowflakeGenerator;
  3. import com.fasterxml.jackson.core.JsonProcessingException;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.smcic.api.operate.entity.VoteInfo;
  6. import com.smcic.api.operate.service.impl.VoteInfoServiceImpl;
  7. import lombok.extern.log4j.Log4j;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.kafka.clients.consumer.KafkaConsumer;
  10. import org.springframework.data.redis.core.RedisTemplate;
  11. import org.springframework.scheduling.annotation.Async;
  12. import org.springframework.stereotype.Component;
  13. import javax.annotation.PostConstruct;
  14. import javax.annotation.Resource;
  15. import java.time.Duration;
  16. import java.util.ArrayList;
  17. import java.util.Collections;
  18. import java.util.List;
  19. import java.util.concurrent.ThreadPoolExecutor;
  20. @Component
  21. @Slf4j
  22. public class Kafka2Db {
  23. @Resource
  24. private KafkaConsumer<String, String> consumer;
  25. @Resource
  26. private VoteInfoServiceImpl voteInfoService;
  27. @Resource
  28. private ObjectMapper objectMapper;
  29. @Resource
  30. private RedisTemplate<String, String> redisTemplate;
  31. private final SnowflakeGenerator snowflakeGenerator = new SnowflakeGenerator(1, 1);
  32. @Async("taskExecutor")
  33. public void run() {
  34. consumer.subscribe(Collections.singleton("operate_vote_queue"));
  35. String lockKey = "OPERATE_KAFKA_LOCK";
  36. long id = snowflakeGenerator.next();
  37. while (true) {
  38. Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, String.valueOf(id), Duration.ofSeconds(3));
  39. if (Boolean.FALSE.equals(success)){
  40. break;
  41. }
  42. List<VoteInfo> voteInfos = new ArrayList<>();
  43. consumer.poll(Duration.ofMillis(300)).forEach((record) -> {
  44. log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}", record.topic(),record.offset(),record.value(), record.timestamp());
  45. VoteInfo voteInfo;
  46. try {
  47. voteInfo = objectMapper.readValue(record.value(), VoteInfo.class);
  48. voteInfos.add(voteInfo);
  49. } catch (JsonProcessingException e) {
  50. log.error("反序列化失败", e);
  51. }
  52. });
  53. if(!voteInfos.isEmpty()){
  54. try {
  55. voteInfoService.saveBatch(voteInfos);
  56. consumer.commitSync();
  57. }catch (Exception e){
  58. log.error("提交失败", e);
  59. }
  60. }
  61. redisTemplate.delete(lockKey);
  62. }
  63. }
  64. }