Getting Started

The prerequisites and configuration needed to get started using Spark dynamic allocation with Data Flow.

Prerequisites

  • Use Spark 3.x.
  • Spark dynamic allocation is beneficial only for jobs with processing times of more than 10 minutes.
  • Although Spark dynamic allocation can be used with Structured Streaming, it's optimized for batch jobs. For more information, see Spark Dynamic Allocation and Spark Structured Streaming.
  • Enabling Spark dynamic allocation also enables shuffle tracking.
  • An external shuffle service isn't supported.

Configuration of Spark Dynamic Allocation

You can configure Spark dynamic allocation with Data Flow in three ways.

Using the Console

When creating an Application, click Enable Autoscaling. The default dynamic allocation configuration is populated into the Spark configuration properties: the minimum number of executors corresponds to the spark.dynamicAllocation.minExecutors property, and the maximum number of executors corresponds to the spark.dynamicAllocation.maxExecutors property. You can override the default values of the spark.dynamicAllocation.executorIdleTimeout and spark.dynamicAllocation.schedulerBacklogTimeout properties.
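
For example, enabling autoscaling with the defaults is equivalent to the following Spark configuration properties (the default values are listed in the property descriptions later in this topic):

spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=4
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.schedulerBacklogTimeout=60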

Using the API

Set both spark.dynamicAllocation.enabled and spark.dynamicAllocation.shuffleTracking.enabled to true for the Data Flow application.
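
As a sketch, one way to set these properties when creating the Application is through the OCI CLI, which wraps the API. The display name, shapes, Spark version, and file URI below are placeholders for your own values, and the --configuration parameter is assumed to accept the Spark properties as a JSON map:

oci data-flow application create \
  --compartment-id <compartment-ocid> \
  --display-name "dynamic-allocation-example" \
  --driver-shape VM.Standard2.1 \
  --executor-shape VM.Standard2.1 \
  --num-executors 1 \
  --spark-version 3.2.1 \
  --language PYTHON \
  --file-uri oci://<bucket>@<namespace>/app.py \
  --configuration '{"spark.dynamicAllocation.enabled": "true", "spark.dynamicAllocation.shuffleTracking.enabled": "true"}'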

A Spark application with dynamic allocation enabled requests more executors when it has pending tasks waiting to be scheduled. This condition necessarily implies that the existing set of executors is insufficient to simultaneously saturate all tasks that have been submitted but not yet finished.

Spark requests executors in rounds. The actual request is triggered when there have been pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds. If the queue of pending tasks persists, the request is triggered again every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds thereafter. The number of executors requested in each round increases exponentially from the previous round. For example, an application adds 1 executor in the first round, and then 2 executors, then 4 executors, and so on in later rounds.
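
As an illustrative timeline, with spark.dynamicAllocation.schedulerBacklogTimeout=60 and spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=60, a persistent backlog plays out roughly as follows (the total never exceeds spark.dynamicAllocation.maxExecutors):

t = 60s: the backlog has existed for schedulerBacklogTimeout seconds, so Spark requests 1 executor.
t = 120s: the backlog persists, so Spark requests 2 more executors.
t = 180s: the backlog persists, so Spark requests 4 more executors, capped at maxExecutors.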

The policy for removing executors is much simpler. A Spark application removes an executor when it has been idle for more than spark.dynamicAllocation.executorIdleTimeout seconds.
Note: Under most circumstances, this condition is mutually exclusive with the request condition, in that an executor shouldn't be idle if there are still pending tasks to be scheduled.
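
For example, a job with short gaps between bursty stages might warrant a longer idle timeout, so that executors survive the gaps instead of being removed and requested again. The value below is illustrative and must stay within the documented range of [60, 600]:

--conf spark.dynamicAllocation.executorIdleTimeout=120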

Using Spark-Submit

Populate the configuration properties. For example:
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=4
--conf spark.dynamicAllocation.executorIdleTimeout=60
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
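
For example, these options can be passed in the --execute string of a spark-submit compatible Data Flow run. The command below is a sketch: the compartment OCID and application URI are placeholders.

oci data-flow run submit \
  --compartment-id <compartment-ocid> \
  --execute "--conf spark.dynamicAllocation.enabled=true \
    --conf spark.dynamicAllocation.minExecutors=1 \
    --conf spark.dynamicAllocation.maxExecutors=4 \
    --conf spark.dynamicAllocation.executorIdleTimeout=60 \
    --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
    oci://<bucket>@<namespace>/app.py"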

Minimum Configuration

At a minimum, set the following properties to these values:
  • spark.dynamicAllocation.enabled: true
  • spark.dynamicAllocation.shuffleTracking.enabled: true
  • spark.dynamicAllocation.minExecutors: 1
  • spark.dynamicAllocation.maxExecutors: 4
  • spark.dynamicAllocation.executorIdleTimeout: 60
  • spark.dynamicAllocation.schedulerBacklogTimeout: 60
  • spark.dataflow.dynamicAllocation.quotaPolicy: max
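
Expressed as spark-submit style configuration options, the same minimum configuration is:

--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.shuffleTracking.enabled=true
--conf spark.dynamicAllocation.minExecutors=1
--conf spark.dynamicAllocation.maxExecutors=4
--conf spark.dynamicAllocation.executorIdleTimeout=60
--conf spark.dynamicAllocation.schedulerBacklogTimeout=60
--conf spark.dataflow.dynamicAllocation.quotaPolicy=max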

Property Descriptions

Descriptions of the Spark dynamic allocation properties you can use with Data Flow, and their possible values. All the properties are supported on Spark 3.x.

spark.dynamicAllocation.enabled
  Default: false. Allowed values: true | false.
  Whether to use dynamic resource allocation, which scales the number of executors registered with the application up and down based on the workload.

spark.dynamicAllocation.shuffleTracking.enabled
  Default: true. Allowed value: true.
  Enables shuffle file tracking for executors, which allows dynamic allocation without the need for an external shuffle service. This option tries to keep alive executors that are storing shuffle data for active jobs.

spark.dynamicAllocation.minExecutors
  Default: 1. Allowed range: [1, maxExecutors].
  The lower bound for the number of executors when dynamic allocation is enabled.

spark.dynamicAllocation.maxExecutors
  Default: 4. Allowed range: [minExecutors, 1000].
  The upper bound for the number of executors when dynamic allocation is enabled.

spark.dynamicAllocation.executorIdleTimeout
  Default: 60. Allowed range: [60, 600].
  If dynamic allocation is enabled, the time, in seconds, that an executor must be idle before it's removed.

spark.dynamicAllocation.schedulerBacklogTimeout
  Default: 60. Allowed range: [60, 600].
  If dynamic allocation is enabled, the time, in seconds, that tasks can remain pending before new executors are requested.

spark.dataflow.dynamicAllocation.quotaPolicy
  Default: min. Allowed values: min | max.
  min: Data Flow reduces the tenancy quota at the start of the run by the value of minExecutors. During the run, the quota can be reduced further, up to the value of maxExecutors, but this is subject to availability because other concurrent runs consume it too.
  max: Data Flow reduces the tenancy quota at the start of the run by the value of maxExecutors. This guarantees that no other concurrent run can use that quota, and this run has the maximum resources available.
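
For example, suppose minExecutors is 1 and maxExecutors is 10 in a tenancy with quota for exactly 10 executors. With quotaPolicy=min, the run reserves quota for 1 executor at the start and competes with any concurrent runs for the remaining 9 as it scales up. With quotaPolicy=max, the run reserves quota for all 10 at the start, so scaling to 10 executors is guaranteed, but concurrent runs see that much less available quota. The policy is set like any other Spark configuration property:

--conf spark.dataflow.dynamicAllocation.quotaPolicy=max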

For more information, see the Spark documentation on Dynamic Resource Allocation and the Dynamic Allocation configuration properties.