孙永军 1 year ago
parent
commit
0a5c8981e9

+ 5 - 3
src/main/java/com/sxtvs/open/api/chat/service/impl/KimiChatServiceImpl.java

@@ -64,7 +64,7 @@ public class KimiChatServiceImpl extends ServiceImpl<KimiChatMapper, KimiChat> i
         return headers;
     }
 
-    public void chatCompletions(KimiRequest kimiRequest, Long dataId) {
+    public void chatCompletions(KimiRequest kimiRequest, Long groupId) {
 
         HttpClient client = HttpClient.newBuilder().build();
 
@@ -89,9 +89,9 @@ public class KimiChatServiceImpl extends ServiceImpl<KimiChatMapper, KimiChat> i
                         while ((line = reader.readLine()) != null) {
                             log.info("流数据 -- {} ", line);
                             if (!TextUtils.isEmpty(line)) {
-                                sseService.sendData( dataId, line );
+                                sseService.sendData( groupId, line );
                                 if (!"data: [DONE]".equals(line) && !line.isEmpty()) {
-                                    stream2db(line.substring(6), dataId);
+                                    stream2db(line.substring(6), groupId);
                                 }
                             }
                         }
@@ -102,12 +102,14 @@ public class KimiChatServiceImpl extends ServiceImpl<KimiChatMapper, KimiChat> i
                         try {
                             reader.close();
                             resp.close();
+
                         } catch (IOException e) {
                             log.error("关闭链接异常 {}", e.getMessage());
                         }
                     }
 
                 }).join();
+        sseService.complate(groupId);
     }
 
     private void stream2db(String resp, Long groupId){

+ 9 - 4
src/main/java/com/sxtvs/open/api/review/controller/SseController.java

@@ -40,12 +40,17 @@ public class SseController {
         return sseEmitter;
     }
 
-    @GetMapping(path = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    @Resource
+    private KimiChatServiceImpl kimiChatService;
+
+    @PostMapping(path = "/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
     @NoAPIResponse
-    public SseEmitter handleSsePost(@RequestParam("token") String token,
-                                    @RequestParam("groupId") Long gid) throws IOException {
+    public SseEmitter handleSsePost(@RequestBody KimiChat kimiChat) throws IOException {
         SseEmitter sseEmitter = new SseEmitter(0L); // 设置超时时间
-        sseService.monitor(gid, sseEmitter);
+        sseService.monitor(kimiChat.getGroupId(), sseEmitter);
+        poolExecutor.execute(() -> {
+            kimiChatService.create(kimiChat);
+        });
         sseEmitter.send(SseEmitter.event().name("hello").data("hello"));
         return sseEmitter;
     }

+ 7 - 0
src/main/java/com/sxtvs/open/api/review/service/impl/SSEService.java

@@ -60,4 +60,11 @@ public class SSEService {
         }
         return false;
     }
+
+    public void complate(Long gid) {
+        if (sseDataMap.containsKey(gid)){
+            SseEmitter sseEmitter = sseDataMap.get(gid);
+            sseEmitter.complete();
+        }
+    }
 }