SSE(Server-Sent Events)介绍
全称Server-Sent Events,顾名思义就是服务推送事件,服务器单向的往客户端推送消息。
Server-Sent Events(SSE)就像这个广播系统。当你打开一个特定的网站或应用时,服务器(就像电台)可以实时地发送(广播)信息到你的浏览器(就像收音机)。你的浏览器只是接收信息,并不向服务器发送任何信息。
使用SSE的优势是它简单、轻量级,而且基于普通的HTTP,所以非常容易实现和使用。但需要注意的是,它只支持从服务器到客户端的单向通信,也就是说,浏览器只能接收信息,不能像在聊天应用中那样回复信息。对于需要双向通信的应用,我们可能会考虑使用其他技术,例如WebSocket。
SSE vs WebSocket
SSE | WebSocket | |
---|---|---|
场景 | 专为服务器到客户端的单向实时通信 | 需要全双工的双向实时通信 |
协议 | 基于 HTTP,使用常规的 HTTP 请求/响应模型。 | 使用自己的协议(虽然它的握手是基于HTTP的) |
重连接 | 浏览器通常会在连接断开后自动尝试重新连接 | 通常需要自定义逻辑来处理重新连接 |
消息格式 | 文本格式,基于 UTF-8 编码 | 可以传输文本或二进制数据 |
谁更轻量级?
- 从协议的复杂性和通信模式上看,SSE 通常被认为是更轻量级的。它基于简单的 HTTP,对于单向的数据流(例如实时通知或实时更新)来说非常适合。
- 然而,WebSocket 提供了更多的灵活性和双向通信能力。对于需要高度交互的应用程序(如在线游戏、聊天应用等),WebSocket 更为适合。
总的来说,选择 SSE 还是 WebSocket 取决于特定应用的需求。如果仅需要服务器到客户端的单向实时通信,并希望保持简单,那么 SSE 可能是更好的选择。如果需要全双工通信和更大的灵活性,那么 WebSocket 更为合适。
实践
前端
sseConnect() {
const userId = getUserId();
this.eventSource = new EventSource(`/system/announcement/sse/connect/${userId}`);
this.eventSource.onopen = (event) => {
console.log('SSE Opened:', event);
};
this.eventSource.onmessage = (event) => {
if (event.data === 'heartbeat') {
return;
}
};
this.eventSource.onerror = (error) => {
console.error('SSE Error:', error);
};
},
因为后端会每隔30s给所有已经建立连接的客户端发送一个心跳防止锻炼,所以需要在前端判断当前收到的消息是心跳监测,还是业务消息。
后端
创建一个消息通知SSE类:
package com.sgwl.message.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* date: 2023/8/30 上午 10:09
*
* @author LIKEGAKKI
* Description:
*/
@Slf4j
@Service
public class MessageNotificationsServer {
/**
* 用来存在线连接用户信息
*/
private static ConcurrentHashMap<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
log.info("==消息通知SSE服务启动==");
// 启动定时任务发送心跳
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
sendHeartbeatToAll();
log.info("==消息通知SSE心跳监测==");
}, 30, 30, TimeUnit.SECONDS);
}
private void sendHeartbeatToAll() {
List<String> toRemove = new ArrayList<>();
sseEmitterMap.forEach((k, v) -> {
try {
v.send(SseEmitter.event().data("heartbeat"));
} catch (Exception e) {
log.info("消息通知SSE服务,用户:{} 心跳发送失败", k);
toRemove.add(k);
}
});
toRemove.forEach(sseEmitterMap::remove);
}
/**
* 连接成功方法
*
* @param userId 用户Id
*/
public SseEmitter connect(String userId) {
// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 注册回调
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
log.info("创建新的消息通知sse连接,当前用户:{},总连接数:{}", userId, sseEmitterMap.size());
return sseEmitter;
}
/**
* sse推送消息
*
* @param userId
* @param message
*/
public void pushMessage(String userId, String message) {
SseEmitter sseEmitter = sseEmitterMap.get(userId);
try {
sseEmitter.send(message, MediaType.APPLICATION_JSON);
} catch (IOException e) {
log.error("sse用户[{}]推送异常:{}", userId, e.getMessage());
removeUser(userId);
}
}
/**
* 移除用户连接
*/
public void removeUser(String userId) {
SseEmitter emitter = sseEmitterMap.get(userId);
if (emitter != null) {
emitter.complete();
}
sseEmitterMap.remove(userId);
log.info("移除sse用户:{}", userId);
}
@PreDestroy
public void shutdownServer() {
log.info("==关闭消息通知SSE服务==");
sseEmitterMap.clear();
}
private Runnable completionCallBack(String employeeCode) {
return () -> {
log.info("结束sse用户连接:{}", employeeCode);
removeUser(employeeCode);
};
}
private Runnable timeoutCallBack(String employeeCode) {
return () -> {
log.info("连接sse用户超时:{}", employeeCode);
removeUser(employeeCode);
};
}
private Consumer<Throwable> errorCallBack(String employeeCode) {
return throwable -> {
log.info("sse用户连接异常:{}", employeeCode);
removeUser(employeeCode);
};
}
}
创建一个Controller用来建立SSE连接:
@RequestMapping("/system/announcement")
@Slf4j
@RestController
@RequiredArgsConstructor
public class SysAnnouncementController{
private final MessageNotificationsServer messageNotificationsServer;
/**
* 连接到sse
*/
@SaIgnore
@Operation(summary = "连接到sse")
@GetMapping(value = "/sse/connect/{userId}", produces = "text/event-stream")
@Parameters({
@Parameter(name = "userId", description = "")
})
public SseEmitter sseConnect(@PathVariable(value = "token") String userId) {
if (userId != null) {
return messageNotificationsServer.connect(userId);
}
return null;
}
}
成功示例:
页面一加载的时候会请求SSE建立连接。
连接成功后,等待服务端发送一个心跳监测
这里就可以看到服务端的心跳信息了