WebSocket 是一种双向通信协议,客户端与服务器之间的连接在关闭之前都是开放的。这对于需要频繁请求和响应的应用程序非常有用。最显著的应用包括聊天应用和任何实时更新应用。
Websocket 的替代方案
- SSE(服务器发送事件): 其主要理念是服务器向订阅的客户端发送事件。它比 Websocket 更轻便。但缺点是只能由服务器发送信息,不能反向发送
- 长轮询: 其原理是客户端向服务器发送请求,服务器将长时间保持连接,直到所需的资源可用。与网络套接字相比,这种方式效率不高。但当我们因故无法使用 websocket 时,它就会被广泛使用。
- 消息队列:这是 Websocket 的高级版本。根据我们选择的队列类型,我们可以同时向多个客户端发送事件。但这需要一些努力才能实现。
在 Golang 中实现 Websocket
有许多可用的库,如 gin、gorilla、melody、iris 等。但从性能和易用性两方面来看,gorilla 都是其中的佼佼者。
初始化连接
// Create Upgrader, which has all the buffer size, and Origin request.
// Check Origin you can have custom based on source, or just return true
// for all the source.
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
}
// create websocket connection and handle client
func createWebSocketConnection(w http.ResponseWriter, r *http.Request) {
// create connection
client, err := i.upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("unable to create new websocket connection. Error - %s", err)
}
// To send messsage to client
if err := client.WriteMessage(1, data); err != nil {
logger.Warnf("unable to write the message to client.")
}
// To read message from client
messageType, message, err := client.ReadMessage()
if err != nil {
logger.Errorf("unable to read message from client")
}
// messageType indicates the type of data .i.e. either text, binary..
// message is actual message
// err Error if any
}
处理多个用户
当我们处理多个用户时,总是建议使用互斥锁,以避免并发访问。下面我维护了(客户端 -> 通道)的映射。当客户端关闭时,我们可以通过该通道发送通知,这样就可以帮助完成清理任务。
// create Map which stores, all the websocket clients
type Client struct {
pool map[*websocket.Conn]chan bool
locker *sync.RWMutex
}
下面的方法将接收所有传入的 websocket 连接,并注册到现有工作流中:
// placeholder to store the clients
clientPool := Client{
pool : make(map[*websocket.Conn]chan bool),
locker : &sync.RWMutex{},
}
// Method which adds incoming connection
func handleConnection(w http.ResponseWriter, r *http.Request) {
// create websocket connection
ws, err := i.upgrader.Upgrade(w, r, nil)
if err != nil {
logger.Errorf("unable to create new websocket connection. Error - %s", err)
return
}
// add to client pool
channel := make(chan bool)
clientPool.locker.Lock()
clientPool.pool[ws] = channel
clientPool.locker.Unlock()
// Start the main work only once
if isWorkNotStarted {
isWorkNotStarted = false
// run in separate thread
go StartMainWork()
}
// wait for channel to close.
<-channel
}
func StartMainWork() {
// Do some work
sendNotificationToClients(data)
// Do Some more work
sendNotificationToClients(data)
// close all the clients
CloseConnection()
}
func sendNotificationToClients(data []byte) {
for client, channel := range clientPool.pool {
clientPool.locker.Lock()
if err := client.WriteMessage(1, data); err != nil {
logger.Warnf("unable to write the message to client.")
// close the thread
channel <- true
client.Close()
}
clientPool.locker.Unlock()
}
}
func closeConnection() {
for client, channel := range clientPool.pool {
client.Close()
channel <- true
}
}
设置 PingHandler
在某些情况下,如果客户端突然断开连接,服务器需要进行处理,在这种情况下,我们需要某种机制来检查客户端是否还活着。最广泛使用的机制是 “ping”。如果你使用的是 gorilla,它会提供接口,而我们需要自己实现。
// method to check if client is open or closed.
func isClientOpen(client *websocket.Conn) bool {
if err := client.WriteMessage(websocket.PingMessage, nil); err != nil {
logger.Warnf("unable to send the ping message, looks like client is closed.")
// close the client
client.Close()
}
}
go 提供了 timeTicker,使用它,我们可以在给定的固定时间后轻松生成一个方法。我们还可以制定一条规则,规定如果三次 ping 失败,我们将关闭连接。
// check ping for every 5 seconds
ticker := time.NewTicker(5 * time.Second)
// execute below in separate thread, with proper channel between them
for {
select {
case <-ticker.C:
isClientClose()
}
}
版权声明:本文内容转自互联网,本文观点仅代表作者本人。本站仅提供信息存储空间服务,所有权归原作者所有。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至1393616908@qq.com 举报,一经查实,本站将立刻删除。