Golang Distributed Asynchronous Task Queue Machinery Tutorial

created at 07-28-2021

Go does not have many distributed task queues. At the time of writing, Machinery is probably the most mature of them.

In this article, we briefly look at how to use Machinery. First, though, a short introduction to the concept of asynchronous tasks.

If you have used asynchronous task frameworks in Python, you have probably heard of Celery. What does such a framework do? Its main job is to move long-running code out of the request path and into a separate process. For example, your API may need to call a third-party mail interface that responds slowly, while the API itself still has to respond in a timely manner. Handing the slow call off as an asynchronous task decouples the two.

Generally speaking, an asynchronous task system is composed of several parts:

- Broker: transmits messages. Think of it as a "messenger" or a "takeaway delivery person": it temporarily stores the tasks that producers generate until a consumer picks them up.
- Producer: responsible for generating tasks.
- Consumer: responsible for consuming (executing) tasks.
- Result backend: optional, but needed if the results of tasks have to be saved.

Process

The producer publishes a task -> the broker stores it -> consumers compete for the task, and the one that gets it consumes it -> (optional: after consumption, the consumer confirms to the broker that the task has been consumed and the broker deletes it; otherwise, the broker resends the task after a timeout) -> the result backend saves the result.
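The flow above can be sketched with nothing but the standard library. This is only an illustration of the roles, not how Machinery works internally: the broker is a buffered channel, the result backend is an in-memory map, and the names Task, ResultBackend and RunPipeline are hypothetical. The optional acknowledgement and redelivery step is not modeled.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical message: an ID, a task name and integer arguments.
type Task struct {
	ID   string
	Name string
	Args []int64
}

// ResultBackend is a toy in-memory result store keyed by task ID.
type ResultBackend struct {
	mu      sync.Mutex
	results map[string]int64
}

func NewResultBackend() *ResultBackend {
	return &ResultBackend{results: make(map[string]int64)}
}

// Save stores the result of one task.
func (b *ResultBackend) Save(id string, v int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.results[id] = v
}

// Get returns the stored result and whether it exists.
func (b *ResultBackend) Get(id string) (int64, bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	v, ok := b.results[id]
	return v, ok
}

// consume takes tasks from the broker until it is closed, sums the
// arguments of each task and saves the sum in the result backend.
func consume(broker <-chan Task, backend *ResultBackend, wg *sync.WaitGroup) {
	defer wg.Done()
	for t := range broker {
		var sum int64
		for _, a := range t.Args {
			sum += a
		}
		backend.Save(t.ID, sum)
	}
}

// RunPipeline starts the given number of competing consumers, publishes
// all tasks to the broker and waits until every task has been consumed.
func RunPipeline(tasks []Task, consumers int) *ResultBackend {
	broker := make(chan Task, len(tasks))
	backend := NewResultBackend()
	var wg sync.WaitGroup
	for i := 0; i < consumers; i++ {
		wg.Add(1)
		go consume(broker, backend, &wg)
	}
	for _, t := range tasks { // the producer side
		broker <- t
	}
	close(broker)
	wg.Wait()
	return backend
}

func main() {
	backend := RunPipeline([]Task{
		{ID: "t1", Name: "add", Args: []int64{1, 2}},
		{ID: "t2", Name: "add", Args: []int64{10, 20, 30}},
	}, 2)
	v, _ := backend.Get("t2")
	fmt.Println("t2 =", v)
}
```

In a real deployment the channel would be replaced by an external broker, so that producers and consumers can run in different processes or on different machines.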

Machinery

First, let's pull down the Machinery code:

$ go get -u github.com/RichardKnop/machinery/v1

Machinery’s definition of a message is:

// Signature represents a single task invocation
type Signature struct {
    UUID           string
    Name           string
    RoutingKey     string
    ETA            *time.Time
    GroupUUID      string
    GroupTaskCount int
    Args           []Arg
    Headers        Headers
    Immutable      bool
    RetryCount     int
    RetryTimeout   int
    OnSuccess      []*Signature
    OnError        []*Signature
    ChordCallback  *Signature
}

This is much like the JSON message you might define yourself if you wrote your own task queue.
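To illustrate the comparison with a hand-written queue, here is a minimal sketch of such a JSON message. The Message and Arg types below are simplified stand-ins invented for this example, and their field tags are assumptions; they are not Machinery's actual types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Arg is a hypothetical task argument: a type name plus a value.
type Arg struct {
	Type  string      `json:"type"`
	Value interface{} `json:"value"`
}

// Message is a hypothetical, cut-down task message.
type Message struct {
	UUID       string `json:"uuid"`
	Name       string `json:"name"`
	Args       []Arg  `json:"args"`
	RetryCount int    `json:"retry_count"`
}

// Encode serializes a message into the JSON text a producer would publish.
func Encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

// Decode parses the JSON text a consumer would receive from the broker.
func Decode(s string) (Message, error) {
	var m Message
	err := json.Unmarshal([]byte(s), &m)
	return m, err
}

func main() {
	payload, err := Encode(Message{
		UUID: "task-1",
		Name: "add",
		Args: []Arg{{Type: "int64", Value: 1}, {Type: "int64", Value: 2}},
	})
	if err != nil {
		fmt.Println("encode:", err)
		return
	}
	fmt.Println(payload)

	m, err := Decode(payload)
	if err != nil {
		fmt.Println("decode:", err)
		return
	}
	fmt.Println(m.Name, len(m.Args))
}
```

One detail worth noticing: when JSON numbers are decoded into an interface{} value, encoding/json produces float64. Carrying an explicit type name next to each value is one way for the consumer to convert the argument back to the type the task function expects.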

Generally, the producer first calls signature := tasks.NewSignature to define the task, and then calls machineryServer.SendTask with that signature to publish it to the broker.

An asynchronous task in Machinery is an ordinary Go function, like this:

func Add(args ...int64) (int64, error) {
  sum := int64(0)
  for _, arg := range args {
    sum += arg
  }
  return sum, nil
}

Note that the last return value of the function must be error. Then register the task like this:

server.RegisterTasks(map[string]interface{}{
  "add": Add,
})
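To make the rule about the error return value concrete, here is a sketch of how a task registry could enforce it with the reflect package and then call a task by name. It is an illustration under assumptions, not Machinery's implementation: Registry, Register and Call are hypothetical names, only int64 arguments are supported, and argument types are not checked before the call.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// Registry is a hypothetical table from task name to task function.
type Registry map[string]interface{}

// Register stores fn under name, but only if fn is a function whose
// last return value has type error.
func (r Registry) Register(name string, fn interface{}) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("task must be a function")
	}
	if t.NumOut() < 1 || t.Out(t.NumOut()-1) != errorType {
		return errors.New("last return value of a task must be error")
	}
	r[name] = fn
	return nil
}

// Call invokes the named task with int64 arguments and returns its first
// result, or the error the task returned.
func (r Registry) Call(name string, args ...int64) (interface{}, error) {
	fn, ok := r[name]
	if !ok {
		return nil, fmt.Errorf("task %q is not registered", name)
	}
	v := reflect.ValueOf(fn)
	if t := v.Type(); !t.IsVariadic() && t.NumIn() != len(args) {
		return nil, fmt.Errorf("task %q expects %d arguments, got %d", name, t.NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := v.Call(in)
	if errVal := out[len(out)-1]; !errVal.IsNil() {
		return nil, errVal.Interface().(error)
	}
	if len(out) == 1 {
		return nil, nil // the task returns only an error
	}
	return out[0].Interface(), nil
}

// Add is the task from the article.
func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

func main() {
	reg := Registry{}
	if err := reg.Register("add", Add); err != nil {
		fmt.Println("register add:", err)
	}
	// A function without an error return value is rejected.
	err := reg.Register("bad", func() int { return 0 })
	fmt.Println("register bad:", err)

	sum, err := reg.Call("add", 1, 2, 3)
	fmt.Println(sum, err)
}
```

Looking tasks up by name is what allows the producer to send only a string and some arguments over the broker, while the consumer decides which Go function actually runs.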

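The consumer first calls worker := machineryServer.NewWorker("send_sms", 10), where the first argument is the consumer tag and the second is the concurrency, and then calls worker.Launch() to start listening to the broker and consuming tasks. When a task with the name add is published, the Add function registered under that name is called.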

Generally, you can put the producer and the consumer in two files, define a main function in each, and write a Makefile so that a single make produces two executables. I personally prefer to build one executable and use a command-line flag to decide which role the process plays:

func main() {
    // parse cmd args
    flag.Parse()

    // init config
    initConfig()

    // init machinery server
    initMachinery()

    // register tasks
    machineryServer.RegisterTask("sendSMS", sendSMS)

    // worker is presumably a *bool command-line flag defined elsewhere (not shown)
    if *worker {
        startWorker()
    } else {
        startWebServer()
    }
}
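The main function above dereferences a worker flag whose definition is not shown. A minimal sketch of the flag-based role switch, using only the standard library, might look like the following. The flag name -worker and the helper parseRole are assumptions made for this example.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseRole parses the command-line arguments and reports which role the
// process should play: "worker" (consumer) or "web" (producer side).
func parseRole(args []string) (string, error) {
	fs := flag.NewFlagSet("app", flag.ContinueOnError)
	worker := fs.Bool("worker", false, "run as a task worker instead of the web server")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	if *worker {
		return "worker", nil
	}
	return "web", nil
}

func main() {
	role, err := parseRole(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	// In the article's program this is where startWorker() or
	// startWebServer() would be called.
	fmt.Println("starting as", role)
}
```

With this layout the same binary is started twice: once without arguments for the web server, and once with -worker for the consumer.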
edited at: 07-28-2021