孙永军 1 рік тому
батько
коміт
0a7eccad23

+ 2 - 0
src/main/java/com/smcic/SspServerApplication.java

@@ -3,11 +3,13 @@ package com.smcic;
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 @SpringBootApplication
 @MapperScan("com.smcic")
 @EnableScheduling
+@EnableAsync
 public class SspServerApplication {
 
     public static void main(String[] args) {

+ 3 - 8
src/main/java/com/smcic/api/operate/job/Kafka2Db.java

@@ -9,6 +9,7 @@ import lombok.extern.log4j.Log4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -26,14 +27,6 @@ public class Kafka2Db {
     @Resource
     private KafkaConsumer<String, String> consumer;
 
-    @Resource
-    private ThreadPoolExecutor threadPoolExecutor;
-
-    @PostConstruct
-    public void init(){
-        threadPoolExecutor.execute(this::run);
-    }
-
     @Resource
     private VoteInfoServiceImpl voteInfoService;
 
@@ -45,6 +38,8 @@ public class Kafka2Db {
 
     private final SnowflakeGenerator snowflakeGenerator = new SnowflakeGenerator(1, 1);
 
+
+    @Async("taskExecutor")
     public void run() {
         consumer.subscribe(Collections.singleton("operate_vote_queue"));
         String lockKey = "OPERATE_KAFKA_LOCK";

+ 2 - 0
src/main/java/com/smcic/api/operate/service/impl/VoteInfoServiceImpl.java

@@ -17,6 +17,7 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.Resource;
 import java.time.Duration;
 import java.time.LocalDate;
@@ -52,6 +53,7 @@ public class VoteInfoServiceImpl extends ServiceImpl<VoteInfoMapper, VoteInfo> i
     @Resource
     private ObjectMapper objectMapper;
 
+
     public void vote(String target, String source, String client, Integer operateId){
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
         String dt = formatter.format(LocalDate.now());

+ 2 - 3
src/main/java/com/smcic/api/tags/job/UserTagsConsumer.java

@@ -9,6 +9,7 @@ import com.smcic.api.tags.service.impl.UserTagsService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.util.TextUtils;
 import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
@@ -23,9 +24,6 @@ import java.util.stream.Collectors;
 @Slf4j
 public class UserTagsConsumer {
 
-    @Resource
-    private UserTagsService userTagsService;
-
     @Resource
     private UserTagsMapper userTagsMapper;
 
@@ -33,6 +31,7 @@ public class UserTagsConsumer {
     private ThreadPoolExecutor threadPoolExecutor;
 
 
+    @Async("taskExecutor")
     public void run(){
 
         System.out.println("启动队列消费");

+ 1 - 6
src/main/java/com/smcic/api/tags/service/impl/UserTagsService.java

@@ -22,14 +22,9 @@ public class UserTagsService extends ServiceImpl<UserTagsMapper, UserTags> imple
     @Resource
     private UserTagsConsumer userTagsConsumer;
 
-    @Resource
-    private ThreadPoolExecutor threadPoolExecutor;
-
     @PostConstruct
     public void init(){
-        threadPoolExecutor.execute(() -> {
-            userTagsConsumer.run();
-        });
+        userTagsConsumer.run();
     }
     public void insert(UserTagsDTO userTags){
         log.info("insert userTags:{}",userTags);

+ 19 - 0
src/main/java/com/smcic/core/conf/ThreadPoolFactory.java

@@ -3,6 +3,7 @@ package com.smcic.core.conf;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
 
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -10,6 +11,8 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 @Configuration
+
+@EnableAsync
 public class ThreadPoolFactory {
 
     @Bean(name="threadPoolExecutor")
@@ -27,4 +30,20 @@ public class ThreadPoolFactory {
                 new LinkedBlockingQueue<>(),
                 namedThreadFactory);
     }
+
+    @Bean(name="taskExecutor")
+    public ThreadPoolExecutor taskExecutor(){
+        // 给线程指定名称,方便查看线程编号
+        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("task-thread-pool-%d").build();
+
+        // 创建线程池
+
+        return new ThreadPoolExecutor(8,
+                128,
+                300,
+                TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                namedThreadFactory);
+    }
 }

+ 1 - 1
ssp-server.yml

@@ -4,7 +4,7 @@ metadata:
   name: ssp-server
 spec:
   minReadySeconds: 5
-  replicas: 1
+  replicas: 2
   selector:
     matchLabels:
       k8s-app: ssp-server