Spark — Adaptive Query Execution

Amit Singh Rathore
May 28, 2022


You might be familiar with the push to migrate from Spark 2 to Spark 3 for performance and cost benefits. One of the most significant changes in Spark 3.0 was AQE (Adaptive Query Execution). In this article, we will go through its major features.

Some challenges of Spark that impact the efficiency and performance of jobs:

  • Shuffle Partitions in Wide Transformations
  • Dominating Values in GroupBy (Partition Skew)
  • Suboptimal Join Strategy for Small Tables

Let’s explore how AQE addresses the above-mentioned challenges:

Dynamically Coalescing Shuffle Partitions

If we have too few shuffle partitions, each partition may get too much data, which reduces parallelism and makes the stage run almost sequentially. If memory is insufficient for that much data, disk spill or OOM errors will occur. On the other hand, if we have too many shuffle partitions, we end up with too many small tasks, which adds task-scheduling and management overhead.

AQE solves the problem of an inadequate number of partitions by setting this value dynamically, based on runtime statistics collected from the completed upstream query stages. It coalesces the smaller partitions (on a best-effort basis) until they approach the target partition size.
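Here is a minimal PySpark sketch of enabling this behavior; the app name, the dataset, and the config values are illustrative, not recommendations:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-coalesce-demo")  # hypothetical app name
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Target size AQE aims for when merging small shuffle partitions.
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")
    # Start high; AQE coalesces downward using runtime statistics.
    .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
    .getOrCreate()
)

df = spark.range(10_000_000)
# The groupBy triggers a shuffle; AQE then merges small shuffle
# partitions up to the advisory size.
agg = df.groupBy((df.id % 100).alias("bucket")).count()
agg.explain()  # look for a coalesced shuffle read in the adaptive plan
agg.show(5)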

Dynamically Optimizing Skew Joins

In jobs with skewed partitions, the job time becomes the time taken by the single large (skewed) partition, the straggler, even though the smaller partitions may have finished much sooner.

To solve this issue, AQE splits the skewed partitions into smaller subpartitions and replicates their matching partition on the other side of the join, so that more tasks are created to do the join in parallel. Instead of one big partition doing all the work, smaller subpartitions do the same work in parallel and reduce the overall job time.
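A sketch of the knobs involved, assuming the spark session from the previous snippet; the table paths, join key, and threshold values are hypothetical:

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# A partition counts as skewed when it is both larger than
# skewedPartitionFactor times the median partition size and larger
# than skewedPartitionThresholdInBytes.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")

orders = spark.read.parquet("/data/orders")        # hypothetical path
customers = spark.read.parquet("/data/customers")  # hypothetical path

# With the settings above, AQE splits skewed shuffle partitions of
# `orders` into subpartitions and duplicates the matching `customers`
# partitions, so the straggler's work runs in parallel.
joined = orders.join(customers, "customer_id")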

Dynamically Switching Join Strategies

To decide the join type, Spark used to fetch table statistics at planning time and pick a suitable join strategy based on them. But this did not account for the actual subset of the relation being used, for example after filters or deduplication.

Consider a scenario where we join a large table with a smaller table containing duplicate records, and we apply distinct on the smaller table to reduce its size to under 10–20 MB. Despite the smaller size after deduplication, the statically planned join will still use the Shuffle-Sort-Merge strategy instead of the more efficient Broadcast Hash Join.

To fix this, AQE decides on the join strategy at run time, based on the accurate join relation size. It converts the sort-merge join to a broadcast hash join if applicable, and it also reduces network traffic by using a localized shuffle.
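A sketch of that scenario; the paths, the key column, and the 10m threshold are illustrative:

# Assumes the `spark` session from the earlier snippets.
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Keep the static broadcast threshold conservative; AQE can still
# switch to a broadcast hash join using accurate runtime sizes.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")

big = spark.read.parquet("/data/big_table")      # hypothetical path
small = spark.read.parquet("/data/small_table")  # hypothetical path

# Static stats see the full, duplicate-heavy small table, so the
# planner picks sort-merge join; at run time the deduplicated side
# is small enough and AQE replans it as a broadcast hash join.
deduped = small.distinct()
result = big.join(deduped, "key")
result.explain()  # look for AdaptiveSparkPlan / BroadcastHashJoin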

(Image source: Databricks)

Parameters to Consider While Working with Adaptive Query Execution

spark.sql.adaptive.enabled=true
spark.sql.adaptive.forceApply=false
spark.sql.broadcastTimeout=600
spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (block size)
spark.sql.adaptive.coalescePartitions.minPartitionNum=1 (best case)
spark.sql.adaptive.coalescePartitions.initialPartitionNum=8192
spark.sql.adaptive.fetchShuffleBlocksInBatch=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=400M
spark.sql.adaptive.optimizer.excludedRules
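All of the spark.sql.adaptive.* settings are runtime SQL configs, so they can also be applied to an existing session per job; a minimal sketch with illustrative values:

# Assumes a SparkSession `spark` is available.
aqe_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128m",
    "spark.sql.adaptive.localShuffleReader.enabled": "true",
}
for key, value in aqe_conf.items():
    spark.conf.set(key, value)

# Verify what is actually in effect:
print(spark.conf.get("spark.sql.adaptive.enabled"))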

There are also two relatively less-talked-about features:

DemoteBroadcastHashJoin

If a join side has a high ratio of empty partitions, then Spark adds a no-broadcast-hash-join hint to avoid broadcasting it. The cutoff is the spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin setting from the list above.
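In code, this is a single setting; a short sketch with the 0.2 value mirroring the list above:

# If fewer than 20% of a join side's shuffle partitions are non-empty,
# AQE hints Spark not to broadcast that side.
spark.conf.set("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin", "0.2")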

EliminateJoinToEmptyRelation

If one side of the join turns out to be empty at run time, AQE converts the join to an empty LocalRelation where applicable.
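A toy illustration, assuming a spark session; the empty side is forced with an impossible filter:

from pyspark.sql import functions as F

left = spark.range(1000)
# This filter matches nothing, so the right side is empty at run time.
right = spark.range(1000).filter(F.col("id") < 0)

# For an inner join, once AQE observes that `right` produced no rows,
# it can replace the whole join with an empty LocalRelation.
out = left.join(right, "id", "inner")
print(out.count())  # 0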

The Spark community is actively adding other stats-dependent optimizations to AQE. For example, AQE will also perform join reordering, which is currently done by the CBO (cost-based optimizer).

spark.sql.cbo.enabled=true
spark.sql.cbo.joinReorder.enabled=true

Skipping Redundant Stages Dynamically

When Spark encounters redundant operations, AQE can dynamically skip them to improve performance. Say we run a groupBy on a DataFrame after a filter, and that filter produces an empty relation. AQE detects that there is no data to aggregate and skips the groupBy stage, saving unnecessary computation and improving job performance. This optimization ensures that Spark only performs meaningful operations, reducing both runtime and resource usage.
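A hedged sketch of that scenario; the dataset path and column names are made up:

from pyspark.sql import functions as F

# Assumes a SparkSession `spark` is available.
df = spark.read.parquet("/data/events")  # hypothetical path
empty = df.filter(F.col("event_date") == "1900-01-01")  # matches nothing

# With AQE, once the scan/filter stage reports zero rows, the
# downstream aggregation can be planned against an empty relation
# instead of launching shuffle and reduce tasks.
counts = empty.groupBy("event_type").count()
counts.show()  # empty result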

Dynamically Optimizing Sort Operations

When working with large datasets, sorting can be an expensive operation. However, if the data is already sorted, sorting again can lead to unnecessary overhead. AQE detects when a DataFrame is already sorted and skips the sort step.

Note: Adaptive Query Execution can change the number of shuffle partitions and so is not supported for streaming queries (Spark Structured Streaming).

When spark.sql.adaptive.localShuffleReader.enabled=true and a sort-merge join has been converted to a broadcast hash join, Spark applies a further optimization to reduce network traffic by converting a regular shuffle into a localized shuffle.

I hope you found this article instructional and informative. If you have any feedback or queries, please let me know in the comments below.
