You can configure Spark dynamic allocation with Data Flow in three ways.
Using the Console
When creating an Application, click Enable Autoscaling. The default configuration is populated into the Spark configuration properties. The minimum number of executors matches the value of the spark.dynamicAllocation.minExecutors property, and the maximum number of executors matches the value of the spark.dynamicAllocation.maxExecutors property. You can set values different from the defaults for the spark.dynamicAllocation.executorIdleTimeout and spark.dynamicAllocation.schedulerBacklogTimeout properties.
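With autoscaling enabled, the resulting Spark configuration properties look like the following sketch (the values shown are the documented defaults from the table later in this topic; the exact set the Console populates can vary):

    spark.dynamicAllocation.enabled=true
    spark.dynamicAllocation.shuffleTracking.enabled=true
    spark.dynamicAllocation.minExecutors=1
    spark.dynamicAllocation.maxExecutors=4
    spark.dynamicAllocation.executorIdleTimeout=60
    spark.dynamicAllocation.schedulerBacklogTimeout=60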
Using the API
Set both spark.dynamicAllocation.enabled and spark.dynamicAllocation.shuffleTracking.enabled to true for the Data Flow application.
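As a sketch of how this might look with the OCI SDK for Python: the client and model names below follow the SDK's data_flow module, while the compartment OCID, file URI, shapes, and display name are placeholder assumptions, not values from this topic.

    import oci

    # Load the default OCI config (~/.oci/config); assumes the SDK is already configured.
    config = oci.config.from_file()
    client = oci.data_flow.DataFlowClient(config)

    details = oci.data_flow.models.CreateApplicationDetails(
        compartment_id="ocid1.compartment.oc1..example",  # placeholder OCID
        display_name="dynamic-allocation-example",
        language="PYTHON",
        file_uri="oci://bucket@namespace/app.py",         # placeholder application file
        spark_version="3.2.1",
        driver_shape="VM.Standard2.1",
        executor_shape="VM.Standard2.1",
        num_executors=1,
        # Both properties must be true for dynamic allocation with Data Flow.
        configuration={
            "spark.dynamicAllocation.enabled": "true",
            "spark.dynamicAllocation.shuffleTracking.enabled": "true",
            "spark.dynamicAllocation.minExecutors": "1",
            "spark.dynamicAllocation.maxExecutors": "4",
        },
    )
    application = client.create_application(details).data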
A Spark application with dynamic allocation enabled requests more executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing
set of executors is insufficient to simultaneously saturate all tasks that have been
submitted but not yet finished.
Spark requests executors in rounds. The actual request is triggered when there have been
pending tasks for spark.dynamicAllocation.schedulerBacklogTimeout seconds.
If the queue of pending tasks persists, the request is triggered again every
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds
thereafter. The number of executors requested in each round increases exponentially from the
previous round. For example, an application adds 1 executor in the first round, then 2, 4, 8, and so on in later rounds.
The policy for removing executors is much simpler. A Spark application removes an executor
when it has been idle for more than
spark.dynamicAllocation.executorIdleTimeout seconds.
Note
Under most circumstances, this condition is mutually exclusive with
the request condition, in that an executor should not be idle if there are still
pending tasks to be scheduled.
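As an illustration, the following settings (hypothetical values chosen within the [60, 600] range allowed by the table below) make an application wait 120 seconds of task backlog before requesting executors, and release executors only after 300 seconds of idleness:

    spark.dynamicAllocation.schedulerBacklogTimeout=120
    spark.dynamicAllocation.executorIdleTimeout=300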
Using Spark-Submit
Populate the configuration properties. For example:
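The following sketch uses the OCI CLI's spark-submit compatible run submit command; the compartment OCID and application file URI are placeholders, and the property values shown are the defaults from the table below:

    oci data-flow run submit \
        --compartment-id ocid1.compartment.oc1..example \
        --display-name "dynamic-allocation-example" \
        --execute "--conf spark.dynamicAllocation.enabled=true \
            --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
            --conf spark.dynamicAllocation.minExecutors=1 \
            --conf spark.dynamicAllocation.maxExecutors=4 \
            --conf spark.dynamicAllocation.executorIdleTimeout=60 \
            --conf spark.dynamicAllocation.schedulerBacklogTimeout=60 \
            oci://bucket@namespace/app.py"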
Descriptions of the Spark dynamic allocation properties you can use with Data Flow, and their possible values.
Property: spark.dynamicAllocation.enabled
Default value: false
Value range: true | false
Supported Spark versions: 3.x
Whether to use dynamic resource allocation, which scales the number of executors registered with the application up and down based on the workload.

Property: spark.dynamicAllocation.shuffleTracking.enabled
Default value: true
Value range: true
Supported Spark versions: 3.x
Enables shuffle file tracking for executors, which allows dynamic allocation without the need for an external shuffle service. This option tries to keep alive executors that are storing shuffle data for active jobs.

Property: spark.dynamicAllocation.minExecutors
Default value: 1
Value range: [1, maxExecutors]
Supported Spark versions: 3.x
The lower bound for the number of executors when dynamic allocation is enabled.

Property: spark.dynamicAllocation.maxExecutors
Default value: 4
Value range: [minExecutors, 1000]
Supported Spark versions: 3.x
The upper bound for the number of executors when dynamic allocation is enabled.

Property: spark.dynamicAllocation.executorIdleTimeout
Default value: 60
Value range: [60, 600]
Supported Spark versions: 3.x
If dynamic allocation is enabled, the time, in seconds, that an executor must be idle before it's removed.

Property: spark.dynamicAllocation.schedulerBacklogTimeout
Default value: 60
Value range: [60, 600]
Supported Spark versions: 3.x
If dynamic allocation is enabled, the time, in seconds, that tasks have been pending before new executors are requested.

Property: spark.dataflow.dynamicAllocation.quotaPolicy
Default value: min
Value range: min | max
With spark.dataflow.dynamicAllocation.quotaPolicy=min, Data Flow reduces the tenancy quota at the start of the run by the value of minExecutors. During run execution, the quota can be decreased further, up to the value of maxExecutors, but this is subject to availability because other concurrent runs could use it too.
With spark.dataflow.dynamicAllocation.quotaPolicy=max, Data Flow reduces the tenancy quota at the start of the run by the value of maxExecutors. This guarantees that no other concurrent runs can use it, and that the run has the maximum resources available.
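For example, to guarantee that a run can always scale to its maximum size, you could reserve quota up front by pairing the Data Flow-specific policy property with the standard maximum (an illustrative setting using the default maximum of 4):

    spark.dataflow.dynamicAllocation.quotaPolicy=max
    spark.dynamicAllocation.maxExecutors=4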