孙永军 1 éve
szülő
commit
7311a5ff6a

+ 34 - 15
src/main/java/com/sxtvs/open/api/chat/service/impl/KimiChatServiceImpl.java

@@ -12,12 +12,21 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.sxtvs.open.api.review.service.impl.SSEService;
 import jakarta.annotation.Resource;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.hc.core5.http.io.HttpClientResponseHandler;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.apache.http.util.TextUtils;
 import org.apache.logging.log4j.util.Strings;
 import org.springframework.http.*;
 import org.springframework.stereotype.Service;
 import org.springframework.web.client.RestTemplate;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URI;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
@@ -74,25 +83,35 @@ public class KimiChatServiceImpl extends ServiceImpl<KimiChatMapper, KimiChat> i
                     kimiRequest
                 )))
                 .build();
-log.info("发送kimi数据 {}",  JSON.toJSONString(kimiRequest));
+        log.info("发送kimi数据 {}",  JSON.toJSONString(kimiRequest));
         // 发送请求并接收响应
-        client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
-                .thenApply(HttpResponse::body)
-                .thenAccept(resp -> {
-//                    System.out.println(x);
-                    log.info("流数据 -- {}",resp);
-                    if (!TextUtils.isEmpty(resp)){
-                        Arrays.stream(resp.split("\n")).forEach(x -> {
-                            if(!TextUtils.isEmpty(x.replaceAll("\n", "").trim())){
-                                sseService.sendData( dataId, x );
-                                stream2db(x.substring(6), dataId);
+        client.sendAsync(request, HttpResponse.BodyHandlers.ofInputStream())
+                .thenApply( HttpResponse::body )
+                .thenAccept( resp -> {
+                    BufferedReader reader = new BufferedReader(new InputStreamReader(resp));
+                    try {
+                        String line = "";
+                        while ((line = reader.readLine()) != null) {
+                            if (!TextUtils.isEmpty(line)) {
+                                sseService.sendData( dataId, line );
+                                if (!"data: [DONE]".equals(line)){
+                                    stream2db(line.substring(6), dataId);
+                                }
                             }
-                        });
+                        }
+                    } catch (IOException e) {
+                        log.error("io错误 {}", e.getMessage());
+                    } finally {
+                        // 服务器端主动关闭时,客户端手动关闭
+                        try {
+                            reader.close();
+                            resp.close();
+                        } catch (IOException e) {
+                            log.error("关闭链接异常 {}", e.getMessage());
+                        }
                     }
 
-                })
-                .join();
-
+                }).join();
     }
 
     private void stream2db(String resp, Long groupId){