Cover

Go Message Queue 的简易实现

· Technology

前言

消息队列,即 Message Queue,是我们在构建 Gin 等持久化 Golang 应用程序的常用组件。

消息队列是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

实际上,消息队列常常保存在链表结构中。拥有权限的进程可以向消息队列中写入或读取消息。

目前,有很多消息队列有很多开源的实现,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、RabbitMQ、IBM MQ、Apache Qpid、Apache RocketMQ和HTTPSQS。

消息队列在实际应用中包括如下四个场景:

  • 应用耦合:多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败
  • 异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间
  • 限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况
  • 消息驱动的系统:系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者(可能有多个)负责对消息进行处理

通常对于微服务架构的 Message Queue 都是使用更为专业的 MQ 框架。但对于我们个人构建的小型应用程序,部署一个大型的微服务 MQ 框架是毫无必要的,对于大多数人来说这也是难以学习的。

所以,在没有较为严苛的可用性要求和业务承载量的情况,我们可以基于内存构建一个仅在 Go 持久化应用程序内部运行的 Message Queue 组件。

实现

Golang 提供了 Channel、 Goroutine 和 Context 等内置特性用于并发程序的辅助,这让我们可以免于自行构建线程池等较为繁杂的应用程序架构。 在本文中,我们基于 Channel 实现 Message Queue。

简易线程池 & 基本类型

在构建消息队列组件之前,我们要了解消息队列的主要构成。 发布/订阅模式下的消息队列组件包括三个角色:

  • 角色主题 Topic
  • 发布者 Publisher
  • 订阅者 Subscriber
Image

发布者将消息发送到 Topic,系统将这些消息传递给多个订阅者。 同时,发布/订阅模式的消息队列具有如下特性:

  • 每个消息可以有多个订阅者
  • 发布者和订阅者之间有时间上的依赖性。针对某个 Topic 的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
  • 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行

根据上述内容,我们可以大体建立消息队列的框架

// mq/mq.go

// CallbackFunc 用于定义消息处理函数
type CallbackFunc func(Message)

// Pool 用于定义消息队列的接口,所有的消息队列都需要实现这个接口
type Pool interface {
  Publish(topic string, message Message)                 // Publish 发布消息
  Subscribe(topic string, buffer int) <-chan Message     // Subscribe 订阅消息
  SubscribeCallback(topic string, callback CallbackFunc) // SubscribeCallback 订阅函数消息
  Unsubscribe(topic string, channel <-chan Message)      // Unsubscribe 取消订阅
}

// Message 对应单一消息的结构体
type Message struct {
  Topic   string      // Topic 用于标识消息的对应的主题,消费者可以通过 Topic 过滤消息
  Context interface{} // Context 用于存放消息的内容
}

func init() {
  gob.Register(&Message{}) 
}

如上述代码,我们定义了传入消息的结构体 Message 和消息队列的处理池 Pool

其中 Pool 具有四个要求实现的方法:

  • Publish(topic string, message Message) 发布消息,传入 Topic 标识和消息内容,系统会将其传给所有订阅者。
  • Subscribe(topic string, buffer int) <-chan Message 订阅消息,通过传入所需消息的 Topic 和所需消息的多少,得到一个 Go Channel。后续可以通过 Channel 获取信息。
  • SubscribeCallback(topic string, callback CallbackFunc) 注册回调函数。通过传入消息的 Topic 为其注册回调函数,通常适用于较为单一的队列任务(例如 开卡、邮件发送等)。
  • Unsubscribe(topic string, channel <-chan Message) 通过传入 Topic 和之前通过 Subscribe 获取的 Channel 取消注册,避免不必要的消息推送。

内存实现

在文中,我们通过内存进行处理池的实现。

// mq/memory.go

// MemoryMessageQueue 内存实现的处理池本体
type MemoryMessageQueue struct {
  topics    map[string][]chan Message // topics 用于存放所有的 channel 订阅者
  callbacks map[string][]CallbackFunc // callbacks 用于存放所有的 callback function 订阅者
  sync.RWMutex
}

var _ Pool = &MemoryMessageQueue{}

func NewMemoryMQ() Pool {
  return &MemoryMessageQueue{
    topics:    make(map[string][]chan Message),
    callbacks: make(map[string][]CallbackFunc),
  }
}

// Publish 发布消息
func (m *MemoryMessageQueue) Publish(topic string, message Message) {
  // 锁定全局队列,获取所有的订阅者
  m.RLock()
  subscribersChan, okChan := m.topics[topic]
  subscribersCallback, okCallback := m.callbacks[topic]
  m.RUnlock()
  // 这是为了避免在发布消息的时候对订阅者进行修改,从而导致订阅者的遍历出现问题
  
  if okChan {
    go func(subscribersChan []chan Message) {
      for i := 0; i < len(subscribersChan); i++ {
        select {
        case subscribersChan[i] <- message:
        case <-time.After(time.Second):
        }
      }
    }(subscribersChan)
  }
  
  if okCallback {
    for i := 0; i < len(subscribersCallback); i++ {
      go subscribersCallback[i](message)
    }
  }
}

// Subscribe 订阅消息,通过 channel 异步返回。buffer 指定 Message 的数量
func (m *MemoryMessageQueue) Subscribe(topic string, buffer int) <-chan Message {
  ch := make(chan Message, buffer)
  m.Lock()
  m.topics[topic] = append(m.topics[topic], ch)
  m.Unlock()
  return ch
}

// SubscribeCallback 订阅函数消息,通过 callback function 异步执行。
func (m *MemoryMessageQueue) SubscribeCallback(topic string, callback CallbackFunc) {
  m.Lock()
  m.callbacks[topic] = append(m.callbacks[topic], callback)
  m.Unlock()
}

// Unsubscribe 取消订阅 channel 消息,sub 传入 Subscribe 返回的 channel,用于辨识取消哪个订阅者
func (m *MemoryMessageQueue) Unsubscribe(topic string, sub <-chan Message) {
  m.Lock()
  defer m.Unlock()
  
  subscribers, ok := m.topics[topic]
  if !ok {
    return
  }
  
  var newSubs []chan Message
  for _, subscriber := range subscribers {
    if subscriber == sub {
      continue
    }
    newSubs = append(newSubs, subscriber)
  }
  
  m.topics[topic] = newSubs
}

观测代码可知,我们通过内存(全局变量)和 Goroutine 实现消息的通知、订阅者的存储和异步处理。

Publish 中,我们通过获取所有当前 Topic 对应的 Channel 和 Callback Function 并逐个异步传入消息来完成消息通知的任务。

SubscribeSubscribeCallback 中,我们通过 Go builtin 提供的 sync.RWMutex 实现一个简易的线程锁,避免在操作订阅时触发消息通知从而导致漏收消息等。

全局处理池

我们可以定义对外暴露的全局变量,使外部应用统一操作此接口,以此避免程序内运行着过多的处理池导致内存占用增大。

// mq/global.go

var Global Pool

func Init() {
  Global = NewMemoryMQ()
}

当然,仅局限于此是不行的,我们还未正式对其进行初始化。

// main.go

package main

import "project/mq"

func init() {
  mq.Init()
}

func main() {
}

特性

该简易消息队列组件自然也是具有其对应的优缺点。

由于程序的单一性,此组件难以做到跨程序异步操作等,使用范围仅限当前应用程序内。但相应的,我们也无需配置繁杂的订阅接口,同时可以避免不必要的大型消息队列应用。

我个人对此组件的应用也仅局限于邮件通知队列等任务队列和异步统计接口、异步开卡接口等异步接口。如有不足欢迎指出!

Comments

Send Comments

Markdown supported. Please keep comments clean.