SSEService.java 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package com.smcic.api.conference.service.impl;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.stereotype.Service;
  4. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
  5. import java.io.IOException;
  6. import java.util.Map;
  7. import java.util.concurrent.ConcurrentHashMap;
  8. @Service
  9. @Slf4j
  10. public class SSEService {
  11. private Map<Long, Map<String,SseEmitter>> sseDataMap = new ConcurrentHashMap<>();
  12. public void monitor(Long cid, String uuid, SseEmitter sseEmitter) {
  13. if(!sseDataMap.containsKey(cid)){
  14. sseDataMap.put(cid, new ConcurrentHashMap<>());
  15. }
  16. sseDataMap.get(cid).put(uuid,sseEmitter);
  17. }
  18. public void sendData(Long cid, Object data) {
  19. try {
  20. if (sseDataMap != null && sseDataMap.containsKey(cid)) {
  21. Map<String, SseEmitter> stringSseEmitterMap = sseDataMap.get(cid);
  22. for (String uuid : stringSseEmitterMap.keySet()){
  23. stringSseEmitterMap.get(uuid).send(SseEmitter.event().name("data").data(data));
  24. }
  25. }
  26. }catch (IOException e){
  27. log.error("发送数据失败",e);
  28. }
  29. }
  30. public void complate(Long cid, String uuid) {
  31. if (sseDataMap.containsKey(cid) && sseDataMap.get(cid).containsKey(uuid)) {
  32. Map<String, SseEmitter> stringSseEmitterMap = sseDataMap.get(cid);
  33. SseEmitter sseEmitter = stringSseEmitterMap.get(uuid);
  34. sseEmitter.complete();
  35. log.info("关闭连接");
  36. stringSseEmitterMap.remove(uuid);
  37. if(stringSseEmitterMap.isEmpty()){
  38. sseDataMap.remove(cid);
  39. }
  40. }
  41. }
  42. }