Adaptive Query Execution with the RAPIDS Accelerator for Apache Spark

The benefits of AQE are not specific to CPU execution; AQE can provide additional performance improvements in conjunction with GPU acceleration.

The main benefit of AQE is that queries can be optimized during execution based on statistics that may not be available when initially planning the query.

Specific optimizations offered by AQE include:

  • Coalescing shuffle partitions
  • Using broadcast hash joins if one side of the join can fit in memory
  • Optimizing skew joins
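To make the first optimization concrete, here is a Scala sketch that coalesces adjacent shuffle partitions greedily, starting a new group whenever the next partition would push the current group past a target size. It is a simplified model under stated assumptions, not Spark's implementation: the function name coalescePartitions and the example sizes are illustrative. In Spark the target is influenced by settings such as spark.sql.adaptive.advisoryPartitionSizeInBytes, and the real logic handles more cases.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model: merge adjacent shuffle partitions greedily, starting a new group
// whenever adding the next partition would push the current group past targetSize.
// Returns groups of original partition indices.
def coalescePartitions(sizes: Seq[Long], targetSize: Long): Seq[Seq[Int]] = {
  val groups = ArrayBuffer.empty[Seq[Int]]
  var current = Vector.empty[Int]
  var currentSize = 0L
  for ((size, idx) <- sizes.zipWithIndex) {
    if (current.nonEmpty && currentSize + size > targetSize) {
      groups += current // close the current group
      current = Vector.empty
      currentSize = 0L
    }
    current = current :+ idx
    currentSize += size
  }
  if (current.nonEmpty) groups += current
  groups.toList
}

// Five shuffle partitions (sizes in MB) with a 64 MB target:
// partitions 0, 1, 2 (60 MB) form one group and 3, 4 (55 MB) form another.
val coalesced = coalescePartitions(Seq(10L, 20L, 30L, 50L, 5L), targetSize = 64L)
```

Merging only adjacent partitions keeps each reducer reading a contiguous range of shuffle blocks, which is why the sketch never reorders partitions.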

AQE works by converting leaf exchange nodes in the plan to query stages and then scheduling those query stages for execution. As soon as at least one query stage completes, the rest of the plan is re-optimized using a combination of logical and physical optimization rules. This process repeats until all child query stages are materialized, and then the final query stage is executed. This logic is contained in the AdaptiveSparkPlanExec.getFinalPhysicalPlan method.
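The stage-creation loop can be modeled in a few lines of Scala. This is a toy sketch, not AdaptiveSparkPlanExec itself: Plan, Op, Exchange, Stage and runAdaptive are hypothetical names, every scheduled stage is assumed to complete before re-optimization, and the re-optimization step is only logged rather than performed. A "leaf" exchange here is one with no other exchanges beneath it.

```scala
import scala.collection.mutable.ListBuffer

// Toy plan: operators, exchanges (stage boundaries) and query stages that replace exchanges.
sealed trait Plan
case class Op(name: String, children: List[Plan] = Nil) extends Plan
case class Exchange(child: Plan) extends Plan
case class Stage(id: Int, plan: Plan) extends Plan

// True if the subtree still contains an Exchange that has not become a query stage.
def hasExchange(p: Plan): Boolean = p match {
  case Exchange(_)  => true
  case Op(_, cs)    => cs.exists(hasExchange)
  case Stage(_, _)  => false
}

def runAdaptive(initial: Plan): List[String] = {
  val log = ListBuffer.empty[String]
  var nextId = 0

  // Replace each leaf exchange with a new query stage and "schedule" it.
  def createStages(p: Plan): Plan = p match {
    case Exchange(child) if !hasExchange(child) =>
      val stage = Stage(nextId, child)
      nextId += 1
      log += s"create and schedule stage ${stage.id}"
      stage
    case Exchange(child)    => Exchange(createStages(child))
    case Op(name, children) => Op(name, children.map(createStages))
    case s: Stage           => s
  }

  var plan = initial
  while (hasExchange(plan)) {
    plan = createStages(plan)
    // Toy assumption: all scheduled stages finish before the remaining plan is re-optimized.
    log += "stages materialized; re-optimize the rest of the plan"
  }
  log += "execute final stage"
  log.toList
}

// A join over one plain exchange and one exchange nested above an aggregate's exchange.
val plan = Op("Join", List(
  Exchange(Op("ScanA")),
  Exchange(Op("Aggregate", List(Exchange(Op("ScanB")))))
))
val events = runAdaptive(plan)
```

Stages 0 and 1 are created in the first round; the exchange above the aggregate only becomes a leaf, and therefore stage 2, after stage 1 has been materialized.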

With AQE enabled, the physical plan contains an AdaptiveSparkPlanExec operator. This may be the root node, or it may be wrapped in an InsertIntoHadoopFsRelationCommand operator if the query writes its results to disk.

Rather than replacing the AdaptiveSparkPlanExec operator with a GPU-specific version, we worked with the Spark community to allow custom query stage optimization rules to be provided, so that columnar plans can be supported.

However, Spark considers the final output of AdaptiveSparkPlanExec to be row-based: its supportsColumnar method always returns false, and calling doExecuteColumnar throws an exception. For this reason, the RAPIDS optimizer inserts a columnar-to-row transition as the root node, if necessary. Where the adaptive plan is wrapped in a write to a columnar data source, special handling at runtime avoids an unnecessary columnar-to-row transition followed by a row-to-columnar transition.
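A minimal sketch of the root-transition decision, assuming a toy plan model: Node, Leaf, ColumnarToRow and finalizeAdaptiveRoot are hypothetical names, and the columnar-write case is reduced to a boolean flag, whereas the plugin handles that case at runtime.

```scala
// Toy plan nodes: only whether a node produces columnar output matters here.
sealed trait Node { def columnar: Boolean }
case class Leaf(name: String, columnar: Boolean) extends Node
case class ColumnarToRow(child: Node) extends Node { val columnar: Boolean = false }

// AdaptiveSparkPlanExec reports row-based output (supportsColumnar is false), so a
// columnar final plan needs a columnar-to-row transition at the root. The flag models
// the columnar-write case, where converting to rows and back again would be wasted work.
def finalizeAdaptiveRoot(finalPlan: Node, feedsColumnarWrite: Boolean): Node =
  if (finalPlan.columnar && !feedsColumnarWrite) ColumnarToRow(finalPlan)
  else finalPlan

val gpuAgg = Leaf("GpuHashAggregate", columnar = true)
val forCollect = finalizeAdaptiveRoot(gpuAgg, feedsColumnarWrite = false) // wrapped in ColumnarToRow
val forWrite = finalizeAdaptiveRoot(gpuAgg, feedsColumnarWrite = true)    // left columnar
```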

Optimizer Rules

On startup, the SQLExecPlugin registers two distinct sets of optimizer rules:

extensions.injectColumnar(_ => ColumnarOverrideRules())
extensions.injectQueryStagePrepRule(_ => GpuQueryStagePrepOverrides())

The ColumnarOverrideRules are used whether AQE is enabled or not, and the GpuQueryStagePrepOverrides rules are specific to AQE.

There are four sets of optimizer rules used by AQE.

queryStagePreparationRules

This set of rules is applied once before any query stages are created and is also applied once for each re-optimization of the plan, after one or more query stages have completed. The RAPIDS Accelerator GpuQueryStagePrepOverrides rule is applied as part of this rule set.

This rule does not transform the plan directly; instead, it tags nodes in the Spark plan that cannot be supported on the GPU. This is necessary because, when individual query stages are created and then passed to the plugin for optimization, we do not have any information about the parent query stages, so we rely on the plan having been tagged up front.
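The reason for tagging up front can be illustrated with a toy model. Everything here is hypothetical: Node, supportedOnGpu, tagPlan and translateStage are illustrative names, and the rule "an Exchange stays on the CPU if its parent does" is an assumed example of a decision that depends on a node's parent, not a statement of the plugin's actual rules.

```scala
import scala.collection.mutable

case class Node(id: Int, name: String, children: List[Node] = Nil)

// Hypothetical stand-in for the plugin's per-operator GPU support checks.
def supportedOnGpu(n: Node): Boolean = !n.name.startsWith("Unsupported")

// Prep pass over the WHOLE plan: record the ids of nodes that must stay on the CPU.
// Hypothetical rule for illustration: an Exchange stays on the CPU if its parent does.
def tagPlan(root: Node): Set[Int] = {
  val cpuOnly = mutable.Set.empty[Int]
  def visit(n: Node, parentOnCpu: Boolean): Unit = {
    val onCpu = !supportedOnGpu(n) || (n.name == "Exchange" && parentOnCpu)
    if (onCpu) cpuOnly += n.id
    n.children.foreach(child => visit(child, onCpu))
  }
  visit(root, parentOnCpu = false)
  cpuOnly.toSet
}

// Translating one query stage sees only the stage's subtree, so whether its root
// Exchange may move to the GPU has to come from the tags computed earlier.
def translateStage(stageRoot: Node, cpuOnly: Set[Int]): List[String] = {
  def go(n: Node): List[String] = {
    val label = if (cpuOnly.contains(n.id)) n.name else s"Gpu${n.name}"
    label :: n.children.flatMap(go)
  }
  go(stageRoot)
}

val scan = Node(3, "Scan")
val exchange = Node(2, "Exchange", List(scan))
val root = Node(1, "UnsupportedUdf", List(exchange))
val tags = tagPlan(root)
val stagePlan = translateStage(exchange, tags) // the stage is translated without its parent
```

With the tags, the stage keeps its Exchange on the CPU while the scan moves to the GPU; translating the same stage with no tags would place the Exchange on the GPU, because the stage alone cannot see its unsupported parent.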

queryStageOptimizerRules

This set of rules is applied to an Exchange node when creating a query stage and will result in a BroadcastQueryStageLike or ShuffleQueryStageLike node being created. This set of rules does not involve the RAPIDS Accelerator and applies optimizations such as optimizing skewed joins and coalescing shuffle partitions.
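As an example of the kind of optimization in this rule set, the sketch below models skew detection. It is a simplification under stated assumptions: roughly, Spark's OptimizeSkewedJoin rule treats a partition as skewed when it is larger than a factor times the median partition size and also larger than an absolute threshold (see spark.sql.adaptive.skewJoin.skewedPartitionFactor and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes). The function names and the split target used here are illustrative, not Spark's code.

```scala
// Median partition size (sizes must be non-empty).
def median(sizes: Seq[Long]): Long = {
  val sorted = sizes.sorted
  val n = sorted.length
  if (n % 2 == 1) sorted(n / 2) else (sorted(n / 2 - 1) + sorted(n / 2)) / 2
}

// Indices of partitions that are both more than `factor` times the median
// and larger than an absolute threshold.
def skewedPartitions(sizes: Seq[Long], factor: Double, threshold: Long): Seq[Int] = {
  val med = median(sizes)
  sizes.zipWithIndex.collect {
    case (size, idx) if size > med * factor && size > threshold => idx
  }
}

// How many pieces a skewed partition is split into for a target size (ceiling division).
def numSplits(size: Long, target: Long): Int = ((size + target - 1) / target).toInt

// Partition sizes in MB; with factor 5 and a 256 MB threshold only partition 3 is skewed.
val sizes = Seq(100L, 120L, 110L, 2000L, 90L)
val skewed = skewedPartitions(sizes, factor = 5.0, threshold = 256L)
val splits = skewed.map(i => numSplits(sizes(i), target = 256L))
```

Requiring both conditions avoids splitting partitions that are large only because every partition is large.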

postStageCreationRules

This set of rules is applied after a new query stage has been created. It includes ColumnarOverrideRules, and it is where the query stage is translated into a GPU plan. These rules rely on the plan having been tagged by an earlier run of the queryStagePreparationRules.

finalStageCreationRules

The final query stage is optimized with this set of rules, which is a combination of the queryStageOptimizerRules and the postStageCreationRules, with special handling to filter out some rules that do not apply to the final query stage.
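How the final-stage rule set is assembled can be sketched as list concatenation plus a filter. This is a toy model: rules are named string-to-string functions, and the rule names (including HypotheticalStageOnlyRule, which stands in for a rule that does not apply to the final stage) are illustrative assumptions rather than the actual Spark or RAPIDS rules.

```scala
// A rule is modeled as a named plan-to-plan function; plans are plain strings here.
case class Rule(name: String, transform: String => String)

// Final-stage rules: optimizer rules followed by post-stage-creation rules, minus any
// rules that do not apply to the final stage.
def finalStageRules(optimizerRules: Seq[Rule], postStageRules: Seq[Rule],
                    notForFinalStage: Set[String]): Seq[Rule] =
  (optimizerRules ++ postStageRules).filterNot(rule => notForFinalStage.contains(rule.name))

// Apply rules in order, each to the output of the previous one.
def applyAll(plan: String, rules: Seq[Rule]): String =
  rules.foldLeft(plan)((current, rule) => rule.transform(current))

val optimizerRules = Seq(
  Rule("CoalescePartitions", p => s"Coalesce($p)"),
  Rule("HypotheticalStageOnlyRule", p => s"StageOnly($p)")
)
val postStageRules = Seq(Rule("ColumnarOverrides", p => s"Gpu($p)"))
val rules = finalStageRules(optimizerRules, postStageRules, Set("HypotheticalStageOnlyRule"))
val finalPlan = applyAll("Plan", rules)
```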

Query Stage Re-use

The logic in AdaptiveSparkPlanExec.getFinalPhysicalPlan attempts to cache query stages for re-use. The original Spark logic used the canonicalized version of the Exchange node as the key for this cache, but this can result in errors if both CPU and GPU query stages are created from Exchange nodes that have equivalent canonical plans. This issue was resolved in SPARK-35093, and the fix is available in Spark versions 3.0.3+, 3.1.2+, and 3.2.0+.
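The collision can be reproduced with a toy cache. This is a simplified illustration, not the actual SPARK-35093 change: ExchangeNode, QueryStage and StageCache are hypothetical names, the canonical plan is a plain string, and "keying on the stage after GPU translation" is approximated by adding the onGpu flag to the key.

```scala
import scala.collection.mutable

// Toy exchange: `canonical` stands in for the canonicalized Exchange plan and `onGpu`
// for whether the stage built from it ends up as a GPU stage.
case class ExchangeNode(canonical: String, onGpu: Boolean)
case class QueryStage(id: Int, onGpu: Boolean)

// Cache of created query stages; `key` decides when two exchanges count as the same.
class StageCache[K](key: ExchangeNode => K) {
  private val cache = mutable.Map.empty[K, QueryStage]
  private var nextId = 0
  def getOrCreate(e: ExchangeNode): QueryStage =
    cache.getOrElseUpdate(key(e), {
      val stage = QueryStage(nextId, e.onGpu)
      nextId += 1
      stage
    })
}

val cpuExchange = ExchangeNode("hashpartitioning(a)", onGpu = false)
val gpuExchange = ExchangeNode("hashpartitioning(a)", onGpu = true)

// Keyed on the Exchange alone: the GPU exchange wrongly reuses the CPU stage.
val byExchange = new StageCache[String](_.canonical)
val firstStage = byExchange.getOrCreate(cpuExchange)
val wronglyReused = byExchange.getOrCreate(gpuExchange)

// Keyed on what the stage actually becomes: CPU and GPU stages stay distinct.
val byStage = new StageCache[(String, Boolean)](e => (e.canonical, e.onGpu))
val cpuStage = byStage.getOrCreate(cpuExchange)
val gpuStage = byStage.getOrCreate(gpuExchange)
```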