Kafka2Db.java 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package com.smcic.api.operate.job;
  2. import cn.hutool.core.lang.generator.SnowflakeGenerator;
  3. import com.fasterxml.jackson.core.JsonProcessingException;
  4. import com.fasterxml.jackson.databind.ObjectMapper;
  5. import com.smcic.api.operate.entity.KeysConst;
  6. import com.smcic.api.operate.entity.VoteInfo;
  7. import com.smcic.api.operate.service.impl.VoteInfoServiceImpl;
  8. import lombok.extern.log4j.Log4j;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.kafka.clients.consumer.KafkaConsumer;
  11. import org.springframework.data.redis.core.RedisTemplate;
  12. import org.springframework.scheduling.annotation.Async;
  13. import org.springframework.stereotype.Component;
  14. import javax.annotation.PostConstruct;
  15. import javax.annotation.Resource;
  16. import java.time.Duration;
  17. import java.util.ArrayList;
  18. import java.util.Collections;
  19. import java.util.List;
  20. import java.util.concurrent.ThreadPoolExecutor;
  21. @Component
  22. @Slf4j
  23. public class Kafka2Db {
  24. @Resource
  25. private KafkaConsumer<String, String> consumer;
  26. @Resource
  27. private VoteInfoServiceImpl voteInfoService;
  28. @Resource
  29. private ObjectMapper objectMapper;
  30. @Resource
  31. private RedisTemplate<String, String> redisTemplate;
  32. private final SnowflakeGenerator snowflakeGenerator = new SnowflakeGenerator(1, 1);
  33. @Async("taskExecutor")
  34. public void run() {
  35. consumer.subscribe(Collections.singleton(KeysConst.KAFKA_QUEUE));
  36. String lockKey = KeysConst.KAFKA_LOCK;
  37. long id = snowflakeGenerator.next();
  38. while (true) {
  39. Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, String.valueOf(id), Duration.ofSeconds(20));
  40. if (Boolean.FALSE.equals(success)){
  41. continue;
  42. }
  43. List<VoteInfo> voteInfos = new ArrayList<>();
  44. consumer.poll(Duration.ofMillis(300)).forEach((record) -> {
  45. log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}", record.topic(),record.offset(),record.value(), record.timestamp());
  46. VoteInfo voteInfo;
  47. try {
  48. voteInfo = objectMapper.readValue(record.value(), VoteInfo.class);
  49. voteInfos.add(voteInfo);
  50. } catch (JsonProcessingException e) {
  51. log.error("反序列化失败", e);
  52. }
  53. });
  54. if(!voteInfos.isEmpty()){
  55. try {
  56. voteInfoService.saveBatch(voteInfos);
  57. consumer.commitSync();
  58. }catch (Exception e){
  59. log.error("提交失败", e);
  60. }
  61. }
  62. redisTemplate.delete(lockKey);
  63. }
  64. }
  65. }