Go实现简单的Socket服务端笔记(八)

session中Read方法实现粘包拆包处理

定义简单协议,数据包头由4字节构成:
第1位固定为’$’
第2-3位为Body长度(uint16)
第4位固定为’#’
接收数据时若第1位和第4位不正确则认为接收到异常数据,同时关闭socket连接

本文代码查看github:
https://github.com/zboyco/go-server/tree/step-8


为了实现粘包拆包处理,我们自己实现一个buffer类来管理数据
在server目录中增加buffer.go,这里主要参考了https://studygolang.com/articles/12088

完整代码如下

buffer.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package server

import (
"errors"
"io"
)

type buffer struct {
reader io.Reader //
buf []byte //缓存
start int //有效位开始索引
end int //有效位结束索引
}

func newBuffer(reader io.Reader, len int) *buffer {
buf := make([]byte, len)
return &buffer{reader, buf, 0, 0}
}

func (instance *buffer) len() int {
return instance.end - instance.start
}

//cleanBuf 清理缓存中已提取的数据
func (instance *buffer) cleanBuf() {
if instance.start == 0 {
return
}
copy(instance.buf, instance.buf[instance.start:instance.end])
instance.end -= instance.start
instance.start = 0
}

//read 接收数据
func (instance *buffer) read() (int, error) {
instance.cleanBuf()
n, err := instance.reader.Read(instance.buf[instance.end:])
if err != nil {
return n, err
}
instance.end += n
return n, nil
}

//peek 查看指定长度字节数据
func (instance *buffer) peek(len int) ([]byte, error) {
if instance.len() < len {
return nil, errors.New("可读取长度不够")
}
result := instance.buf[instance.start : instance.start+len]
return result, nil
}

//pick 提取指定长度字节数据
func (instance *buffer) pick(offset int, len int) ([]byte, error) {
result, err := instance.peek(offset + len)
if err != nil {
return nil, err
}
instance.start += (offset + len)
return result[offset:], nil
}

session.go 也需要修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package server

import (
"encoding/binary"
"errors"
"net"
"time"
)

const (
//HeaderLen 数据包头长度
HeaderLen int = 4
//HeaderStartByte 数据包头部起始码
HeaderStartByte byte = '$'
//HeaderEndByte 数据包头部结束码
HeaderEndByte byte = '#'
)

//AppSession 客户端结构体
type AppSession struct {
ID int64 //连接唯一标识
conn net.Conn //socket连接
activeDateTime time.Time //最后活跃时间
buffer *buffer //数据流
}

//Send 发送数据
func (session *AppSession) Send(buf []byte) {
session.conn.Write(buf)
//更新最后活跃时间
session.activeDateTime = time.Now()
}

//Read 读取数据
//每次读取必然返回一个完整数据包或者错误信息
func (session *AppSession) Read() ([]byte, error) {
//判断是否需要读取数据
needRead := session.buffer.len() < HeaderLen
for {
if needRead {
_, err := session.buffer.read()
if err != nil {
return nil, err
}
}
//查看前4个数据包头数据
headBuf, err := session.buffer.peek(HeaderLen)

if err != nil {
needRead = true
continue
}

//判断1和4位是否为指定的起始码和结束码
if headBuf[0] != HeaderStartByte || headBuf[3] != HeaderEndByte {
return nil, errors.New("接收到异常数据")
}

//计算数据包内容长度
bodyLen := int(binary.BigEndian.Uint16(headBuf[1:3]))

//提取数据内容
bodyBuf, err := session.buffer.pick(HeaderLen, bodyLen)

if err != nil {
needRead = true
continue
}

//更新最后活跃时间
session.activeDateTime = time.Now()
return bodyBuf, nil
}
}