第54节 OpenIM 重构确定协议
❤️💕💕记录sealos开源项目的学习过程。k8s,docker和云原生的学习。Myblog:http://nsddd.top
[TOC]
确定协议
客户端和服务端之间的协议 (1)心跳 (2)消息拉取 (3)推送 (4)消息发送
客户端内部的 (1)消息同步协程和会话协程之间
消息; 2.同步开始;3.同步结束; (2)ws协程和消息同步协程之间
心跳响应(最大seq);2.推送消息;3.连接成功 (3)任何发送ws请求协程和ws协程之间; 1.通过SendReqWaitResp封装
我们需要拿到数据:
(2)ws协程和消息同步协程之间 1.心跳响应(最大seq);2.推送消息;3.连接成功
先判断 cmd,然后做逻辑部分:
CmdMaxSeq = "maxSeq" //心跳响应(最大seq)
CmdPushMsg = "pushMsg" //推送信息
CmdConnSuccesss = "connSuccess" //连接成功
第一个你会接受到推送过来的消息
第二就是在服务器定时的去获取到每一个群,或者是每一个会话里的所有的就是每一个绘画的最新的最大的也是最大的seq。这样我们这个模块就知道我们缺了哪些消息
第三个如果同年成功或者就连接成功,然后也有ws那个模块会丢一重庆成功的 Cmd给你,就是说你现在会接受三种类型的消息,像是那些通知在c群里面,这个有没有问题。
重连成功后就要通知你开始同步信息了。
结构体:
// The callback synchronization starts. The reconnection endsc
type MsgSyncer struct {
loginUserID string // login user ID
longConnMgr *LongConnMgr // long connection manager
PushMsgAndMaxSeqCh chan common.Cmd2Value // channel for receiving push messages and the maximum SEQ number
conversationCh chan common.Cmd2Value // storage and session triggering
syncedMaxSeqs map[string]SyncedSeq // map of the maximum synced SEQ numbers for all group IDs
db db_interface.DataBase // data store
syncTimes int // times of sync
ctx context.Context // context
cancel context.CancelFunc // cancel function
}
conversationCh 是干嘛的?实际上这就是消息模块的一个切入,就是你刚才就是说会一些把某些数据给到这个东西就给到嵌入里面,它有哪些东西呢?这里面两种消息,就一种是消息,就是你同步的消息,刚才不是说同步一条或者同步10条是不是给到他,对吧?他来做存储,他自己要做存储和会话的触发。
所以说这里的 channel 收到的是三种信息:
- 第一种是真正同步的消息
- 第二种是同步开始、同步结束的一种通知,他要给客户端进行显示
loadSeq函数:
就是说你初始化的时候,实际上初始化的时候实际上你的就已经同步,但是你是不知道的,所以到时候DB那边可能要提供一些函数能告诉你,就是说比如说我有了我这个用户有哪有哪些会话,然后每个会话最大的一个是什么情况,我要在DB本地读出来寄到内存里面,可能是一个类似 Map的东西,在初始化的时候执行一次,这个时候就能确定 seqs
使用 switch:
case cmd := <-m.PushMsgAndMaxSegCh:
m.handleRecvMsgAndSyncSeqs(cmd)
定义部分:
type Cmd2Value struct {
Cmd string
Value interface{}
Ctx context.Context
}
value不一定是msgdata
func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.MsgData, ch chan Cmd2Value) error {
if ch == nil {
return utils.Wrap(errors.New("ch == nil"), "")
}
c2v := Cmd2Value{Cmd: constant.CmdPushMsg, Value: msg, Ctx: ctx}
return sendCmd(ch, c2v, 100)
}
func TriggerCmdMaxSeq(seq sdk_struct.CmdMaxSeqToMsgSync, ch chan Cmd2Value) error {
if ch == nil {
return utils.Wrap(errors.New("ch == nil"), "")
}
c2v := Cmd2Value{Cmd: constant.CmdMaxSeq, Value: seq}
return sendCmd(ch, c2v, 100)
}
func TriggerCmdConnected(ctx context.Context, ch chan Cmd2Value) error {
if ch == nil {
return utils.Wrap(errors.New("ch == nil"), "")
}
c2v := Cmd2Value{Cmd: constant.CmdConnSuccesss, Value: nil, Ctx: ctx}
return sendCmd(ch, c2v, 100)
}
分别是这三个 Cmd2Value
调整:
ws *Ws
这个换掉msgCache
可以删除handleRecvMsgAndSyncSeqs
要对cmd
解析一下
案例
就拿我们的一个案例来说:
type Protocol struct {
heartbeat chan []byte // 心跳
pullMsg chan []byte // 消息拉取
pushMsg chan []byte // 推送消息
sendMsg chan []byte // 消息发送
syncMsg chan []byte // 消息同步
syncStart chan []byte // 同步开始
syncFinish chan []byte // 同步结束
wsResp chan []byte // WS响应
wsPush chan []byte // WS推送
wsConn chan []byte // WS连接成功
wsReq chan []byte // 任何发送WS请求
}
func (p *Protocol) init() {
p.heartbeat = make(chan []byte) // 初始化心跳协议
p.pullMsg = make(chan []byte) // 初始化消息拉取协议
p.pushMsg = make(chan []byte) // 初始化推送消息协议
p.sendMsg = make(chan []byte) // 初始化消息发送协议
p.syncMsg = make(chan []byte) // 初始化消息同步协议
p.syncStart = make(chan []byte) // 初始化同步开始协议
p.syncFinish = make(chan []byte) // 初始化同步结束协议
p.wsResp = make(chan []byte) // 初始化WS响应协议
p.wsPush = make(chan []byte) // 初始化WS推送协议
p.wsConn = make(chan []byte) // 初始化WS连接成功协议
p.wsReq = make(chan []byte) // 初始化任何WS请求协议
}
internal/interaction 结构
❯ ls internal/interaction -al
total 100
drwxr-xr-x 2 root root 4096 May 4 17:32 .
drwxr-xr-x 19 root root 4096 Apr 28 06:30 ..
-rw-r--r-- 1 root root 1073 May 4 00:57 compressor.go
-rw-r--r-- 1 root root 678 May 4 00:57 constant.go
-rw-r--r-- 1 root root 574 May 4 00:57 context.go
-rw-r--r-- 1 root root 745 May 4 00:57 encoder.go
-rw-r--r-- 1 root root 14090 May 4 17:32 long_conn_mgr.go
-rw-r--r-- 1 root root 1550 May 4 00:57 long_connection.go
-rw-r--r-- 1 root root 3130 May 4 17:07 msg_sync.go
-rw-r--r-- 1 root root 8878 May 4 17:32 msg_sync2.go
-rw-r--r-- 1 root root 9472 May 4 17:32 msg_sync_self.go
-rw-r--r-- 1 root root 12857 May 4 08:13 msy_sync_read_diffusion_group.go
-rw-r--r-- 1 root root 2046 May 4 07:18 ws_default.go
-rw-r--r-- 1 root root 1637 May 4 00:57 ws_js.go
-rw-r--r-- 1 root root 3811 May 4 17:32 ws_resp_asyn.go
compressor.go
: 压缩工具context.go
: 处理上下文long_conn_mgr.go
: 长连接管理器msg_sync.go
: 消息同步msg_sync_self.go
: 消息同步(自身)ws_default.go
: WebSocket 的默认实现ws_resp_asyn.go
: 异步的 WebSocket 响应constant.go
: 常量encoder.go
: 编码器long_connection.go
: 长连接msg_sync2.go
: 消息同步msy_sync_read_diffusion_group.go
: 消息同步(读取扩散群组)ws_js.go
: WebSocket 的 JavaScript 实现
新协议
type SeqRange struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ConversationID string `protobuf:"bytes,1,opt,name=conversationID,proto3" json:"conversationID"`
Begin int64 `protobuf:"varint,2,opt,name=begin,proto3" json:"begin"`
End int64 `protobuf:"varint,3,opt,name=end,proto3" json:"end"`
IsNotification bool `protobuf:"varint,4,opt,name=isNotification,proto3" json:"isNotification"`
Num int64 `protobuf:"varint,5,opt,name=num,proto3" json:"num"`
}
向信息同步协程发送数据
// -- // 长连接协程向消息同步协程下发的数据:
// 1、推送消息,msg为msgData切片
func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.PushMessages, ch chan Cmd2Value) error {
if ch == nil {
return utils.Wrap(errors.New("ch == nil"), "")
}
c2v := Cmd2Value{Cmd: constant.CmdPushMsg, Value: msg, Ctx: ctx}
return sendCmd(ch, c2v, 100)
}
// 2、seq触发
func TriggerCmdMaxSeq(ctx context.Context,seq sdk_struct.CmdMaxSeqToMsgSync, ch chan Cmd2Value) error {
if ch == nil {
return utils.Wrap(errors.New("ch == nil"), "")
}
c2v := Cmd2Value{Cmd: constant.CmdMaxSeq, Value: seq,Ctx: ctx}
return sendCmd(ch, c2v, 100)
}
// 3、连接成功触发
func TriggerCmdConnected(ctx context.Context, ch chan Cmd2Value) error {
if ch == nil {
return utils.Wrap(errors.New("ch == nil"), "")
}
c2v := Cmd2Value{Cmd: constant.CmdConnSuccesss, Value: nil, Ctx: ctx}
return sendCmd(ch, c2v, 100)
}
const(
MsgSyncBegin = 1001 //
MsgSyncProcessing = 1002 //
MsgSyncEnd = 1003 //
MsgSyncFailed = 1004
)
type CmdNewMsgComeToConversation struct {
MsgList []*sdkws.MsgData
SyncFlag int
}
这个是ws协程丢给ch的三种类型的数据:
type CmdNewMsgComeToConversation struct {
MsgList []*sdkws.MsgData
SyncFlag int
}
这个是你丢给 conversationch
的
END 链接
✴️版权声明 © :本书所有内容遵循CC-BY-SA 3.0协议(署名-相同方式共享)©