第54节 OpenIM 重构确定协议


❤️💕💕记录sealosopen in new window开源项目的学习过程。k8s,docker和云原生的学习open in new window。Myblog:http://nsddd.topopen in new window


[TOC]

确定协议

客户端和服务端之间的协议 (1)心跳 (2)消息拉取 (3)推送 (4)消息发送

客户端内部的 (1)消息同步协程和会话协程之间

消息; 2.同步开始;3.同步结束; (2)ws协程和消息同步协程之间

心跳响应(最大seq);2.推送消息;3.连接成功 (3)任何发送ws请求协程和ws协程之间; 1.通过SendReqWaitResp封装

我们需要拿到数据:

(2)ws协程和消息同步协程之间 1.心跳响应(最大seq);2.推送消息;3.连接成功

先判断 cmd,然后做逻辑部分:

CmdMaxSeq       = "maxSeq"	//心跳响应(最大seq)
CmdPushMsg      = "pushMsg"	//推送信息
CmdConnSuccesss = "connSuccess"	//连接成功

第一个你会接受到推送过来的消息

第二就是在服务器定时的去获取到每一个群,或者是每一个会话里的所有的就是每一个绘画的最新的最大的也是最大的seq。这样我们这个模块就知道我们缺了哪些消息

第三个如果同年成功或者就连接成功,然后也有ws那个模块会丢一重庆成功的 Cmd给你,就是说你现在会接受三种类型的消息,像是那些通知在c群里面,这个有没有问题。

重连成功后就要通知你开始同步信息了。

结构体:

// The callback synchronization starts. The reconnection endsc
type MsgSyncer struct {
	loginUserID        string                // login user ID
	longConnMgr        *LongConnMgr          // long connection manager
	PushMsgAndMaxSeqCh chan common.Cmd2Value // channel for receiving push messages and the maximum SEQ number
	conversationCh     chan common.Cmd2Value // storage and session triggering
	syncedMaxSeqs      map[string]SyncedSeq  // map of the maximum synced SEQ numbers for all group IDs
	db                 db_interface.DataBase // data store
	syncTimes          int                   // times of sync
	ctx                context.Context       // context
	cancel             context.CancelFunc    // cancel function
}

conversationCh 是干嘛的?实际上这就是消息模块的一个切入,就是你刚才就是说会一些把某些数据给到这个东西就给到嵌入里面,它有哪些东西呢?这里面两种消息,就一种是消息,就是你同步的消息,刚才不是说同步一条或者同步10条是不是给到他,对吧?他来做存储,他自己要做存储和会话的触发。

所以说这里的 channel 收到的是三种信息:

  • 第一种是真正同步的消息
  • 第二种是同步开始、同步结束的一种通知,他要给客户端进行显示

loadSeq函数:

就是说你初始化的时候,实际上初始化的时候实际上你的就已经同步,但是你是不知道的,所以到时候DB那边可能要提供一些函数能告诉你,就是说比如说我有了我这个用户有哪有哪些会话,然后每个会话最大的一个是什么情况,我要在DB本地读出来寄到内存里面,可能是一个类似 Map的东西,在初始化的时候执行一次,这个时候就能确定 seqs

使用 switch:

case cmd := <-m.PushMsgAndMaxSegCh:
	m.handleRecvMsgAndSyncSeqs(cmd)

定义部分:

type Cmd2Value struct {
   Cmd   string
   Value interface{}
   Ctx   context.Context
}

value不一定是msgdata

func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.MsgData, ch chan Cmd2Value) error {
   if ch == nil {
      return utils.Wrap(errors.New("ch == nil"), "")
   }

   c2v := Cmd2Value{Cmd: constant.CmdPushMsg, Value: msg, Ctx: ctx}
   return sendCmd(ch, c2v, 100)
}

func TriggerCmdMaxSeq(seq sdk_struct.CmdMaxSeqToMsgSync, ch chan Cmd2Value) error {
   if ch == nil {
      return utils.Wrap(errors.New("ch == nil"), "")
   }
   c2v := Cmd2Value{Cmd: constant.CmdMaxSeq, Value: seq}
   return sendCmd(ch, c2v, 100)
}

func TriggerCmdConnected(ctx context.Context, ch chan Cmd2Value) error {
   if ch == nil {
      return utils.Wrap(errors.New("ch == nil"), "")
   }
   c2v := Cmd2Value{Cmd: constant.CmdConnSuccesss, Value: nil, Ctx: ctx}
   return sendCmd(ch, c2v, 100)
}

分别是这三个 Cmd2Value

调整:

  • ws *Ws 这个换掉
  • msgCache 可以删除
  • handleRecvMsgAndSyncSeqs要对cmd解析一下

案例

就拿我们的一个案例来说:

type Protocol struct {
    heartbeat  chan []byte   // 心跳
    pullMsg    chan []byte   // 消息拉取
    pushMsg    chan []byte   // 推送消息
    sendMsg    chan []byte   // 消息发送
    syncMsg    chan []byte   // 消息同步
    syncStart  chan []byte   // 同步开始
    syncFinish chan []byte   // 同步结束
    wsResp     chan []byte   // WS响应
    wsPush     chan []byte   // WS推送
    wsConn     chan []byte   // WS连接成功
    wsReq      chan []byte   // 任何发送WS请求
}

func (p *Protocol) init() {
    p.heartbeat = make(chan []byte)  // 初始化心跳协议
    p.pullMsg = make(chan []byte)    // 初始化消息拉取协议
    p.pushMsg = make(chan []byte)    // 初始化推送消息协议
    p.sendMsg = make(chan []byte)    // 初始化消息发送协议
    p.syncMsg = make(chan []byte)    // 初始化消息同步协议
    p.syncStart = make(chan []byte)  // 初始化同步开始协议
    p.syncFinish = make(chan []byte) // 初始化同步结束协议
    p.wsResp = make(chan []byte)     // 初始化WS响应协议
    p.wsPush = make(chan []byte)     // 初始化WS推送协议
    p.wsConn = make(chan []byte)     // 初始化WS连接成功协议
    p.wsReq = make(chan []byte)      // 初始化任何WS请求协议
}

internal/interaction 结构

ls internal/interaction -al
total 100
drwxr-xr-x  2 root root  4096 May  4 17:32 .
drwxr-xr-x 19 root root  4096 Apr 28 06:30 ..
-rw-r--r--  1 root root  1073 May  4 00:57 compressor.go
-rw-r--r--  1 root root   678 May  4 00:57 constant.go
-rw-r--r--  1 root root   574 May  4 00:57 context.go
-rw-r--r--  1 root root   745 May  4 00:57 encoder.go
-rw-r--r--  1 root root 14090 May  4 17:32 long_conn_mgr.go
-rw-r--r--  1 root root  1550 May  4 00:57 long_connection.go
-rw-r--r--  1 root root  3130 May  4 17:07 msg_sync.go
-rw-r--r--  1 root root  8878 May  4 17:32 msg_sync2.go
-rw-r--r--  1 root root  9472 May  4 17:32 msg_sync_self.go
-rw-r--r--  1 root root 12857 May  4 08:13 msy_sync_read_diffusion_group.go
-rw-r--r--  1 root root  2046 May  4 07:18 ws_default.go
-rw-r--r--  1 root root  1637 May  4 00:57 ws_js.go
-rw-r--r--  1 root root  3811 May  4 17:32 ws_resp_asyn.go
  • compressor.go: 压缩工具
  • context.go: 处理上下文
  • long_conn_mgr.go: 长连接管理器
  • msg_sync.go: 消息同步
  • msg_sync_self.go: 消息同步(自身)
  • ws_default.go: WebSocket 的默认实现
  • ws_resp_asyn.go: 异步的 WebSocket 响应
  • constant.go: 常量
  • encoder.go: 编码器
  • long_connection.go: 长连接
  • msg_sync2.go: 消息同步
  • msy_sync_read_diffusion_group.go: 消息同步(读取扩散群组)
  • ws_js.go: WebSocket 的 JavaScript 实现

新协议

type SeqRange struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   ConversationID string `protobuf:"bytes,1,opt,name=conversationID,proto3" json:"conversationID"`
   Begin          int64  `protobuf:"varint,2,opt,name=begin,proto3" json:"begin"`
   End            int64  `protobuf:"varint,3,opt,name=end,proto3" json:"end"`
   IsNotification bool   `protobuf:"varint,4,opt,name=isNotification,proto3" json:"isNotification"`
   Num            int64  `protobuf:"varint,5,opt,name=num,proto3" json:"num"`
}

向信息同步协程发送数据

// -- // 长连接协程向消息同步协程下发的数据:
// 1、推送消息,msg为msgData切片
func TriggerCmdPushMsg(ctx context.Context, msg *sdkws.PushMessages, ch chan Cmd2Value) error {
    if ch == nil {
        return utils.Wrap(errors.New("ch == nil"), "")
    }

    c2v := Cmd2Value{Cmd: constant.CmdPushMsg, Value: msg, Ctx: ctx}
    return sendCmd(ch, c2v, 100)
}

// 2、seq触发
func TriggerCmdMaxSeq(ctx context.Context,seq sdk_struct.CmdMaxSeqToMsgSync, ch chan Cmd2Value) error {
    if ch == nil {
        return utils.Wrap(errors.New("ch == nil"), "")
    }
    c2v := Cmd2Value{Cmd: constant.CmdMaxSeq, Value: seq,Ctx: ctx}
    return sendCmd(ch, c2v, 100)
}

// 3、连接成功触发
func TriggerCmdConnected(ctx context.Context, ch chan Cmd2Value) error {
    if ch == nil {
        return utils.Wrap(errors.New("ch == nil"), "")
    }
    c2v := Cmd2Value{Cmd: constant.CmdConnSuccesss, Value: nil, Ctx: ctx}
    return sendCmd(ch, c2v, 100)
}

const(
    MsgSyncBegin      = 1001 //
    MsgSyncProcessing = 1002 //
    MsgSyncEnd        = 1003 //
    MsgSyncFailed     = 1004
)
    
type CmdNewMsgComeToConversation struct {

    MsgList  []*sdkws.MsgData
    SyncFlag int
}

这个是ws协程丢给ch的三种类型的数据:

type CmdNewMsgComeToConversation struct {
    MsgList  []*sdkws.MsgData
    SyncFlag int
}

这个是你丢给 conversationch

END 链接