孙永军 2 anni fa
parent
commit
38c643c3fc

+ 5 - 0
src/main/java/com/sxtvs/open/api/news/controller/YoumeiDataController.java

@@ -33,4 +33,9 @@ public class YoumeiDataController {
     public Page<YoumeiEsData> search(@RequestBody DataRequestDTO dataRequestDTO){
         return youmeiDataService.search(dataRequestDTO);
     }
+
+    @RequestMapping("/put/MWEZOWQ2NDYXMJU0NW")
+    public void put2es(){
+        youmeiDataService.putDataRun();
+    }
 }

+ 46 - 0
src/main/java/com/sxtvs/open/api/news/service/impl/YoumeiDataServiceImpl.java

@@ -6,8 +6,10 @@ import co.elastic.clients.elasticsearch._types.SortOrder;
 import co.elastic.clients.elasticsearch._types.query_dsl.MultiMatchQuery;
 import co.elastic.clients.elasticsearch._types.query_dsl.Query;
 import co.elastic.clients.elasticsearch._types.query_dsl.RangeQuery;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.SearchRequest;
 import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
 import co.elastic.clients.json.JsonData;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -15,10 +17,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.sxtvs.open.api.news.dto.DataRequestDTO;
 import com.sxtvs.open.api.news.entity.YoumeiData;
 import com.sxtvs.open.api.news.entity.YoumeiEsData;
+import com.sxtvs.open.api.news.entity.YoumeiOffset;
 import com.sxtvs.open.api.news.mapper.YoumeiDataMapper;
 import com.sxtvs.open.api.news.service.IYoumeiDataService;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
 import java.io.IOException;
@@ -37,6 +41,7 @@ import java.util.Map;
  * @since 2023-02-20
  */
 @Service
+@Slf4j
 public class YoumeiDataServiceImpl extends ServiceImpl<YoumeiDataMapper, YoumeiData> implements IYoumeiDataService {
     @Resource
     private ElasticsearchClient elasticsearchClient;
@@ -92,4 +97,45 @@ public class YoumeiDataServiceImpl extends ServiceImpl<YoumeiDataMapper, YoumeiD
 
         return page;
     }
+
+    public void putDataRun(){
+        (new Thread(this::putData)).start();
+    }
+
+    public void putData()  {
+        log.info("ES 入数据开始");
+        YoumeiOffset youmeiOffset = youmeiOffsetService.getById(4);
+        long offset = youmeiOffset.getOffset() + 1L;
+        long max = 0L;
+        while (true){
+            List<YoumeiData> youmeiDataList = lambdaQuery().gt(YoumeiData::getOffset, offset).last("limit 1000").list();
+            log.info("本次数据{}条,offset从{}开始",youmeiDataList.size(), offset);
+            if (youmeiDataList.size() == 0){
+                break;
+            }
+            max = youmeiDataList.get(youmeiDataList.size()-1).getOffset();
+            List<BulkOperation> bulkOperations = new ArrayList<>();
+
+            youmeiDataList.forEach(a -> {
+                bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(String.valueOf(a.getOffset())).document(a))));
+            });
+
+            try {
+                elasticsearchClient.bulk(BulkRequest.of(x ->x.index("news_data").operations(bulkOperations)));
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+
+            if (youmeiDataList.size() < 1000){
+                break;
+            }
+
+            offset += 1000;
+        }
+        if(max > 0L){
+            youmeiOffset.setOffset(max);
+            youmeiOffsetService.updateById(youmeiOffset);
+        }
+        log.info("ES 入数据结束");
+    }
 }

+ 2 - 48
src/main/java/com/sxtvs/open/job/PutEsJob.java

@@ -1,64 +1,18 @@
 package com.sxtvs.open.job;
 
-import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch.core.BulkRequest;
-import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
-import com.sxtvs.open.api.news.entity.YoumeiData;
-import com.sxtvs.open.api.news.entity.YoumeiOffset;
 import com.sxtvs.open.api.news.service.impl.YoumeiDataServiceImpl;
-import com.sxtvs.open.api.news.service.impl.YoumeiOffsetServiceImpl;
 import jakarta.annotation.Resource;
-import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
 @Component
-@Slf4j
 public class PutEsJob {
-    @Resource
-    private ElasticsearchClient elasticsearchClient;
 
     @Resource
     private YoumeiDataServiceImpl youmeiDataService;
 
-    @Resource
-    private YoumeiOffsetServiceImpl youmeiOffsetService;
-
-//    @Scheduled(cron = "0 38 * * * *")
+    // @Scheduled(cron = "0 38 * * * *")
     public void putData()  {
-        log.info("ES 入数据开始");
-        YoumeiOffset youmeiOffset = youmeiOffsetService.getById(4);
-        long offset = youmeiOffset.getOffset() + 1L;
-        long max = 0L;
-        while (true){
-            List<YoumeiData> youmeiDataList = youmeiDataService.lambdaQuery().gt(YoumeiData::getOffset, offset).last("limit 1000").list();
-            log.info("本次数据{}条,offset从{}开始",youmeiDataList.size(), offset);
-            if (youmeiDataList.size() == 0){
-                break;
-            }
-            max = youmeiDataList.get(youmeiDataList.size()-1).getOffset();
-            List<BulkOperation> bulkOperations = new ArrayList<>();
-
-            youmeiDataList.forEach(a -> {
-                bulkOperations.add(BulkOperation.of(b -> b.index(c -> c.id(String.valueOf(a.getOffset())).document(a))));
-            });
-
-            try {
-                elasticsearchClient.bulk(BulkRequest.of(x ->x.index("news_data").operations(bulkOperations)));
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-
-            offset += 1000;
-        }
-        if(max > 0L){
-            youmeiOffset.setOffset(max);
-            youmeiOffsetService.updateById(youmeiOffset);
-        }
-        log.info("ES 入数据结束");
+        youmeiDataService.putDataRun();
     }
 }