12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- package com.smcic.api.operate.job;
- import cn.hutool.core.lang.generator.SnowflakeGenerator;
- import com.fasterxml.jackson.core.JsonProcessingException;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.smcic.api.operate.entity.KeysConst;
- import com.smcic.api.operate.entity.VoteInfo;
- import com.smcic.api.operate.service.impl.VoteInfoServiceImpl;
- import lombok.extern.log4j.Log4j;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.time.Duration;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.concurrent.ThreadPoolExecutor;
- @Component
- @Slf4j
- public class Kafka2Db {
- @Resource
- private KafkaConsumer<String, String> consumer;
- @Resource
- private VoteInfoServiceImpl voteInfoService;
- @Resource
- private ObjectMapper objectMapper;
- @Resource
- private RedisTemplate<String, String> redisTemplate;
- private final SnowflakeGenerator snowflakeGenerator = new SnowflakeGenerator(1, 1);
- @Async("taskExecutor")
- public void run() {
- consumer.subscribe(Collections.singleton(KeysConst.KAFKA_QUEUE));
- String lockKey = KeysConst.KAFKA_LOCK;
- long id = snowflakeGenerator.next();
- while (true) {
- Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, String.valueOf(id), Duration.ofSeconds(20));
- if (Boolean.FALSE.equals(success)){
- continue;
- }
- List<VoteInfo> voteInfos = new ArrayList<>();
- consumer.poll(Duration.ofMillis(300)).forEach((record) -> {
- log.info("读取数据 topic:{}, offset: {}, data: {}, 时间: {}", record.topic(),record.offset(),record.value(), record.timestamp());
- VoteInfo voteInfo;
- try {
- voteInfo = objectMapper.readValue(record.value(), VoteInfo.class);
- voteInfos.add(voteInfo);
- } catch (JsonProcessingException e) {
- log.error("反序列化失败", e);
- }
- });
- if(!voteInfos.isEmpty()){
- try {
- voteInfoService.saveBatch(voteInfos);
- consumer.commitSync();
- }catch (Exception e){
- log.error("提交失败", e);
- }
- }
- redisTemplate.delete(lockKey);
- }
- }
- }
|