Spark-Submit Functionality in Data Flow

Find out how to use Data Flow with Spark-submit.

Spark-Submit Compatibility

You can use spark-submit compatible options to run your applications using Data Flow.

Spark-submit is an industry-standard command for running applications on Spark clusters. Data Flow supports the following spark-submit compatible options:

  • --conf
  • --files
  • --py-files
  • --jars
  • --class
  • --driver-java-options
  • --packages
  • main-application.jar or main-application.py
  • arguments to main-application: the arguments passed to the main method of your main class (if any).

The --files option flattens your file hierarchy, so all files are placed at the same level in the current working directory. To keep the file hierarchy, use either archive.zip, or --py-files with a JAR, ZIP, or EGG dependency module.
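
For example, as a minimal sketch (the module directory, archive name, bucket, and namespace below are placeholders, not part of the original example), you could keep a Python package hierarchy intact by zipping it and passing the archive with --py-files:

# Zip a hypothetical Python package so its directory structure is preserved.
zip -r mypackage.zip mypackage/

# Upload the archive to Object Storage (bucket name is a placeholder).
oci os object put --bucket-name <bucket-name> --file mypackage.zip --name mypackage.zip

# Reference the archive inside the --execute string:
#   --py-files oci://<bucket-name>@<namespace>/mypackage.zip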

The --packages option includes any other dependencies by supplying a comma-delimited list of Maven coordinates. For example,
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2
All transitive dependencies are handled when you use this option.
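
As an illustrative sketch only (the Kafka coordinates come from the example above; the bucket, namespace, and application file are placeholders), a run that pulls the package at run time could look like this:

# Placeholder values: <compartment-id>, <bucket-name>, <namespace>, main_application.py.
oci data-flow run submit \
--compartment-id <compartment-id> \
--execute "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2
 oci://<bucket-name>@<namespace>/main_application.py"
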
Note

With the --packages option, each Run's driver pod needs to download dependencies dynamically, which relies on network stability and access to Maven Central or other remote repositories. Use the Data Flow Dependency Packager to generate a dependency archive for production.
Note

For all spark-submit options in Data Flow, the URI must begin with oci://.... URIs starting with local://... or hdfs://... are not supported. Use the fully qualified domain name (FQDN) in the URI, and load all files, including the main application, into Oracle Cloud Infrastructure Object Storage.
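
For example, a file stored in Object Storage is referenced with a fully qualified URI of this form (bucket and namespace are placeholders):

oci://<bucket-name>@<namespace>/path/to/file1.json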

Creating a Spark-Submit Data Flow Application explains how to create an application in the console using spark-submit. You can also use spark-submit with a Java SDK or from the CLI. If you use the CLI, you do not have to create a Data Flow Application to run your Spark application with spark-submit compatible options on Data Flow. This is useful if you already have a working spark-submit command in a different environment. When you follow the syntax of the run submit command, an Application is created if one doesn't already exist at the main-application URI.

Installing the Public CLI with the run submit Command

Follow these steps to install the public CLI with the run submit command for use with Data Flow:

  1. Create a customized Python environment to use as the destination of the CLI.
    python3.9 -m venv cli-3
    source cli-3/bin/activate
  2. Install the public Command Line Interface (CLI); see the sketch after these steps.
  3. Verify that the run submit commands are loaded:
    oci --version
     
    oci data-flow run submit
    Usage: oci data-flow run submit [OPTIONS]
     
    Error: Missing option(s) --compartment-id, --execute.
  4. Authenticate the session:
    oci session authenticate
    • Select a region from the list.
    • Enter the name of the profile to create.
    • Create a token profile:
      oci iam region list --config-file /Users/<user-name>/.oci/config --profile <profile_name> --auth security_token
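
A minimal sketch of steps 1 to 3, assuming the public CLI is installed from PyPI as the oci-cli package:

# Step 1: create and activate the Python environment.
python3.9 -m venv cli-3
source cli-3/bin/activate

# Step 2: install the public CLI (assumes the PyPI package name oci-cli).
pip install oci-cli

# Step 3: verify the CLI and the run submit command are available.
oci --version
oci data-flow run submit --help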

Using Spark-submit in Data Flow

You can take your spark-submit CLI command and convert it into a compatible CLI command in Data Flow.

The spark-submit compatible command in Data Flow is the run submit command. If you already have a working Spark application in any cluster, you are familiar with the spark-submit syntax. For example:
spark-submit --master spark://<IP-address>:port \
--deploy-mode cluster \
--conf spark.sql.crossJoin.enabled=true \
--files oci://file1.json \
--class org.apache.spark.examples.SparkPi \
--jars oci://file2.jar <path_to>/main_application_with-dependencies.jar 1000
To run this application in Data Flow, the CLI command is:
oci data-flow run submit \
--compartment-id <compartment-id> \
--execute "--conf spark.sql.crossJoin.enabled=true
 --files oci://<bucket-name>@<namespace>/path/to/file1.json
 --class org.apache.spark.examples.SparkPi
 --jars oci://<bucket-name>@<namespace>/path/to/file2.jar
 oci://<bucket-name>@<namespace>/path_to_main_application_with-dependencies.jar 1000"
To get the compatible Data Flow command, follow these steps:
  1. Upload all the files, including the main application, to Object Storage.
  2. Replace the existing URIs with the corresponding oci://... URIs.
  3. Remove any unsupported or reserved spark-submit parameters. For example, --master and --deploy-mode are reserved by Data Flow, so you don't need to set them.
  4. Add the --execute parameter and pass in a spark-submit compatible command string. To build the --execute string, keep the supported spark-submit parameters, followed by the main application and its arguments, in sequence, and put them inside a quoted string (single or double quotes).
  5. Replace spark-submit with the Oracle Cloud Infrastructure standard command prefix, oci data-flow run submit.
  6. Add the mandatory Oracle Cloud Infrastructure argument and parameter pairs for --profile, --auth security_token, and --compartment-id.
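
Putting these steps together, the earlier example becomes a command like the following sketch (the profile name, bucket, namespace, and compartment OCID are placeholders):

# Placeholder values: <profile_name>, <compartment-id>, <bucket-name>, <namespace>.
oci --profile <profile_name> --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--execute "--conf spark.sql.crossJoin.enabled=true
 --files oci://<bucket-name>@<namespace>/path/to/file1.json
 --class org.apache.spark.examples.SparkPi
 --jars oci://<bucket-name>@<namespace>/path/to/file2.jar
 oci://<bucket-name>@<namespace>/path_to_main_application_with-dependencies.jar 1000"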

Run Submit Examples

Some examples of run submit in Data Flow.

  • Examples of run submit using oci-cli.
  • An example of run submit using oci-curl.

Oci-cli Examples

Examples of run submit using oci-cli in Data Flow.

Basic run submit:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--execute "--conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
Run submit with the optional --jars, --files, and --py-files configurations:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--execute "--jars oci://<bucket-name>@<tenancy-name>/a.jar
 --files \"oci://<bucket-name>@<tenancy-name>/b.json\"
 --py-files oci://<bucket-name>@<tenancy-name>/c.py
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
Run submit with --archive-uri, --jars, --files, and --py-files:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--archive-uri  "oci://<bucket-name>@<tenancy-name>/mmlspark_original.zip" \
--execute "--jars local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar
 --files \"local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar\"
 --py-files local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
Run submit with URL validation of the --jars, --files, and --py-files URIs:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--archive-uri  "oci://<bucket-name>@<tenancy-name>/mmlspark_original.zip" \
--execute "--jars oci://<bucket-name>@<tenancy-name>/fake.jar
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
#result
{'opc-request-id': '<opc-request-id>', 'code': 'InvalidParameter',
 'message': 'Invalid OCI Object Storage uri. The object was not found or you are not authorized to access it.
 {ResultCode: OBJECTSTORAGE_URI_INVALID,
 Parameters: [oci://<bucket-name>@<tenancy-name>/fake.jar]}', 'status': 400}

To enable Resource Principal authentication, add the dataflow.auth Spark property using the --conf option in the --execute string:

--execute "--conf dataflow.auth=resource_principal
 --conf other-spark-property=other-value"
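
For example, a sketch that reuses the SparkPi run from the oci-cli examples above with resource principal authentication enabled:

# Sketch only: reuses placeholder values from the oci-cli examples above.
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--execute "--conf dataflow.auth=resource_principal
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"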

Oci-curl Example

An example of run submit using oci-curl in Data Flow.

oci-curl <IP-Address>:443 POST /Users/<user-name>/workspace/sss/dependency_test/spark-submit-test.json \
 /20200129/runs --insecure --noproxy <IP-Address>
 
{
"execute": "--jars local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar
 --files \"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar\"
 --py-files local:///opt/spark/conf/spark.properties
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
     oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10",
"displayName": "spark-submit-test",
"sparkVersion": "2.4",
"driverShape": "VM.Standard2.1",
"executorShape": "VM.Standard2.1",
"numExecutors": 1,
"logsBucketUri": "",
"freeformTags": {},
"definedTags": {}
}