How China’s Biggest Logistics Platform Handles Task Dispatch

With millions of packages to be delivered each day, Cainiao needs a powerful engine to streamline operations

Image for post
Image for post

In China, more than 100 million packages are delivered every day. Cainiao, a logistics platform affiliated to Alibaba Group, handles a significant portion of those packages. Cainiao faces daunting challenges in ensuring timely deliveries and keeping customers satisfied when things fall behind schedule.

In response to this challenge, the Alibaba tech team developed a lightweight scheduled task dispatch engine, capable of dealing with the sheer scale of Cainiao’s delivery market. This system works through the real-time monitoring of each link in the logistics chain, and by removing dependence on fixed hard disk storage.

What is the standard solution to task dispatch?

The standard solution to task dispatch is to write scheduled tasks into a relational database, use a thread to periodically enquire into which tasks are approaching expiry, query expiring tasks periodically through a thread, and then execute the task according to the logic.

This solution is easy to implement and works well for scenarios where a stand-alone machine or a small amount of traffic is involved. However, attempting to use this approach in a distributed and traffic-intensive scenario creates a lot of complexity and challenges:

1. It is necessary to design a load balancing strategy for the cluster task schedule.

2. Even if requirement 1 is met, the timing of scheduled tasks in some scenarios may be concentrated at a certain point in time, resulting in excessive pressure on a single node of the cluster.

3. A reasonable estimation of capacity is required to prevent subsequent linear storage extension from becoming too complicated.

The main restriction of the above solution is the coupling of task timing and task storage. Decoupling time from storage renders task storage effectively stateless, greatly increasing the range of storage options available and significantly reducing storage constraints.

How can the time wheel decouple task timing from the task itself?

The time wheel solution takes the familiar concept of a clock and adapts it to software design. The main idea is to define a clock cycle (such as 12 hours) and a step size which can be related to each time the clock hand moves (such as a one-second). When the hand moves to a new clock hand position, the system receives those tasks which are mounted on that clock hand position and executes them.

The below figure illustrates how the time wheel algorithm works.

Image for post
Image for post
Time wheel

How can time wheel scheduled storage tasks help the solution?

Previous solutions from the Alibaba tech team check whether messages have expired on a periodic sequential query basis. Such solutions are comparatively constrained regarding discrete time. This means that previous versions can only support a few scheduled message delay levels.

In response to this problem, the Alibaba tech team created a new time wheel and linked list-based solution that can do the following:

· Support discrete scheduled messages

· Address the complexity of managing individual task lists for each clock hand position on the time wheel

The following figure illustrates the solution’s key points.

Alibaba MQ scheduled message solution

The central idea behind this solution is to write the task list to the disk and use the linked list on the disk to chain the task list. To achieve this chain effect, each task needs a file offset which points to the next task. In this way the entire task linked list can be obtained if the header of the linked list is obtained.

With this solution, the entire task list does not need to be stored for each clock hand position. For each clock hand position, only the header of the linked list needs to be stored. This reduces demand on memory.

This solution is acceptable for a system with positioning similar to middleware, but for a system that is positioned in ordinary applications, the solution’s deployment requirements are too high because a fixed hard disk is needed.

With the current containerization and dynamic capacity management trends, ordinary applications need to rely on fixed hard disks, which adds complexity to system operation and maintenance. The lightweight scheduled task dispatch engine can help address this issue.

What makes the lightweight scheduled task dispatch engine so useful?

Like the time wheel and linked list-based solution, the lightweight scheduled task dispatch engine is also based on the core concept of the time wheel.

The lightweight scheduled task dispatch engine also removes the system’s dependence on disk storage. Since the engine doesn’t use disk storage, it must use specialized storage services (such as Hbase, Redis, and so on). The underlying storage is therefore replaced with a storage structure that is compatible with storage services.

Image for post
Image for post
Adjusted time wheel and linked list solution

The task is designed as a structured table, and for each task the offset is replaced with a task ID (the above figure “Alibaba MQ scheduled message solution” shows the system that uses the file offset). The ID is used to chain each task on the linked list. Only the linked list header ID is associated with the time wheel. The purpose of the solution implemented here is simply replacing the file offset with an ID, so as to remove disk dependence.

These stages of the solution address some issues, but there are still some problems to be considered.

Problem 1: Parallel extraction is not possible for the single linked list, and this affects extraction efficiency. When many tasks are scheduled at a certain time, the scheduled task processing delay is severe.

Problem 2: The task is not stored on the local disk, the entire cluster’s scheduled tasks are in concentrated storage, and each node in the cluster has its own time wheel.

How can task linked list partitioning accelerate extraction of single linked lists?

The advantage of linked lists is that they remove the need to store the entire task list in memory. Only needing to store a simple ID reduces memory consumption but creates another issue.

Linked lists are efficient from a writing perspective, but less efficient from a reading perspective. If you need to mount a long task linked list at a certain point in time, you cannot use concurrent methods to improve reading efficiency.

How does partitioning improve task queue extraction efficiency for a certain point in time?

Partitioning can be used to split a single linked list from a certain point in time into multiple linked lists. This means that when tasks for that point in time are extracted, they can be processed in parallel according to the size of the linked list collection. This accelerates overall task extraction speed.

The following figure shows the solution after adjusting in accordance with partitioning principles.

Image for post
Image for post
Design after partitioning

How can data be recovered after cluster deployment and node restart?

To recover data after cluster deployment and node restart, three related challenges need to be addressed:

1. How can “time wheel metadata” be recovered when it is not written to disk?

“Time wheel metadata” is information such as the time wheel’s clock hand positions before restart, and the scheduled task linked list data on the time wheel’s clock hand positions since restart. This information is essential for data recovery to be possible.

Scheduled task storage problems can be solved using the adjusted time wheel and linked list and design after partitioning solutions detailed above.

Specialized storage services can also be used to address the metadata storage problem. However this does not solve the problem fully and leads to the next challenge.

2. How can the cluster node obtain its own metadata?

Since each cluster node is stateless, it does not know how to read its own metadata (that is, the query conditions) from the storage service when it started.

Metadata can sometimes be associated with the node IP or MAC address. Under current dynamic capacity scheduling conditions, though, it is impractical for a machine to have a fixed IP, as it cannot be guaranteed which application the machine will run on. Since the physical environment cannot be relied on, only the logical environment can be relied on to distinguish the time wheel of each node.

Therefore, each node in the cluster is assigned a unique logical ID, so that each machine obtains time wheel data through their own ID.

3. How are logical IDs assigned to nodes?

IDs are assigned by the master node. One node from the entire cluster needs to be elected as the master node. In this way, other nodes can register with the master node, and the master node will assign an ID to each node. Each node can then use this ID to obtain the metadata from previous time wheels in the storage service, and the time wheel can be finally initialized.

The following figure illustrates node registration and ID allocation.

Image for post
Image for post
Node registration and ID allocation process

Note that the master node itself is also registered and assigned an ID. The reason for this is that there is no separation between the master node and non-master nodes in ordinary applications, and the master node also participates in the entire computing operation. The master node, though, is also additionally responsible for cluster management.

The following figure illustrates single-node startup.

Image for post
Image for post
Single-node startup process

How can cluster management be automated?

Based on the master node, then, a set of cluster management mechanisms can be constructed. For ordinary applications, cluster extension and reduction is part of normal operations.

However, in dynamic scheduling scenarios, even the application owner is not aware of cluster extension and reduction operations.

The cluster can perceive that it has been extended or reduced. This is because the master node can perceive the survival status of other nodes in the cluster. The master node adjusts cluster load according to changes in the cluster capacity.

How can scheduled task extraction clustering relieve cluster pressure?

Previous sections have detailed how task extraction concurrency can be improved by splitting a single linked list into multiple linked lists on the time wheel at a certain point in time.

A set of cluster management solutions has also been explained. If the concurrency is concentrated in a single node, and the node can only perform extraction individually, then when the number of expired tasks on the current node becomes too large at a given moment, pressure on certain nodes in the cluster becomes very high.

Computing cluster capabilities can be optimized with the cluster management capabilities described in this article: Collections of expired task linked lists can be distributed to other cluster nodes through soft load balancing Since task data is stored centrally, all tasks from a linked list can be extracted if other nodes can obtain the ID of the task linked list header, thus balancing cluster pressure. The following figure illustrates the process.

Image for post
Image for post
Cluster parallel task extraction

Summary

This article describes how the solution evolved and the problems that were encountered at various stages. The following process flowchart summarizes the key points and provides some additional context.

Image for post
Image for post

(Original article by He Bibo何碧波)

Alibaba Tech

First-hand and in-depth information about Alibaba’s latest technology → Search “Alibaba Tech” on Facebook

Written by

First-hand & in-depth information about Alibaba's tech innovation in Artificial Intelligence, Big Data & Computer Engineering. Follow us on Facebook!

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store