As data grows, SQL execution tends to take longer. This imposes higher requirements on the optimization capability and execution modes of databases. As the promise of the cloud expands to the development of databases, traditional users are keen to migrate their business to the cloud to make use of the elasticity and scalability of the cloud. However, with rapid business expansion, and despite the dynamic resource scale-outs enabled by the cloud, SQL execution remains slow and fails to meet expectations. In many cases, the resource scale-outs are not fully utilized. Traditional commercial database services, such as Oracle and SQL Server, support parallel query engines to maximize the use of system resources and accelerate SQL execution.
This article discusses some of the key issues and core technologies behind the parallel query engines of cloud databases, which enable parallel optimization and execution at a lower cost.
2. How Parallel Queries Work
A query similar to online analytical processing (OLAP) involves a significant volume of data. This volume exceeds the memory capacity of the database. Most of the data may not be cached to the buffer of the database. Instead, the data must be dynamically loaded to the buffer when the query is executed. This causes a large number of I/O operations, which are time-consuming. Therefore, to accelerate SQL execution, the first consideration is how to accelerate I/O operations.
Due to hardware limits, the time taken by a single I/O operation is essentially fixed. The gap between sequential and random I/O operations has also narrowed because of the wide adoption of solid state drives (SSDs). How can you accelerate I/O operations more effectively? Parallel I/O is a simple solution. If I/O operations are performed by multiple threads and each thread reads only part of the data, the data can be loaded to the database buffer in a shorter time.
The preceding figure shows the process of parallel data reading. Each worker represents a thread. If the data to be read is partitioned, each thread can read one partition. The data to be read can also be sharded based on a fixed size, such as the size of a data page. Then, each thread can read a shard by round-robin polling.
If each worker reads one partition of the data, the volume of data assigned to each worker may differ, because partitions are not necessarily the same size. In contrast, if each worker reads shards by round-robin polling and each shard is small, the volume of data processed by each worker is approximately the same.
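The difference between the two assignment strategies can be illustrated with a minimal Python sketch. The partition sizes, the function names, and the rule "partition i goes to worker i mod n" are assumptions made for illustration only; they are not taken from any specific database engine.

```python
def assign_by_partition(partition_sizes, num_workers):
    """Give each whole partition to one worker (partition i -> worker i % n)."""
    load = [0] * num_workers
    for i, pages in enumerate(partition_sizes):
        load[i % num_workers] += pages
    return load


def assign_round_robin(partition_sizes, num_workers):
    """Cut all data into page-sized shards and deal them out in turn."""
    total_pages = sum(partition_sizes)
    load = [0] * num_workers
    for page in range(total_pages):
        load[page % num_workers] += 1
    return load


# Hypothetical skewed table: four partitions, sizes given in pages.
partition_sizes = [1000, 50, 30, 20]
by_partition = assign_by_partition(partition_sizes, 4)
round_robin = assign_round_robin(partition_sizes, 4)

print(by_partition)  # [1000, 50, 30, 20]
print(round_robin)   # [275, 275, 275, 275]
```

With skewed partitions, one worker ends up with almost all the pages, whereas page-sized shards spread the load evenly.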
Data must be processed immediately after the data is read to the buffer. Otherwise, the data will be evicted when the buffer is used up, which is contrary to the original purpose of I/O acceleration. Therefore, parallel data reading must be combined with synchronous parallel data processing, which is fundamental to parallel query acceleration.
Traditional optimizers can only generate serial execution plans. The first step to implementing synchronous parallel data reading and processing is to transform the existing optimizers so that they can generate parallel execution plans. For example, optimizers must be able to decide which tables can be read (or which operations can be executed) in parallel and whether the parallel reading or execution brings sufficient benefits.
The transformation to parallel data reading and processing does not always reduce costs. Imagine that a table contains a small amount of data, for example, only a few rows. If you read data from such a table in parallel, the costs of creating multiple threads and synchronizing data between the threads may be much higher than the benefits. In this case, parallel execution consumes more resources and time, which is not worthwhile. Therefore, the costs must be considered before a parallel execution plan is adopted. Otherwise, performance may deteriorate rather than improve.
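The trade-off described above can be sketched as a toy cost comparison. The cost formula and every constant below (per-row cost, per-worker startup cost, per-row synchronization cost) are hypothetical values chosen for illustration; a real optimizer uses its own, far more detailed cost model.

```python
def serial_cost(rows, cost_per_row=1.0):
    """Cost of scanning all rows in a single thread."""
    return rows * cost_per_row


def parallel_cost(rows, workers, cost_per_row=1.0,
                  startup_per_worker=500.0, sync_per_row=0.05):
    """Cost of a parallel scan: divided scan work plus fixed overheads."""
    scan = rows * cost_per_row / workers    # scan work is split across workers
    startup = startup_per_worker * workers  # creating the worker threads
    sync = rows * sync_per_row              # passing rows to the collector
    return scan + startup + sync


def choose_plan(rows, workers):
    """Pick the cheaper plan under the toy cost model above."""
    if parallel_cost(rows, workers) < serial_cost(rows):
        return "parallel"
    return "serial"


print(choose_plan(10, 32))          # serial
print(choose_plan(10_000_000, 32))  # parallel
```

For a table with only a few rows, the fixed startup overhead dominates, so the serial plan wins. For a large table, the divided scan cost outweighs the overhead.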
3. Select Tables for Parallel Scanning
The selection of tables for parallel scanning is the foundation of parallel execution plans. You must calculate and compare the costs of parallel scanning of different tables and then select the tables suitable for parallel scanning. This is the first step of iterating parallel execution plans. The calculated costs of parallel scanning may help you find a better JOIN order, especially when a large number of tables are to be joined, although this requires a larger search space. Alternatively, you can retain the original JOIN order to make the optimization process more efficient. In addition, the access methods of the tables to be joined may vary. Access methods include full-table scanning, index scanning, and index range scanning. These access methods also affect the final costs of parallel scanning.
In most cases, the largest table is selected for parallel scanning because this produces the maximum benefits. You can also select multiple tables for parallel scanning at the same time. This more complex scenario is discussed in the next section.
In the following example, the SELECT statement is used to query the top ten customers that spent the most money in 1996:
SELECT c.c_name, sum(o.o_totalprice) as s
FROM customer c, orders o
WHERE c.c_custkey = o.o_custkey
AND o_orderdate >= '1996-01-01'
AND o_orderdate <= '1996-12-31'
GROUP BY c.c_name
ORDER BY s DESC
LIMIT 10;
The orders table in the statement is a fact table and contains a large amount of data about orders. The customer table in the statement is a dimension table and contains a relatively small amount of data about customers. The following figure shows a parallel execution plan that is generated for the SQL statement in this example.
In this execution plan, the orders table is selected for parallel scanning, and the scanning is completed by 32 workers. Each worker scans a shard of the orders table. Then, an INDEX LOOKUP JOIN operation based on the o_custkey field is performed on the orders table and the customer table. The results of the JOIN operation are sent to a collector, which performs the GROUP BY, ORDER BY, and LIMIT operations on the results.
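The shape of this plan can be mimicked with a small Python sketch: workers each scan a shard of orders and look up the matching customer, and a single collector groups, sorts, and limits. The toy data, the function names, and the use of a thread pool are assumptions for illustration. The date filter is omitted for brevity, and Python threads here only mirror the structure of the plan rather than demonstrate a real speedup.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# Hypothetical toy data standing in for the customer and orders tables.
customer = {1: "Alice", 2: "Bob", 3: "Carol"}  # c_custkey -> c_name
orders = [                                     # (o_custkey, o_totalprice)
    (1, 100.0), (2, 40.0), (1, 60.0), (3, 75.0), (2, 10.0), (3, 5.0),
]


def worker(shard):
    """Scan one shard of orders and join each row to customer by key lookup."""
    return [(customer[custkey], price)
            for custkey, price in shard if custkey in customer]


def collector(worker_outputs, limit):
    """GROUP BY c_name, ORDER BY sum DESC, and LIMIT, all in one place."""
    totals = defaultdict(float)
    for rows in worker_outputs:
        for name, price in rows:
            totals[name] += price
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:limit]


num_workers = 3
# Round-robin sharding: worker i takes rows i, i + n, i + 2n, ...
shards = [orders[i::num_workers] for i in range(num_workers)]
with ThreadPoolExecutor(max_workers=num_workers) as pool:
    outputs = list(pool.map(worker, shards))

top = collector(outputs, limit=2)
print(top)  # [('Alice', 160.0), ('Carol', 80.0)]
```

In this shape, only the scan and the join run in parallel. All grouping and sorting work still falls on the single collector.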
4. Join Multiple Tables Selected for Parallel Scanning
You can select multiple tables for parallel scanning. For example, if your database contains two or more fact tables, you can select more than one table for parallel scanning. The following code shows an example:
SELECT o.o_custkey, sum(l.l_extendedprice) as s
FROM orders o, lineitem l
WHERE o.o_orderkey = l.l_orderkey
GROUP BY o.o_custkey
ORDER BY s
Both the orders table and the lineitem table in the statement are fact tables and contain a large amount of data. The following figure shows a parallel execution plan that is generated for the SQL statement in this example:
In this execution plan, both the orders table and the lineitem table are selected for parallel scanning, and the scanning of each table is completed by 32 workers. How is parallel scanning implemented for multiple tables? Imagine that you select two tables for parallel scanning. For JOIN operations performed on two tables, common JOIN algorithms include Nested Loop JOIN and HASH JOIN. Make sure you choose a table scanning method that suits the JOIN algorithm so that the results remain accurate.
Take the HASH JOIN algorithm as an example. In a serial execution plan, the HASH JOIN algorithm creates a hash table from one of the two tables, named Build. Next, the algorithm reads data from the other table, named Probe, calculates the hash of each row, and matches it against the Build table. If the matching succeeds, the algorithm returns the joined row. The algorithm then repeats the process of data reading and hash matching until all the data in the Probe table is processed. In a parallel execution plan, the parallelization optimizer adapts the HASH JOIN algorithm to parallel execution. The following two solutions are available for this transformation.
The first solution is to partition the two tables based on a hash key and make sure that data with the same hash value resides in the same partition and is joined by the same thread. The second solution is to create a shared hash table named Build. This table is shared by all the threads that perform hash joins. At the same time, each thread reads the shard allocated to it from the Probe table and then performs a hash join. You can estimate the costs of the two solutions and choose the more cost-effective one.
- In the first solution, all the data in the tables needs to be read, partitioned based on the selected hash key, and distributed to different threads for data processing. This requires the use of an additional Repartition operator, which sends data to different threads based on the partition rules.
- In the second solution, after the shared hash table named Build is created, each thread reads a shard from the Probe table and then performs a hash join. The Probe table does not need to be sharded based on a hash key, but the shards read by threads cannot overlap.
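The two solutions can be compared with a minimal Python sketch. The function names, the toy rows, and the thread pool are assumptions for illustration and do not reflect any engine's actual implementation. Both parallel variants are checked against a serial hash join to show that they return the same rows.

```python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor


def build_hash_table(rows):
    """Build phase: map each join key to the payloads that carry it."""
    table = defaultdict(list)
    for key, payload in rows:
        table[key].append(payload)
    return table


def probe(hash_table, rows):
    """Probe phase: emit one joined row per match in the hash table."""
    return [(key, b, p) for key, p in rows for b in hash_table.get(key, [])]


def serial_hash_join(build_rows, probe_rows):
    return probe(build_hash_table(build_rows), probe_rows)


def partitioned_hash_join(build_rows, probe_rows, workers):
    """Solution 1: repartition both inputs by hash key, then join per partition."""
    def repartition(rows):
        parts = [[] for _ in range(workers)]
        for key, payload in rows:
            parts[hash(key) % workers].append((key, payload))
        return parts

    build_parts, probe_parts = repartition(build_rows), repartition(probe_rows)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(serial_hash_join, build_parts, probe_parts)
    return [row for part in results for row in part]


def shared_build_hash_join(build_rows, probe_rows, workers):
    """Solution 2: one shared Build hash table, disjoint Probe shards."""
    shared = build_hash_table(build_rows)  # only read after this point
    shards = [probe_rows[i::workers] for i in range(workers)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = pool.map(lambda shard: probe(shared, shard), shards)
    return [row for part in results for row in part]


# Hypothetical rows: (join key, payload).
build_rows = [(1, "o1"), (2, "o2"), (3, "o3")]
probe_rows = [(1, 10.0), (1, 5.0), (2, 7.0), (4, 1.0), (3, 2.0)]

expected = sorted(serial_hash_join(build_rows, probe_rows))
print(sorted(partitioned_hash_join(build_rows, probe_rows, 2)) == expected)
print(sorted(shared_build_hash_join(build_rows, probe_rows, 2)) == expected)
```

The first variant pays for moving every row of both inputs to the right partition. The second avoids repartitioning but requires a hash table that all threads can read, which is the cost trade-off the optimizer has to estimate.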
5. Parallel Execution of Complex Operators in Statistical Analysis
GROUP BY operations are essential for statistical analysis. In particular, if a JOIN operation generates a large amount of data, the subsequent GROUP BY operation on the results of the JOIN operation is the most time-consuming step in SQL execution. Therefore, the parallel execution of GROUP BY operations is also a top priority for parallel query engines.
The following figure shows a parallel execution plan where GROUP BY operations are parallelized. This parallel execution plan applies to the preceding SQL statement used to query the top ten customers that spent the most money in 1996.
Compared with the earlier parallel execution plan for this SQL statement, this execution plan adds another collector. The description of the additional collector reads Using temporary; Using filesort. This indicates that each time the additional collector receives data from the workers, the collector performs a GROUP BY operation and then an ORDER BY operation on the data. Of the two collectors, only the first collector participates in user sessions. The additional collector is used to implement parallel execution in workers, including the parallel execution of GROUP BY, ORDER BY, and LIMIT operations. The description of the first collector reads Merge sort. This indicates that each time the first collector receives data from the workers, the collector only performs a merge sort operation on the data and then returns the results to users. You may want to know why the first collector replaces a GROUP BY operation with a merge sort operation. You may also want to know why the execution plans in these examples do not contain LIMIT operations.
In response to the second question, whether the description of an execution plan contains information about LIMIT operations depends on the display mode. The description of an execution plan contains information about LIMIT operations only in tree mode, as shown in the following figure, but not in regular mode.
The figure shows that LIMIT operations are used in two ways in this execution plan. At the top of the plan tree, LIMIT operations are performed in user sessions, and the results are returned to users. In the middle of the plan tree, LIMIT operations are performed by workers. For each worker, a LIMIT operation follows each time an ORDER BY operation is complete. This way, the volume of data returned to user sessions is reduced, which improves overall performance.
As for the question of why a single GROUP BY operation by a worker can ensure the accuracy of results, here is the answer. In most cases, each worker reads only one shard of the data to be processed. Performing a GROUP BY operation on that shard alone is likely to produce invalid results, because data in the same group may reside in shards read by other workers. The accuracy of GROUP BY results is ensured only if all the data in a group resides in the same shard and this shard is read by only one worker. The preceding execution plan displayed in tree mode shows that a Repartition operation is performed on the results of the JOIN operation based on the GROUP BY key, which is c.c_name in this example. This way, data in the same group is distributed to the same worker, and the data shards processed by different workers do not overlap. Therefore, the accuracy of the results of the GROUP BY operation is ensured.
In this case, the GROUP BY results of each worker are part of the eventual results to be returned to users. Therefore, you can also assign ORDER BY and LIMIT operations to each worker to improve the efficiency of parallel execution.
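The reasoning above can be sketched in Python: rows are repartitioned on the GROUP BY key, each worker groups, sorts, and limits its own partition, and the leader only merges the sorted runs and applies the final limit. The function names and toy rows are assumptions for illustration. The workers run in a plain loop here to keep the sketch short, and crc32 is used only to get a hash that is stable across runs.

```python
import heapq
from collections import defaultdict
from itertools import islice
from zlib import crc32


def repartition(rows, workers):
    """Route every (group_key, value) row to the worker chosen by the key's hash."""
    parts = [[] for _ in range(workers)]
    for key, value in rows:
        parts[crc32(key.encode()) % workers].append((key, value))
    return parts


def worker_group_sort_limit(rows, limit):
    """Per worker: GROUP BY key, ORDER BY sum DESC, then LIMIT."""
    totals = defaultdict(float)
    for key, value in rows:
        totals[key] += value
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    return ranked[:limit]


def leader_merge(sorted_runs, limit):
    """Leader: merge the already-sorted runs and apply the final LIMIT."""
    merged = heapq.merge(*sorted_runs, key=lambda kv: kv[1], reverse=True)
    return list(islice(merged, limit))


# Hypothetical JOIN output: (c_name, o_totalprice).
joined = [
    ("Alice", 100.0), ("Bob", 40.0), ("Alice", 60.0), ("Carol", 75.0),
    ("Bob", 10.0), ("Carol", 5.0), ("Dave", 200.0),
]

parts = repartition(joined, workers=3)
runs = [worker_group_sort_limit(part, limit=2) for part in parts]
result = leader_merge(runs, limit=2)
print(result)  # [('Dave', 200.0), ('Alice', 160.0)]
```

Because every group lives in exactly one partition, each worker's sums are already final, so the leader needs no second GROUP BY. Any group in the global top N is also in the top N of its own worker, so the per-worker limit does not discard rows that belong in the final result.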
6. Linear Acceleration of Parallel Query Engines for TPC-H Queries
The following figure shows the effects of a parallel query engine on accelerating TPC-H queries. All TPC-H queries were accelerated, and overall, the queries were executed nearly 13 times faster. In all, 70% of the queries were executed over eight times faster, and Q6 and Q12 were executed over 32 times faster.
Databases are the core of application systems, and optimizers are the core of databases. The success or failure of a database service depends on the quality and performance of its optimizer, and the creation of a brand new optimizer is a huge challenge to a development team. The stability of the optimizer is a great obstacle, not to mention its technical complexity. Therefore, even traditional commercial database providers can only improve existing optimizers and enhance the support for parallel execution in the pursuit of building a mature parallelization optimizer. The same is true for Alibaba Cloud PolarDB. In the design and development of the parallel query engine, Alibaba Cloud fully utilizes its accumulation of optimizer technologies and implementation base and continues to make improvements. With these efforts, Alibaba Cloud provides an optimizer technology solution with constant iterations that ensure the stable performance of the improved optimizer and pave the way for technological innovation.
PolarDB is a cloud-native distributed relational database service developed by Alibaba Cloud. In 2020, PolarDB was named a Leader by Gartner and won first prize in the Science and Technology Award of the Chinese Institute of Electronics. PolarDB is based on a cloud-native distributed database architecture and provides the capabilities of large-scale online transaction processing and parallel processing of complex queries. PolarDB is a leader in the field of cloud-native distributed databases and has been recognized by the market. PolarDB also demonstrates best practices within Alibaba Group. During the 2020 Tmall Double 11, PolarDB provided full support and a processing capability of 140 million transactions per second during traffic peaks, which hit a record high.