SSE(Server-Sent Events)介绍

全称Server-Sent Events,顾名思义就是服务推送事件,服务器单向的往客户端推送消息。

Server-Sent Events(SSE)就像这个广播系统。当你打开一个特定的网站或应用时,服务器(就像电台)可以实时地发送(广播)信息到你的浏览器(就像收音机)。你的浏览器只是接收信息,并不向服务器发送任何信息。

使用SSE的优势是它简单、轻量级,而且基于普通的HTTP,所以非常容易实现和使用。但需要注意的是,它只支持从服务器到客户端的单向通信,也就是说,浏览器只能接收信息,不能像在聊天应用中那样回复信息。对于需要双向通信的应用,我们可能会考虑使用其他技术,例如WebSocket。

SSE vs WebSocket

SSE WebSocket
场景 专为服务器到客户端的单向实时通信 需要全双工的双向实时通信
协议 基于 HTTP,使用常规的 HTTP 请求/响应模型。 使用自己的协议(虽然它的握手是基于HTTP的)
重连接 浏览器通常会在连接断开后自动尝试重新连接 通常需要自定义逻辑来处理重新连接
消息格式 文本格式,基于 UTF-8 编码 可以传输文本或二进制数据

谁更轻量级?

  • 从协议的复杂性和通信模式上看,SSE 通常被认为是更轻量级的。它基于简单的 HTTP,对于单向的数据流(例如实时通知或实时更新)来说非常适合。
  • 然而,WebSocket 提供了更多的灵活性和双向通信能力。对于需要高度交互的应用程序(如在线游戏、聊天应用等),WebSocket 更为适合。

总的来说,选择 SSE 还是 WebSocket 取决于特定应用的需求。如果仅需要服务器到客户端的单向实时通信,并希望保持简单,那么 SSE 可能是更好的选择。如果需要全双工通信和更大的灵活性,那么 WebSocket 更为合适。

实践

前端

sseConnect() {
      const userId = getUserId();
      this.eventSource = new EventSource(`/system/announcement/sse/connect/${userId}`);

      this.eventSource.onopen = (event) => {
        console.log('SSE Opened:', event);
      };

      this.eventSource.onmessage = (event) => {
        if (event.data === 'heartbeat') {
          return;
        }
      };

      this.eventSource.onerror = (error) => {
        console.error('SSE Error:', error);
      };
    },

因为后端会每隔30s给所有已经建立连接的客户端发送一个心跳防止锻炼,所以需要在前端判断当前收到的消息是心跳监测,还是业务消息。

后端

创建一个消息通知SSE类:

package com.sgwl.message.server;

import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * date: 2023/8/30 上午 10:09
 *
 * @author LIKEGAKKI
 * Description:
 */
@Slf4j
@Service
public class MessageNotificationsServer {

    /**
     * 用来存在线连接用户信息
     */
    private static ConcurrentHashMap<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        log.info("==消息通知SSE服务启动==");
        // 启动定时任务发送心跳
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            sendHeartbeatToAll();
            log.info("==消息通知SSE心跳监测==");
        }, 30, 30, TimeUnit.SECONDS);
    }

    private void sendHeartbeatToAll() {
        List<String> toRemove = new ArrayList<>();
        sseEmitterMap.forEach((k, v) -> {
            try {
                v.send(SseEmitter.event().data("heartbeat"));
            } catch (Exception e) {
                log.info("消息通知SSE服务,用户:{} 心跳发送失败", k);
                toRemove.add(k);
            }
        });
        toRemove.forEach(sseEmitterMap::remove);

    }


    /**
     * 连接成功方法
     *
     * @param userId 用户Id
     */
    public SseEmitter connect(String userId) {
        // 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
        SseEmitter sseEmitter = new SseEmitter(0L);
        // 注册回调
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        sseEmitterMap.put(userId, sseEmitter);
        log.info("创建新的消息通知sse连接,当前用户:{},总连接数:{}", userId, sseEmitterMap.size());
        return sseEmitter;
    }

    /**
     * sse推送消息
     *
     * @param userId
     * @param message
     */
    public void pushMessage(String userId, String message) {
        SseEmitter sseEmitter = sseEmitterMap.get(userId);
        try {
            sseEmitter.send(message, MediaType.APPLICATION_JSON);
        } catch (IOException e) {
            log.error("sse用户[{}]推送异常:{}", userId, e.getMessage());
            removeUser(userId);
        }
    }

    /**
     * 移除用户连接
     */
    public void removeUser(String userId) {
        SseEmitter emitter = sseEmitterMap.get(userId);
        if (emitter != null) {
            emitter.complete();
        }
        sseEmitterMap.remove(userId);
        log.info("移除sse用户:{}", userId);
    }


    @PreDestroy
    public void shutdownServer() {
        log.info("==关闭消息通知SSE服务==");
        sseEmitterMap.clear();
    }


    private Runnable completionCallBack(String employeeCode) {
        return () -> {
            log.info("结束sse用户连接:{}", employeeCode);
            removeUser(employeeCode);
        };
    }

    private Runnable timeoutCallBack(String employeeCode) {
        return () -> {
            log.info("连接sse用户超时:{}", employeeCode);
            removeUser(employeeCode);
        };
    }

    private Consumer<Throwable> errorCallBack(String employeeCode) {
        return throwable -> {
            log.info("sse用户连接异常:{}", employeeCode);
            removeUser(employeeCode);
        };
    }

}

创建一个Controller用来建立SSE连接:

@RequestMapping("/system/announcement")
@Slf4j
@RestController
@RequiredArgsConstructor
public class SysAnnouncementController{

    private final MessageNotificationsServer messageNotificationsServer;
    
/**
     * 连接到sse
     */
    @SaIgnore
    @Operation(summary = "连接到sse")
    @GetMapping(value = "/sse/connect/{userId}", produces = "text/event-stream")
    @Parameters({
        @Parameter(name = "userId", description = "")
    })
    public SseEmitter sseConnect(@PathVariable(value = "token") String userId) {
        if (userId != null) {
            return messageNotificationsServer.connect(userId);
        }
        return null;
    }
}

成功示例:

​ 页面一加载的时候会请求SSE建立连接。

连接成功后,等待服务端发送一个心跳监测

这里就可以看到服务端的心跳信息了