使用 Socket.D 实现简单的消息推送
前言
近期有一个需求,Java 后端要根据规则实时推送一些数据到前端,实现前端数据的实时更新。
后端向前端推送消息,就需要长连接,首先想到的就是使用 WebSocket,但是原生的 WebSocket 又有一些局限性,可能不是一个最佳的解决方案,经过阶段调研,最终选择使用 Socket.D。
Socket.D 简介
Socket.D 官网:https://socketd.noear.org/
Sokcet.D 是基于事件和语义消息流的网络应用协议。
- 事件:事件即路径、指令,每个消息都可事件路由,就像 MVC 中的请求路径,方便不同的业务安排不同的处理。
- 语义:语义即元信息标注,通过元信息进行语义描述,即对消息添加描述性的额外信息的操作,就像请求头,可以给消息数据标注内容类型,方便接收端做序列化等工作。
- 消息流:传输来回相关的消息会串成一个有关联的流,这一个流是通过 sid(streamId)为来回的相关消息建立起关联性,就算某一端重启,只要 sid 还存在(比如被持久化保存下来了),仍可实现流答复。
Sokcet.D 的其他特点:
- 语言无关:使用二进制输传数据,适配支持 tcp、ws、udp、kcp 等,也支持多语言、多平台。
- 断线重连:客户端因某些原因与服务端断开后,会通过心跳实现自动重连,或者在发送时触发自动重连。
- 多路复用:一个连接便可允许多个请求和响应消息同时运行,不需要多连接,也不需要连接池。
- 双向通讯:支持单链接双向互听互发,不管是哪一端都可以是业务的 “请求者” 或 “响应者”。
- 自动分片:数据超出 16MB,会自动分片、自动重组,分片大小可自定义配置。
- 接口简单:响应式接口,但用的是监听与回调风格,经典易用。
Socket.D 与其他协议对比:
对比项目 | socket.d | http | websocket | rsocket | socket.io |
---|---|---|---|---|---|
发消息 Qos0 | 有 | 无 | 有 | 有 | 有 |
发送并请求 Qos1 | 有 | 有 | 无 | 有 | 无 |
发送并订阅 | 有 | 无 | 无 | 有 | 无 |
答复或响应 | 有 | 有 | 无 | 有 | 无 |
单连接双向通讯 | 有 | 无 | 有但不便 | 有 | 有但不便 |
数据分片 | 有 | / | 无 | 有 | 有 |
断线自动重连 | 有 | / | 无 | 有 | 有 |
有元信息 | 有 | 有 | 无 | 有 | 无 |
有事件或路径 | 有 | 有 | 无 | 无 | 有 |
有流或消息关联性 | 有 | / | 无 | 有 | 无 |
Broker 模式集群 | 有 | 无 | 无 | 有 | 无 |
异步 | 异步 | 同步 | 异步 | 异步 | 异步 |
多路复用 | 有 | 无 | 有 | 有 | 有 |
接口体验 | 经典 | 经典 | 经典 | 响应式但复杂 | 经典 |
基础传输协议 | tcp/udp/ws | tcp | http | tcp/udp/ws | ws |
构建 Spring Boot 服务端
在项目中,需要 Java 后端发送数据给 Vue 前端,因此可以将 Java 后端作为服务端。
使用 IntelliJ IDEA 创建 Spring Boot 项目,并导入 Spring Web 依赖。
在 Spring Boot 项目中整合 Socket.D,使用具体的传输协议,要导入对应的适配包依赖,比如使用 sd:ws 协议,要导入 org.noear:socketd-transport-java-websocket 依赖,sd:ws 表示使用 Socket.D 应用协议 + WebSocket 传输协议。
Maven 项目配置文件(pom.xml):
<dependency>
<groupId>org.noear</groupId>
<artifactId>socketd-transport-java-websocket</artifactId>
<version>2.5.12</version>
</dependency>
Spring Boot 中构建 Socket.D 服务端,并使用 Spring IoC 容器管理服务端,要求:
- 在 IoC 容器中构建监听器的 Bean 对象,用于监听连接开启、消息接收、连接关闭等工作。
- 在 IoC 容器中构建 Socket.D Server 对象,用于启动 Socket.D 服务端。
Socket.D 自定义监听器(cn.duozai.sdserver.SdListener):
@Component // 将自定义监听器标记为Bean对象
public class SdListener implements Listener {
// 定义一个会话对象列表,用于存储连接的会话对象
// 也可以存到Map集合中,以k-v来区分每一个会话
private static final List<Session> sessionList = new ArrayList<>();
public List<Session> getSessionList() {
return sessionList;
}
/**
* 当打开连接时执行
*
* @param session 会话对象
* @return void
*/
@Override
public void onOpen(Session session) throws IOException {
// 连接打开时,得到会话对象,并将其存储下来,用于后续发送消息
// 每一个客户端连接到服务端时,都会创建一个会话对象,将其添加到会话对象列表中
sessionList.add(session);
}
/**
* 当收到消息时执行
*
* @param session 会话对象
* @param message 消息对象
* @return void
*/
@Override
public void onMessage(Session session, Message message) throws IOException {
}
/**
* 当关闭连接时执行
*
* @param session 会话对象
* @return void
*/
@Override
public void onClose(Session session) {
// 连接关闭时,从会话对象列表中移除该会话对象
sessionList.remove(session);
}
/**
* 当发生异常时执行
*
* @param session 会话对象
* @param error 异常对象
* @return void
*/
@Override
public void onError(Session session, Throwable error) {
}
}
自定义监听器可以标记为 Bean 对象,以交给 Spring IoC 容器进行管理。自定义监听器可以实现监听器接口,也可以重写内置监听器。
每一个客户端连接到服务端时,都会触发监听器的相关方法,得到一个连接会话,因此需要将每一个连接会话对象保存下来,用于后续发送消息。
Socket.D 服务端配置类(cn.duozai.sdserver.SdServerConfig):
@Component // 将Socket.D服务端配置类标记为Bean对象
public class SdServerConfig {
private Server server;
@Resource
SdListener sdListener;
/**
* Bean创建时执行
*
* @return void
*/
@PostConstruct // 监听Bean的创建
public void start() throws IOException {
// 在Socket.D服务端配置类的Bean对象创建时启动服务端
SocketD.createServer("sd:ws") // 指定使用Socket.D应用协议+WebSocket传输协议
.config(c -> c.port(8609)) // 指定服务端端口号为8609
.listen(sdListener) // 指定监听器
.start();
}
/**
* Bean销毁时执行
*
* @return void
*/
@PreDestroy // 监听Bean的销毁
public void stop() {
// 在Socket.D服务端配置类的Bean对象销毁时关闭服务端
server.stop();
}
}
Socket.D 服务端配置类需要监听 Bean 对象的创建和销毁,在 Bean 对象创建的时候启动 Socket.D 服务端,在 Bean 对象销毁的时候关闭 Socket.D 服务端。
由于 Spring IoC 容器中的 Bean 对象是单例的,在 Spring IoC 容器启动的时候就会被创建,因此在 Spring Boot 项目启动的时候,Socket.D 服务端也启动起来了。
构建 Vue 客户端
在项目中,需要 Vue 前端接收数据,因此可以将 Vue 前端作为客户端,接收服务端传递的数据。
使用 npm 创建 Vue 3 项目,并导入 Socket.D 依赖。
终端执行指令:
npm install @noear/socket.d@2.5.12
在 Vue 项目中,需要创建客户端,并连接到服务端,得到客户端会话对象。
根组件(src/App.vue):
<script setup lang="ts">
import {SocketD} from "@noear/socket.d";
// 创建客户端,连接到服务端,得到会话
const session = await SocketD.createClient("sd:ws://127.0.0.1:8609/")
.open();
</script>
运行 Vue 项目,访问根组件时,Socket.D 客户端就启动起来了,并且连接到了 Socket.D 服务端。
服务端发送消息
Java 后端发送消息到客户端,需要借助连接会话,此时可以调用自定义监听器中的连接会话对象,进行消息的传递。
测试控制器(cn.duozai.sdserver.DemoController):
@RestController
public class DemoController {
@Resource
SdListener sdListener;
/**
* 发送消息
*
* @return java.lang.String
*/
@GetMapping(value = "/demo")
public String test() throws IOException {
// 从自定义监听器对象中获取会话对象
List<Session> sessionList = sdListener.getSessionList();
for (int i = 0; i < sessionList.size(); i++) {
// 使用会话对象发送消息给客户端
// 参数1-event:事件名称,可以看作是请求路径
// 参数2-entity:消息实体,StringEntity即字符串实体,用于传输字符串
sessionList.get(i).send("/demo01", new StringEntity("client id = " + (i+1) + ":你好呀,Socket.D!"));
}
return "ok";
}
}
客户端接收消息
在 Vue 前端接收服务端传递的消息,需要配置监听器,监听消息的传递。
根组件(src/App.vue):
<script setup lang="ts">
import {SocketD} from "@noear/socket.d";
// 创建客户端,连接到服务端,得到会话
const session = await SocketD.createClient("sd:ws://127.0.0.1:8609/")
.listen(SocketD.newEventListener() // EventListener:创建内置事件监听器,可以针对不同的路径进行处理
.doOn("/demo01", (session, message) => { // 代理事件的路由处理
// 收到/demo01事件的消息时,执行该回调参数
// 参数1-session:会话对象
// 参数2-message:消息对象,包含sid流id+event事件+entity实体
// 在消息对象的原型链上,有一个dataAsString方法,可以直接获取消息体字符串
console.log("收到消息:", message.dataAsString())
}))
.open();
</script>
先运行 Java 后端,Socket.D 服务端启动,再运行 Vue 前端,Socket.D 客户端也启动了,且连接到了服务端,此时再访问 Java 后端的相关方法,即可实现服务端给客户端发送消息的需求。
结语
Socket.D 具备了 HTTP、WebSocket、Socket.io 等协议的优点,又避开了它们的缺点,确实可以称之为更具普世性的通用协议。
使用 Socket.D,可以更简便的实现一些需求,Socket.D 也提供了一些新特性,结合这些新特性,可以在实际应用中玩的更 “花”。
在使用 Socket.D 的过程中,依然要勤于查阅文档,才能把 Socket.D 玩透。