Go 实现一个分组排队取号叫号的逻辑

本文github代码库:https://github.com/zboyco/queuegroup


业务场景(这个包产生的原因)

  1. 在我们的项目中,由同一个客服端发送到服务器的请求,必须按照请求的顺序进行处理(数据只存在服务端,下一个请求的基础数据就是上一个请求运算完成后的结果);
  2. 无法保证客服端请求一定严格收到返回后才发送下个请求,偶尔客户端会短时间内发送多个请求,如果直接并发处理会导致数据错误或丢失(多个请求使用旧数据进行运算)。

思路

  1. 由于请求需要的基础数据依赖于上一次请求的结果,自然想到需要将请求串型处理,说到串型处理,那就是队列了,由于我们只需要对同一客户端发送的请求进行串型处理,所以就需要分组队列,每个组单独一个队列即可。
  2. 在我们的需求中,主要是每个请求的基础数据依赖于上一个请求,那么只需要在当前请求运算结束以前阻塞后面的请求获取基础数据即可,也就是在当前请求返回前,独占这个队列,不让后面的请求使用。

基于上面两点,我联想到了车站排队购票或者医院排队挂号等线下排队的情形,基本和我们的需求相同,再结合银行的取号排队方式,实现一个分组队列方法:

  • 1个请求 >>>>>> 一个需要办理业务的人
  • 请求需要的基础数据 >>>>>> 柜台自助机
  • 请求入队 >>>>>> 排队取号
  • 独占基础数据 >>>>>> 霸占自助机
  • 请求离队 >>>>>> 释放自助机

逻辑步骤如下:

  1. 进入窗口(一个组)取号(窗口不存在则新开窗口)
  2. 等待该窗口叫号
  3. 被叫到号的人(一个请求)开始办理业务
  4. 办理完成后(或超时)离开该窗口(组)
  5. 继续叫号
  6. 若该窗口一段时间(闲置时间)没有人取号办理业务,关闭该窗口以节约资源(也可以不关闭)

需要暴露的方法:

  1. GetQueue 获取队列
    这个方法需要返回一个指定id的队列,如果这个队列是为空则需要新建这个队列,这个操作需要保证一致性,在多协程下可能造成数据丢失,所以采用mutex来保证数据安全。
  2. QueueUp 排队取号
    从指定队列中取一个号(我称为票据ticket),我们需要一个队列来保存所以客户取到的ticket,在go语言中,有缓冲的chan通道就可以实现队列的功能,先进先出。
  3. Wait 等待叫号
    该方法需要阻塞当前协程,当队列叫到这个号的时候放通该协程,chan通道正好满足这个需求,于是我采用chan bool来当作这个ticket,也就是说在QueueUp方法中取到的号就是一个chan bool
  4. Leave 离队
    离队方法就是需要告诉队列,当前协程逻辑已经完成,离开队列,可以放通下一个协程了。

实现

1.首先建立“票据”类型,使用chan bool通道实现:

1
2
// Ticket 排队票据
type Ticket chan bool

Ticket需要实现两个方法,就是上面说到的等待叫号和离队两个方法,基于通道的特性,方法实现很简单:

1
2
3
4
5
6
7
8
9
// Wait 等待叫号
func (t *Ticket) Wait() {
<-(*t)
}

// Leave 离开队伍
func (t *Ticket) Leave() {
(*t) <- true
}

等待叫号的方法很好理解,就是等待一个信号,因为从通道里读取数据会阻塞当前协程,所以在没收到信号之前,调用当前票据wait方法的协程将被阻塞,直到收到信号。
离开队伍的方法则是往当前ticket写入一个信号,队列收到离队信号后则会继续呼叫下一个ticket(这个过程有队列管理员完成,将在后面介绍)。

2.建立队列模型
队列模型需要记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Queue 队列
type Queue struct {
groupID int64 // 组ID(窗口ID)
ticketChan chan Ticket // 排号队列
currentTicket Ticket // 当前办理号
status bool // 队列状态(true:启用;false:关闭)
}

func (q *Queue) callTicket() {
q.currentTicket <- true
}

func (q *Queue) closeTicket() {
close(q.currentTicket)
q.currentTicket = nil
}