session中Read方法实现粘包拆包处理
定义简单协议,数据包头由4字节构成:
第1位固定为’$’
第2-3位为Body长度(uint16)
第4位固定为’#’
接收数据时若第1位和第4位不正确则认为接收到异常数据,同时关闭socket连接
本文代码查看github:
https://github.com/zboyco/go-server/tree/step-8
为了实现粘包拆包处理,我们自己实现一个buffer类来管理数据
在server目录中增加buffer.go,这里主要参考了https://studygolang.com/articles/12088
完整代码如下
buffer.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package server
import ( "errors" "io" )
type buffer struct { reader io.Reader buf []byte start int end int }
func newBuffer(reader io.Reader, len int) *buffer { buf := make([]byte, len) return &buffer{reader, buf, 0, 0} }
func (instance *buffer) len() int { return instance.end - instance.start }
func (instance *buffer) cleanBuf() { if instance.start == 0 { return } copy(instance.buf, instance.buf[instance.start:instance.end]) instance.end -= instance.start instance.start = 0 }
func (instance *buffer) read() (int, error) { instance.cleanBuf() n, err := instance.reader.Read(instance.buf[instance.end:]) if err != nil { return n, err } instance.end += n return n, nil }
func (instance *buffer) peek(len int) ([]byte, error) { if instance.len() < len { return nil, errors.New("可读取长度不够") } result := instance.buf[instance.start : instance.start+len] return result, nil }
func (instance *buffer) pick(offset int, len int) ([]byte, error) { result, err := instance.peek(offset + len) if err != nil { return nil, err } instance.start += (offset + len) return result[offset:], nil }
|
session.go 也需要修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package server
import ( "encoding/binary" "errors" "net" "time" )
const ( HeaderLen int = 4 HeaderStartByte byte = '$' HeaderEndByte byte = '#' )
type AppSession struct { ID int64 conn net.Conn activeDateTime time.Time buffer *buffer }
func (session *AppSession) Send(buf []byte) { session.conn.Write(buf) session.activeDateTime = time.Now() }
func (session *AppSession) Read() ([]byte, error) { needRead := session.buffer.len() < HeaderLen for { if needRead { _, err := session.buffer.read() if err != nil { return nil, err } } headBuf, err := session.buffer.peek(HeaderLen)
if err != nil { needRead = true continue }
if headBuf[0] != HeaderStartByte || headBuf[3] != HeaderEndByte { return nil, errors.New("接收到异常数据") }
bodyLen := int(binary.BigEndian.Uint16(headBuf[1:3]))
bodyBuf, err := session.buffer.pick(HeaderLen, bodyLen)
if err != nil { needRead = true continue }
session.activeDateTime = time.Now() return bodyBuf, nil } }
|