Go 语言实现 WebSocket 推送
2021/9/30 13:10:45
本文主要是介绍Go 语言实现 WebSocket 推送,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
作者:周慧婷
写在前面
系统开发的过程中,我们经常需要实现消息推送的需求。单端单实例的情况下很好处理(网上有许多教程这里不做展开),但在分布式系统及多端需要推送的情况下该如何处理呢?
在分布式系统中,消息推送服务端是多实例的。某系统中一个服务生成一条消息,这条消息需要实时推送到多个终端,此时该如何进行有效的 WebSocket 推送呢?首先一起看看如下场景:
假设推送消息由消息实例 2 产生,但是终端真正连接的消息实例是实例 1 和实例 3,并没有连接到生产消息的实例 2,系统是如何将实例 2 的消息同步推送到终端 1 和终端 2 的呢?下文将详细描述。
基本原理
为了满足需求我们采用 redis 做协同中间件,用于存储用户信息、生成用户连接的唯一性标识以及 pod address,消息的生产者实例通过订阅 redis 获取终端连接的唯一性标识和 pod address,并通知到对应的消息实例,最终由相应连接终端的消息实例通过 WebSocket 将消息发推送到用户终端。具体流程如下图:
服务端实现
Client
Client 组件的作用,是当用户与消息服务中某个实例建立连接后,管理这个连接的信息,这里通过一个 Golang 结构体来定义:
type Client struct { UUID string UserID string Socket *websocket.Conn Send chan []byte }
结构体中的数据类型说明如下:
- UUID:对连接进行唯一性的标识,通过此标识可以查找到连接信息。
- UserID:用户 ID。
- Socket:连接对象。
- Send:消息数据 channel。
我们为 Client 结构体实现了两个方法:Read、Write 来处理消息的接受和发送。
Read 方法
Read 方法比较简单,从终端接收请求消息后,消息实例通过 WebSocket 回应接收消息状态,并不返回请求结果。结果通过 Write 方法返回。
func (c *Client) Read(close, renewal chan *Client) { defer func() { close <- c }() for { _, message, err := c.Socket.ReadMessage() if err != nil { break } // ... // message logic } }
Write 方法
Write 方法将请求结果返回给终端。Client 会监听 send channel,当 channel 有数据时,通过 socket 连接将消息发送给终端。
func (c *Client) Write(close chan *Client) { for { select { case message, ok := <-c.Send: if !ok { return } c.Socket.WriteMessage(websocket.TextMessage, message) case <-c.Ctx.Done(): return } } }
ClientManger
ClientManager 组件相当于连接池,可以管理所有的终端连接,并提供注册、注销、续期功能。
type ClientManager struct { sync.RWMutex Clients map[string]*Client Register chan *Client Unregister chan *Client Renewal chan *Client }
结构体的数据类型说明如下:
- Clients:是一个集合,用于存储创建的 Client 对象。
- Register:注册的 channel。
- 把连接注册到 Clients 中,并通过 key-value 加入 Client 集合中,key 是连接的唯一性标识 ,value 是连接本身。
- 把连接的唯一性标识和用户的 ID 以及建立连接的 pod address 信息,存储到 redis 中。
- Unregister:注销的 channel。
- 从 ClientManager 组件的 Clients 集合中移除连接对象。
- 删除 redis 对应的缓存信息。
- Renewal:续期的 channel,用于对 redis 的键续期。
ClientManager 只提供了一个 Start 方法,Start 方法提供监听注册、注销以及续期的 channel,通过监听这些 channel 来管理创建的连接对象。当这些 channel 有数据时,执行对应的操作。
func (manager *ClientManager) Start(ctx context.Context) { for { select { case conn := <-manager.Register: manager.Lock() manager.Clients[conn.UUID] = conn manager.Unlock() _, err := manager.affair.Register(ctx, &RegisterReq{ UserID: conn.UserID, UUID: conn.UUID, IP: manager.IP, }) case conn := <-manager.Unregister: _, err := manager.affair.Unregister(ctx, &UnregisterReq{ UserID: conn.UserID, UUID: conn.UUID, }) conn.Socket.Close() close(conn.Send) delete(manager.Clients, conn.UUID) case conn := <-manager.Renewal: //... // Key renewal to redis } } }
消息推送
当一个消息服务实例生产用户的消息,需要推送消息给终端时,推送步骤如下:
- 根据 userID 从 redis 读取数据,得到连接唯一性标识和 pod address 地址,这些信息是在终端第一次与服务端建立连接的时候写入 redis 的。
- 此时根据 pod address,向对应的服务器发送请求。
- 相应的消息服务实例接收到请求。
服务端接收请求的处理逻辑如下:
- 根据传递过来连接唯一性标识的参数,找到标识对应的连接。我们为 ClientManager 提供了一个 Write 方法。
func (manager *ClientManager) Write(message *Message) error { manager.RLock() client, ok := manager.Clients[message.Recipient] manager.RUnlock() if !ok { return errors.New("client miss [" + message.Recipient + "]") } return client.SendOut(message) }
此方法用到 ClientManager 组件的 Clients 集合,根据唯一性标识找到对应的 Client。再利用 Client 的 SendOut 方法,写出数据到终端。
2.定义 Client 的 SendOut 方法。此方法只负责:把接收到的消息转换为字节数组后,发送 Client 的 Send Channel 中。
func (c *Client) SendOut(message *Message) error { content, err := json.Marshal(message.Content) if err != nil { return err } c.Send <- content return nil }
- 发送数据给终端。在前文介绍 Client 组件中,已说明 Client 组件的 send channel 有数据时,会读取 channel 产生的数据,通过连接对象发送给对应的终端。
总结
以上是 Web Socket 推送消息给终端的主要思路:通过 redis 把用户的信息以及连接的标识和 pod address 存储起来,当某个消息服务实例产生消息,从 redis 读取信息,通知连接着终端的消息服务实例,再由这些服务实例通过 WebSocket 对象给终端发送消息。全象云低代码平台也集成了消息的实时推送,用户使用平台时能及时获取最新消息状态。
下期我们将为大家带来 Knative Serving 自定义弹性伸缩,请大家持续关注。
这篇关于Go 语言实现 WebSocket 推送的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24MongoDB资料:新手入门完全指南
- 2024-12-20go-zero 框架的 RPC 服务 启动start和停止 底层是怎么实现的?-icode9专业技术文章分享
- 2024-12-19Go-Zero 框架的 RPC 服务启动和停止的基本机制和过程是怎么实现的?-icode9专业技术文章分享
- 2024-12-18怎么在golang中使用gRPC测试mock数据?-icode9专业技术文章分享
- 2024-12-15掌握PageRank算法核心!你离Google优化高手只差一步!
- 2024-12-15GORM 中的标签 gorm:"index"是什么?-icode9专业技术文章分享
- 2024-12-11怎么在 Go 语言中获取 Open vSwitch (OVS) 的桥接信息(Bridge)?-icode9专业技术文章分享
- 2024-12-11怎么用Go 语言的库来与 Open vSwitch 进行交互?-icode9专业技术文章分享
- 2024-12-11怎么在 go-zero 项目中发送阿里云短信?-icode9专业技术文章分享
- 2024-12-11怎么使用阿里云 Go SDK (alibaba-cloud-sdk-go) 发送短信?-icode9专业技术文章分享