package com.smcic.api.operate.job;

import cn.hutool.core.lang.generator.SnowflakeGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.smcic.api.operate.entity.KeysConst;
import com.smcic.api.operate.entity.VoteInfo;
import com.smcic.api.operate.service.impl.VoteInfoServiceImpl;
import lombok.extern.log4j.Log4j;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Background job that drains vote messages from the Kafka topic
 * {@link KeysConst#KAFKA_QUEUE} and persists them in batches through
 * {@link VoteInfoServiceImpl}.
 *
 * <p>Each poll/save/commit cycle is guarded by a Redis lock stored under
 * {@link KeysConst#KAFKA_LOCK}. The lock value is a token generated once per
 * {@link #run()} invocation, and the lock is only released when the stored value
 * still matches that token.
 */
@Component
@Slf4j
public class Kafka2Db {

    /** Expiry of the Redis lock; the lock is not renewed while a batch is being processed. */
    private static final Duration LOCK_TTL = Duration.ofSeconds(20);

    /** Maximum time a single {@code poll} call waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(300);

    /** Pause between lock attempts so a contended lock does not turn into a hot loop on Redis. */
    private static final long LOCK_RETRY_BACKOFF_MS = 100L;

    // NOTE(review): the key type is assumed to be String; only record values are read here.
    @Resource
    private KafkaConsumer<String, String> consumer;

    @Resource
    private VoteInfoServiceImpl voteInfoService;

    @Resource
    private ObjectMapper objectMapper;

    @Resource
    private RedisTemplate<String, String> redisTemplate;

    private final SnowflakeGenerator snowflakeGenerator = new SnowflakeGenerator(1, 1);

    /**
     * Subscribes to the vote topic and then loops: acquire the Redis lock, poll one batch,
     * persist it, commit the offsets, release the lock.
     *
     * <p>The loop ends when the executing thread is interrupted. Exceptions thrown by
     * {@code poll} are not caught and terminate the loop; the lock is still released
     * on the way out.
     */
    @Async("taskExecutor")
    public void run() {
        consumer.subscribe(Collections.singleton(KeysConst.KAFKA_QUEUE));

        String lockKey = KeysConst.KAFKA_LOCK;
        String lockToken = String.valueOf(snowflakeGenerator.next());

        while (!Thread.currentThread().isInterrupted()) {
            if (!tryLock(lockKey, lockToken)) {
                backOff();
                continue;
            }
            try {
                processOneBatch();
            } finally {
                // Always give the lock back, even when polling blows up.
                unlock(lockKey, lockToken);
            }
        }
    }

    /**
     * Attempts to take the lock with a TTL of {@link #LOCK_TTL}.
     *
     * @return {@code true} only when Redis confirmed the key was set; a {@code null}
     *         reply is treated as "not acquired"
     */
    private boolean tryLock(String lockKey, String lockToken) {
        Boolean acquired = redisTemplate.opsForValue().setIfAbsent(lockKey, lockToken, LOCK_TTL);
        return Boolean.TRUE.equals(acquired);
    }

    /**
     * Deletes the lock key if it still holds this worker's token, so a lock that expired
     * and was taken over by someone else is left alone.
     *
     * <p>The check and the delete are two separate Redis calls, so a small window remains
     * between them; a server-side script would be needed to close it completely.
     */
    private void unlock(String lockKey, String lockToken) {
        try {
            if (lockToken.equals(redisTemplate.opsForValue().get(lockKey))) {
                redisTemplate.delete(lockKey);
            }
        } catch (RuntimeException e) {
            // Logged instead of rethrown so it cannot mask an exception from the batch itself.
            log.warn("Failed to release lock {}; it will expire after {}", lockKey, LOCK_TTL, e);
        }
    }

    /** Sleeps briefly between lock attempts; restores the interrupt flag if interrupted. */
    private void backOff() {
        try {
            Thread.sleep(LOCK_RETRY_BACKOFF_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Polls once, converts the records to {@link VoteInfo} and, when at least one record
     * was decoded, saves the batch and synchronously commits the consumer offsets.
     *
     * <p>A failure while saving or committing is logged and the loop carries on.
     * NOTE(review): after such a failure the consumer is not rewound, so the failed
     * records are not polled again by this consumer instance — confirm this is intended.
     */
    private void processOneBatch() {
        List<VoteInfo> batch = new ArrayList<>();
        consumer.poll(POLL_TIMEOUT).forEach(record -> {
            log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}",
                    record.topic(), record.offset(), record.value(), record.timestamp());
            decodeInto(record.value(), batch);
        });

        if (batch.isEmpty()) {
            return;
        }
        try {
            voteInfoService.saveBatch(batch);
            consumer.commitSync();
        } catch (Exception e) {
            log.error("提交失败", e);
        }
    }

    /**
     * Parses one JSON payload and appends the result to {@code batch}.
     * Payloads that are {@code null} or cannot be parsed are skipped; parse failures are logged.
     */
    private void decodeInto(String payload, List<VoteInfo> batch) {
        if (payload == null) {
            // Nothing to parse; passing null to the mapper would throw an unchecked exception.
            return;
        }
        try {
            batch.add(objectMapper.readValue(payload, VoteInfo.class));
        } catch (JsonProcessingException e) {
            log.error("反序列化失败", e);
        }
    }
}