AHdark

AHdark Blog

Senior high school student with a deep passion for coding. Driven by a love for problem-solving, I’m diving into algorithms while honing my skills in TypeScript, Rust, and Golang.

Simple Implementation of Go Message Queue

Introduction#

A message queue is a commonly used component when building long-running Go applications, such as web services built on Gin.

A message queue is a form of communication between processes, or between threads of the same process, in which a software queue holds a series of inputs, usually from users. Each record in the queue carries the details of one input, such as its time of occurrence, the type of input device, and the specific input parameters. Message queues provide an asynchronous communication protocol: the sender and receiver of a message do not need to interact with the queue at the same time, and messages stay in the queue until the receiver retrieves them.

In practice, message queues are often stored in a linked list structure. Authorized processes can write to or read messages from the message queue.

Currently, there are many open-source implementations of message queues, including JBoss Messaging, JORAM, Apache ActiveMQ, Sun Open Message Queue, RabbitMQ, IBM MQ, Apache Qpid, Apache RocketMQ, and HTTPSQS.

In practical applications, message queues are used in the following four scenarios:

  • Application decoupling: Multiple applications process the same message through the message queue, so a failed interface call in one application does not cause the entire process to fail.
  • Asynchronous processing: Multiple applications process the same message in the message queue concurrently, reducing processing time compared to serial processing.
  • Rate limiting and peak shaving: Widely used in flash sales or purchasing activities to avoid application system crashes due to excessive traffic.
  • Message-driven systems: The system is divided into message queues, message producers, and message consumers, where producers are responsible for generating messages, and consumers (which can be multiple) are responsible for processing messages.

In a microservice architecture, a dedicated MQ framework usually fills this role. For small applications built by individuals, however, deploying a large MQ framework is unnecessary, and such frameworks are hard for most people to learn.

Therefore, when availability requirements and business load are modest, we can build an in-memory message queue component that runs entirely inside the long-running Go application.

Implementation#

Golang provides built-in features such as Channel, Goroutine, and Context to assist with concurrent programming, allowing us to avoid building complex application architectures like thread pools ourselves. In this article, we implement Message Queue based on Channel.
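As a minimal sketch of why a Channel is a natural base for a queue: a buffered Channel already behaves as a first-in, first-out queue between a producer Goroutine and a consumer. The helper names produce and consume are made up for this illustration and are not part of the component built below.

```go
package main

import "fmt"

// produce sends the integers 0..n-1 into a buffered channel from its own
// goroutine and closes the channel when done, so that a range loop over
// the channel terminates.
func produce(n, buffer int) <-chan int {
	ch := make(chan int, buffer)
	go func() {
		defer close(ch)
		for i := 0; i < n; i++ {
			ch <- i // blocks only while the buffer is full
		}
	}()
	return ch
}

// consume drains the channel and returns the values in arrival order.
func consume(ch <-chan int) []int {
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(consume(produce(5, 2))) // [0 1 2 3 4]
}
```

The producer and consumer never wait for each other directly; the buffer absorbs short bursts, which is the same asynchronous behavior described in the introduction.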

Simple Thread Pool & Basic Types#

Before building the message queue component, we need to understand the main components of a message queue. The message queue component in the publish/subscribe model includes three roles:

  • Topic
  • Publisher
  • Subscriber

The publisher sends messages to the Topic, and the system delivers these messages to multiple subscribers. At the same time, the message queue in the publish/subscribe model has the following characteristics:

  • Each message can have multiple subscribers.
  • There is a temporal dependency between publishers and subscribers. A subscriber must subscribe to a Topic before it can consume messages published to that Topic.
  • To consume messages, subscribers need to subscribe to the Topic in advance and remain online.
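The timing rule in the list above can be sketched with a deliberately tiny fan-out. The broker type and its subscribe and publish methods are hypothetical names for this illustration only; the type is not safe for concurrent use and is not the implementation developed later in this article.

```go
package main

import "fmt"

// broker is a hypothetical single-goroutine fan-out, used only to show
// that a message reaches the subscribers that exist when it is published.
type broker struct {
	subs map[string][]chan string
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan string)}
}

// subscribe registers a buffered channel for the topic and returns it.
func (b *broker) subscribe(topic string, buffer int) <-chan string {
	ch := make(chan string, buffer)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

// publish offers msg to every current subscriber of the topic and reports
// how many accepted it. A full buffer drops the message instead of blocking.
func (b *broker) publish(topic, msg string) int {
	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	b := newBroker()
	fmt.Println(b.publish("news", "early")) // 0: nobody has subscribed yet

	first := b.subscribe("news", 1)
	second := b.subscribe("news", 1)
	fmt.Println(b.publish("news", "late")) // 2: both subscribers receive it
	fmt.Println(<-first, <-second)         // late late
}
```

The "early" message is lost because no subscriber existed when it was published, while "late" reaches every subscriber, each of which gets its own copy.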

Based on the above content, we can roughly establish the framework of the message queue.

// mq/mq.go

package mq

import "encoding/gob"

// CallbackFunc defines the message handling function
type CallbackFunc func(Message)

// Pool defines the interface for the message queue, which all message queues need to implement
type Pool interface {
  Publish(topic string, message Message)                 // Publish delivers a message to all subscribers of the topic
  Subscribe(topic string, buffer int) <-chan Message     // Subscribe returns a channel (with the given buffer size) that receives the topic's messages
  SubscribeCallback(topic string, callback CallbackFunc) // SubscribeCallback registers a function to be called for each message on the topic
  Unsubscribe(topic string, channel <-chan Message)      // Unsubscribe removes a channel obtained from Subscribe
}

// Message corresponds to the structure of a single message
type Message struct {
  Topic   string      // Topic identifies the corresponding topic of the message, allowing consumers to filter messages by Topic
  Context interface{} // Context stores the content of the message
}

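// Register Message with encoding/gob. This only matters if messages are
// gob-encoded elsewhere in the application; the queue itself never encodes them.
func init() {
  gob.Register(&Message{})
}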

As shown in the code above, we define the structure Message for incoming messages and the processing pool Pool for the message queue.

The Pool has four required methods to implement:

  • Publish(topic string, message Message) publishes a message, passing in the Topic identifier and message content, and the system will deliver it to all subscribers.
  • Subscribe(topic string, buffer int) <-chan Message subscribes to messages. Passing in the Topic and a buffer size returns a Go Channel with that capacity, and subsequent messages are received from this Channel.
  • SubscribeCallback(topic string, callback CallbackFunc) registers a callback function for the given Topic. This usually suits single-purpose queue tasks (e.g., card opening, email sending, etc.).
  • Unsubscribe(topic string, channel <-chan Message) cancels the registration by passing in the Topic and the Channel obtained from Subscribe, avoiding unnecessary message pushes.
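Unsubscribe identifies a subscriber by the Channel value itself rather than by an ID. A minimal sketch of that idea, using a hypothetical helper removeSubscriber and int messages for brevity: a stored chan int can be compared directly with the <-chan int handed back to the caller, because a bidirectional channel is assignable to its receive-only type.

```go
package main

import "fmt"

// removeSubscriber returns subs without the channel that is identical to
// target. Comparing a chan int with a <-chan int is allowed because the
// bidirectional channel type is assignable to the receive-only type.
func removeSubscriber(subs []chan int, target <-chan int) []chan int {
	kept := make([]chan int, 0, len(subs))
	for _, s := range subs {
		if s == target {
			continue // this is the subscriber being removed
		}
		kept = append(kept, s)
	}
	return kept
}

func main() {
	a, b := make(chan int, 1), make(chan int, 1)
	subs := []chan int{a, b}

	subs = removeSubscriber(subs, a)
	fmt.Println(len(subs), subs[0] == b) // 1 true
}
```

Passing a channel that was never registered leaves the list unchanged, so calling the helper twice with the same channel is harmless.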

Memory Implementation#

In this article, we implement the processing pool using memory.

// mq/memory.go

package mq

import (
  "sync"
  "time"
)

// MemoryMessageQueue is the main body of the memory implementation of the processing pool
type MemoryMessageQueue struct {
  topics    map[string][]chan Message // topics stores all channel subscribers
  callbacks map[string][]CallbackFunc // callbacks stores all callback function subscribers
  sync.RWMutex
}

var _ Pool = &MemoryMessageQueue{}

func NewMemoryMQ() Pool {
  return &MemoryMessageQueue{
    topics:    make(map[string][]chan Message),
    callbacks: make(map[string][]CallbackFunc),
  }
}

// Publish delivers a message to every subscriber of the topic.
func (m *MemoryMessageQueue) Publish(topic string, message Message) {
  // Hold the read lock only long enough to snapshot the current subscribers.
  // Delivery happens after the lock is released, so a slow subscriber cannot
  // block Subscribe or Unsubscribe.
  m.RLock()
  subscribersChan, okChan := m.topics[topic]
  subscribersCallback, okCallback := m.callbacks[topic]
  m.RUnlock()

  if okChan {
    // Deliver to channel subscribers one after another in a single goroutine.
    // A subscriber that does not accept the message within one second is skipped.
    go func(subscribersChan []chan Message) {
      for _, ch := range subscribersChan {
        select {
        case ch <- message:
        case <-time.After(time.Second):
        }
      }
    }(subscribersChan)
  }

  if okCallback {
    // Run each callback in its own goroutine.
    for _, callback := range subscribersCallback {
      go callback(message)
    }
  }
}

// Subscribe subscribes to a topic and returns a channel on which messages arrive asynchronously. buffer is the capacity of that channel.
func (m *MemoryMessageQueue) Subscribe(topic string, buffer int) <-chan Message {
  ch := make(chan Message, buffer)
  m.Lock()
  m.topics[topic] = append(m.topics[topic], ch)
  m.Unlock()
  return ch
}

// SubscribeCallback registers a callback for a topic. Publish runs the callback in its own goroutine for each message.
func (m *MemoryMessageQueue) SubscribeCallback(topic string, callback CallbackFunc) {
  m.Lock()
  m.callbacks[topic] = append(m.callbacks[topic], callback)
  m.Unlock()
}

// Unsubscribe cancels a channel subscription. sub is the channel returned by Subscribe and identifies which subscriber to remove.
func (m *MemoryMessageQueue) Unsubscribe(topic string, sub <-chan Message) {
  m.Lock()
  defer m.Unlock()

  subscribers, ok := m.topics[topic]
  if !ok {
    return
  }

  var newSubs []chan Message
  for _, subscriber := range subscribers {
    if subscriber == sub {
      continue
    }
    newSubs = append(newSubs, subscriber)
  }

  m.topics[topic] = newSubs
}

From the code, we can see that message notification, subscriber storage, and asynchronous processing are implemented with in-memory maps and Goroutines.

In Publish, we complete the task of message notification by obtaining all current Channels and Callback Functions corresponding to the Topic and asynchronously passing messages to each.
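The per-subscriber delivery step in Publish is a send with a timeout. Below is a minimal sketch of that pattern on its own; trySend and shortTimeout are hypothetical names for this illustration. It uses time.NewTimer with an explicit Stop instead of time.After, which is a swapped-in variation that releases the timer as soon as the send succeeds; the behavior seen by the caller is the same.

```go
package main

import (
	"fmt"
	"time"
)

// shortTimeout is an arbitrary value chosen to keep the demonstration fast.
const shortTimeout = 50 * time.Millisecond

// trySend attempts to deliver v on ch and gives up after timeout.
// It reports whether the value was delivered.
func trySend(ch chan<- int, v int, timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer once the outcome is known
	select {
	case ch <- v:
		return true
	case <-timer.C:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1, shortTimeout)) // true: the buffer has room
	fmt.Println(trySend(ch, 2, shortTimeout)) // false: buffer full and nobody is reading
}
```

Because Publish sends to subscribers one after another, each subscriber that is not reading adds up to one timeout of delay for the subscribers behind it, which is worth keeping in mind when choosing the buffer size passed to Subscribe.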

In Subscribe, SubscribeCallback, and Unsubscribe, we take the write lock of Go's built-in sync.RWMutex, so the subscriber maps are never modified while Publish is reading them. Publish itself only needs the read lock, which lets several publishers look up subscribers at the same time.
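The locking discipline can be shown in isolation. The sketch below reduces the subscriber table to a map of names; registry, newRegistry, add, snapshot, and addConcurrently are hypothetical names, and the copy made in snapshot is a slightly more defensive choice than the code above, which keeps a reference to the stored slice.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, reduced version of the subscriber table:
// a map guarded by an embedded RWMutex.
type registry struct {
	sync.RWMutex
	subs map[string][]string
}

func newRegistry() *registry {
	return &registry{subs: make(map[string][]string)}
}

// add takes the write lock because it mutates the map.
func (r *registry) add(topic, name string) {
	r.Lock()
	defer r.Unlock()
	r.subs[topic] = append(r.subs[topic], name)
}

// snapshot takes the read lock and returns a copy, so the caller can
// iterate over the result without holding the lock.
func (r *registry) snapshot(topic string) []string {
	r.RLock()
	defer r.RUnlock()
	return append([]string(nil), r.subs[topic]...)
}

// addConcurrently registers n names from n goroutines, reading a snapshot
// in each one, and waits for all of them to finish.
func addConcurrently(r *registry, topic string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.add(topic, fmt.Sprintf("sub-%d", i))
			_ = r.snapshot(topic) // readers may overlap with each other
		}(i)
	}
	wg.Wait()
}

func main() {
	r := newRegistry()
	addConcurrently(r, "news", 100)
	fmt.Println(len(r.snapshot("news"))) // 100
}
```

Without the lock, concurrent writes to the map would be a data race; with it, every registration is kept.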

Global Processing Pool#

We can define a global variable exposed to the outside so that the whole application operates on a single shared pool, instead of creating many processing pools that increase memory usage.

// mq/global.go

package mq

var Global Pool

func Init() {
  Global = NewMemoryMQ()
}

Declaring the variable is not enough: Global stays nil until Init is called, so we call it at program startup.

// main.go

package main

import "project/mq"

func init() {
  mq.Init()
}

func main() {
}
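The global pool is usable only after Init has run. One possible alternative, sketched here under the assumption that lazy initialization is acceptable, is to hide the variable behind an accessor guarded by sync.Once. Default, countingPool, and inits are hypothetical names, and Pool is cut down to a single method to keep the example short; this is not how the article's package is organized.

```go
package main

import (
	"fmt"
	"sync"
)

// Pool is reduced to one method here; the real interface has four.
type Pool interface {
	Publish(topic string, payload string)
}

// countingPool is a hypothetical stand-in that only counts messages.
type countingPool struct {
	mu    sync.Mutex
	count int
}

func (p *countingPool) Publish(topic, payload string) {
	p.mu.Lock()
	p.count++
	p.mu.Unlock()
}

var (
	global Pool
	once   sync.Once
	inits  int // how many times the constructor ran; kept for demonstration
)

// Default returns the shared pool, creating it on first use. sync.Once
// guarantees the constructor runs exactly once, even with concurrent callers.
func Default() Pool {
	once.Do(func() {
		inits++
		global = &countingPool{}
	})
	return global
}

func main() {
	Default().Publish("mail", "welcome")
	Default().Publish("mail", "reset")
	fmt.Println(inits)                           // 1
	fmt.Println(Default().(*countingPool).count) // 2
}
```

With this shape a caller can never observe a nil pool, at the cost of one extra function call per access. The explicit Init used in this article is just as valid when startup order is under your control.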

Features#

This simple message queue component naturally has its advantages and disadvantages.

Because it lives inside a single program, this component cannot easily support asynchronous operations across programs; its use is limited to the current application. In return, we do not need to configure complex subscription interfaces, and we avoid deploying an unnecessarily large message queue application.

I use this component only for email notification queues and other task queues, asynchronous statistics interfaces, asynchronous card opening interfaces, and similar asynchronous interfaces. If you notice any shortcomings, please point them out!
