There are not many distributed task queues written in Go. At present, Machinery appears to be the only reasonably mature one.
In this article, we take a brief look at how to use Machinery, but first we introduce the concept of asynchronous tasks.
If you are familiar with asynchronous task frameworks in Python, you have probably heard of Celery. What is an asynchronous task framework? Its main purpose is to move long-running code into a separate process. For example, suppose your API calls a third-party mail interface that may be very slow to respond, but you still want your API to respond promptly. Asynchronous tasks let you decouple the two: the API only enqueues the task, and a separate program executes it.
Generally speaking, an asynchronous task system consists of several parts:
- Broker: transmits messages. You can think of it as a "messenger" or a "takeaway delivery person". Its role is to temporarily hold the generated tasks until they are consumed.
- Producer: responsible for generating tasks.
- Consumer: responsible for consuming tasks.
- Result backend: optional, but needed if the results have to be saved.
The process is as follows:
The producer publishes a task -> broker -> consumers compete for the task, and one of them consumes it -> (optional: after consumption, the consumer confirms to the broker that the task has been consumed, and the broker then deletes it; otherwise, the task is resent after a timeout) -> the result backend saves the result
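The flow above can be sketched with nothing but the standard library. This is only an illustration under stated assumptions: a buffered channel stands in for the broker, goroutines stand in for consumers, a map stands in for the result backend, and the names Task, Result and runPipeline are hypothetical. It does not model the optional acknowledgement and redelivery step.

```go
package main

import (
	"fmt"
	"sync"
)

// Task is a hypothetical message: a task name plus its arguments.
type Task struct {
	ID   int
	Name string
	Args []int64
}

// Result is what a consumer hands to the result backend.
type Result struct {
	TaskID int
	Value  int64
}

// add is the work performed for tasks named "add".
func add(args []int64) int64 {
	var sum int64
	for _, a := range args {
		sum += a
	}
	return sum
}

// runPipeline publishes the tasks to an in-memory broker, lets `workers`
// consumers compete for them, and returns the saved results keyed by task ID.
func runPipeline(tasks []Task, workers int) map[int]int64 {
	broker := make(chan Task, len(tasks)) // stands in for Redis, AMQP, etc.
	results := make(chan Result, len(tasks))

	// Producer: publish every task, then close the broker.
	for _, t := range tasks {
		broker <- t
	}
	close(broker)

	// Consumers: each goroutine competes for tasks on the same channel.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range broker {
				if t.Name == "add" {
					results <- Result{TaskID: t.ID, Value: add(t.Args)}
				}
			}
		}()
	}
	wg.Wait()
	close(results)

	// Result backend: a plain map in this sketch.
	backend := make(map[int]int64)
	for r := range results {
		backend[r.TaskID] = r.Value
	}
	return backend
}

func main() {
	tasks := []Task{
		{ID: 1, Name: "add", Args: []int64{1, 2}},
		{ID: 2, Name: "add", Args: []int64{10, 20, 30}},
	}
	backend := runPipeline(tasks, 3)
	fmt.Println(backend[1], backend[2]) // prints "3 60"
}
```

Because each task is received from the channel by exactly one goroutine, the "compete for a task" step comes for free here. A real broker has to provide the same guarantee across processes and machines.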
First, let's pull down the Machinery code:
$ go get -u github.com/RichardKnop/machinery/v1
Machinery’s definition of a message is:
// Signature represents a single task invocation
type Signature struct {
	UUID           string
	Name           string
	RoutingKey     string
	ETA            *time.Time
	GroupUUID      string
	GroupTaskCount int
	Args           []Arg
	Headers        Headers
	Immutable      bool
	RetryCount     int
	RetryTimeout   int
	OnSuccess      []*Signature
	OnError        []*Signature
	ChordCallback  *Signature
}
This is much like the JSON message you might define if you were writing your own task queue.
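To make the comparison with a hand-written, JSON-based queue concrete, here is a minimal sketch of such a message being encoded by a producer and decoded by a consumer. The Message and Arg types and the JSON field names are assumptions for illustration only; they are trimmed-down stand-ins, not Machinery's own types or wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Arg is a simplified, hypothetical typed argument.
type Arg struct {
	Type  string      `json:"type"`
	Value interface{} `json:"value"`
}

// Message is a hypothetical, trimmed-down task message.
type Message struct {
	UUID       string `json:"uuid"`
	Name       string `json:"name"`
	Args       []Arg  `json:"args"`
	RetryCount int    `json:"retry_count"`
}

// encode turns a message into the bytes a producer would publish.
func encode(m Message) ([]byte, error) {
	return json.Marshal(m)
}

// decode restores a message on the consumer side.
func decode(b []byte) (Message, error) {
	var m Message
	err := json.Unmarshal(b, &m)
	return m, err
}

func main() {
	msg := Message{
		UUID: "task-1",
		Name: "add",
		Args: []Arg{{Type: "int64", Value: 1}, {Type: "int64", Value: 2}},
	}
	payload, err := encode(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(payload))

	got, err := decode(payload)
	if err != nil {
		panic(err)
	}
	fmt.Println(got.Name, len(got.Args))
}
```

Note that encoding/json decodes a number stored in an interface{} as float64, so the value alone no longer says it was an int64. Carrying a type name next to each value, as this sketch does, is one way to let the consumer convert it back.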
Generally, the producer first calls signature := tasks.NewSignature to define the task, and then calls machineryServer.SendTask to publish it.
An asynchronous task in Machinery looks like this:
func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}
Note that the last return value of the function must be error. Then register the task like this:
server.RegisterTasks(map[string]interface{}{
	"add": Add,
})
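The rule that a task's last return value must be error can be checked at registration time with reflection. The following is a sketch of how such a name-to-function registry could work, not Machinery's actual implementation; Registry, Register and Call are hypothetical names, and Call only supports tasks shaped like Add to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// Registry is a hypothetical map from task names to task functions.
type Registry struct {
	tasks map[string]interface{}
}

func NewRegistry() *Registry {
	return &Registry{tasks: make(map[string]interface{})}
}

// Register accepts only functions whose last return value is error.
func (r *Registry) Register(name string, fn interface{}) error {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Func {
		return errors.New("task must be a function")
	}
	if t.NumOut() == 0 || t.Out(t.NumOut()-1) != errorType {
		return errors.New("last return value of a task must be error")
	}
	r.tasks[name] = fn
	return nil
}

// Call looks a task up by name and runs it. This sketch only supports
// tasks with the signature func(...int64) (int64, error).
func (r *Registry) Call(name string, args ...int64) (int64, error) {
	fn, ok := r.tasks[name]
	if !ok {
		return 0, fmt.Errorf("task %q is not registered", name)
	}
	f, ok := fn.(func(...int64) (int64, error))
	if !ok {
		return 0, fmt.Errorf("task %q has a signature this sketch does not support", name)
	}
	return f(args...)
}

// Add is the task from the article.
func Add(args ...int64) (int64, error) {
	sum := int64(0)
	for _, arg := range args {
		sum += arg
	}
	return sum, nil
}

func main() {
	r := NewRegistry()
	if err := r.Register("add", Add); err != nil {
		panic(err)
	}
	// A function without a trailing error is rejected.
	err := r.Register("bad", func() int { return 1 })
	fmt.Println(err != nil) // prints "true"

	sum, err := r.Call("add", 1, 2, 3)
	fmt.Println(sum, err) // prints "6 <nil>"
}
```

Validating at registration time means a badly shaped task fails when the program starts, rather than when the first message for it arrives.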
The consumer first calls worker := machineryServer.NewWorker("send_sms", 10), where the first argument is the consumer tag and the second is the concurrency, and then calls worker.Launch(), which starts listening to the broker and consuming tasks. When a task named add is produced, the Add function is called.
Generally, you can put the producer and the consumer in two files, each with its own main function, and write a Makefile so that a single make builds two executables. I personally prefer to build one binary and use a command-line flag to decide which role it runs as:
func main() {
	// parse cmd args
	flag.Parse()
	// init config
	initConfig()
	// init machinery worker
	initMachinery()
	// register tasks
	machineryServer.RegisterTask("sendSMS", sendSMS)
	if *worker {
		startWorker()
	} else {
		startWebServer()
	}
}
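The main function above dereferences a worker flag that is not shown. A minimal sketch of how that flag could be declared and parsed follows. The flag name "worker", its default value and the parseRole helper are assumptions for illustration; the print statements stand in for the article's startWorker and startWebServer.

```go
package main

import (
	"flag"
	"fmt"
	"os"
)

// parseRole reports whether the process should run as a worker.
// The flag name "worker" is an assumption, not taken from the article.
func parseRole(args []string) (bool, error) {
	fs := flag.NewFlagSet("app", flag.ContinueOnError)
	worker := fs.Bool("worker", false, "run as a task worker instead of the web server")
	if err := fs.Parse(args); err != nil {
		return false, err
	}
	return *worker, nil
}

func main() {
	isWorker, err := parseRole(os.Args[1:])
	if err != nil {
		os.Exit(2)
	}
	if isWorker {
		fmt.Println("starting worker") // startWorker() in the article's code
	} else {
		fmt.Println("starting web server") // startWebServer() in the article's code
	}
}
```

With this layout, the same binary is started once without the flag to serve the API and once with -worker to consume tasks, so both roles always share the same task registrations.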