データ・フローでのSpark-Submit機能

データ・フローをSpark-submitとともに使用する方法を確認します。

Spark-Submitとの互換性

spark-submit互換オプションを使用すると、データ・フローを使用してアプリケーションを実行できます。

Spark-submitは、Sparkクラスタでアプリケーションを実行するための業界標準のコマンドです。データ・フローでは、次のspark-submit互換オプションがサポートされています。

  • --conf
  • --files
  • --py-files
  • --jars
  • --class
  • --driver-java-options
  • --packages
  • main-application.jarまたはmain-application.py
  • main-applicationへの引数。メイン・クラスのmainメソッドに渡される引数(存在する場合)。

--filesオプションを使用すると、ファイル階層がフラット化され、すべてのファイルが現在の作業ディレクトリの同じレベルに配置されます。ファイル階層を保持するには、JAR、ZIPまたはEGG依存性モジュールでarchive.ZIPまたは--py-filesを使用します。

--packagesオプションを使用すると、Maven座標をカンマで区切ったリストを指定して、他の依存関係を含めることができます。次に例を示します
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2
このコマンドを使用すると、すべての推移的な依存性が処理されます。
ノート

--packagesオプションを使用すると、各実行のドライバ・ポッドで依存関係の動的なダウンロードが必要になりますが、これは、ネットワークの安定性と、Mavenセントラル・リポジトリまたはその他のリモート・リポジトリへのアクセスに依存します。データ・フローの依存関係パッケージャを使用して、本番用の依存関係アーカイブを生成します。
ノート

データ・フローでのすべてのspark-submitオプションについて、URIはoci://...で始まる必要があります。local://...またはhdfs://...で始まるURIはサポートされていません。URIでは完全修飾ドメイン名(FQDN)を使用します。main-applicationを含むすべてのファイルをOracle Cloud Infrastructure Object Storageにロードします。

Spark-Submitデータ・フロー・アプリケーションの作成では、spark-submitを使用してコンソールでアプリケーションを作成する方法について説明します。spark-submitは、Java SDKとともに使用したり、CLIから使用したりすることもできます。CLIを使用している場合、データ・フローでspark-submit互換オプションを使用してSparkアプリケーションを実行するためにデータ・フロー・アプリケーションを作成する必要はありません。これは、別の環境でspark-submitコマンドがすでに動作している場合に便利です。run submitコマンドの構文に従うと、アプリケーションがまだmain-application URIに存在しない場合は作成されます。

パブリックCLIとrun submitコマンドのインストール

データ・フローで使用するために、パブリックCLIとrun submitコマンドをインストールするには、次のステップが必要です。

  1. CLIの宛先として使用するカスタマイズされたPython環境を作成します。
    python3.9 -m venv cli-3
    source cli-3/bin/activate
  2. パブリック・コマンド・ライン・インタフェース(CLI)をインストールします。
  3. run submitコマンドがロードされていることを確認します:
    oci --version
     
    oci data-flow run submit
    Usage: oci data-flow run submit [OPTIONS]
     
    Error: Missing option(s) --compartment-id, --execute.
  4. セッションを認証します:
    oci session authenticate
    • リストからリージョンを選択します。
    • 作成するプロファイルの名前を入力します。
    • トークン・プロファイルを作成します:
      oci iam region list --config-file /Users/<user-name>/.oci/config --profile <profile_name> --auth security_token

データ・フローでのSpark-submitの使用

spark-submitのCLIコマンドを、互換性のあるデータ・フローでのCLIコマンドに変換できます。

データ・フローでのspark-submitの互換コマンドは、run submitコマンドです。いずれかのクラスタで動作しているSparkアプリケーションがすでにある場合は、spark-submit構文をよく理解しているはずです。例:
spark-submit --master spark://<IP-address>:port \
--deploy-mode cluster \
--conf spark.sql.crossJoin.enabled=true \
--files oci://file1.json \
--class org.apache.spark.examples.SparkPi \ 
--jars oci://file2.jar <path_to>/main_application_with-dependencies.jar 1000
このアプリケーションをデータ・フローで実行する場合、CLIコマンドは:
oci data-flow run submit \
--compartment-id <compartment-id> \
--execute "--conf spark.sql.crossJoin.enabled=true
 --files oci://<bucket-name>@<namespace>/path/to/file1.json
 --jars oci://<bucket-name>@<namespace>/path/to/file3.jar
 oci://<bucket-name>@<namespace>/path_to_main_application_with-dependencies.jar 1000"
互換性のあるデータ・フロー・コマンドを取得するには、次のステップに従います:
  1. メイン・アプリケーションを含むすべてのファイルをオブジェクト・ストレージにアップロードします。
  2. 既存のURIを対応するoci://... URIに置き換します。
  3. サポートされていないか予約されているspark-submitパラメータを削除します。たとえば、--masterおよび--deploy-modeはデータ・フロー用に予約されており、ユーザーがこれらに値を移入する必要はありません。

  4. --executeパラメータを追加し、spark-submit互換コマンド文字列を渡します。--execute文字列を構築するには、サポートされているspark-submitパラメータと、main-applicationおよびその引数を順に保持します。引用符で囲まれた文字列(一重引用符または二重引用符)内にそれらを配置します。

  5. spark submitをOracle Cloud Infrastructure標準コマンド接頭辞oci data-flow run submitに置き換えます。
  6. --profile--auth security_tokenおよび--compartment-idのOracle Cloud Infrastructure必須引数とパラメータのペアを追加します。

Run Submitの例

データ・フローのrun submitの例。

  • Oci-cliを使用したrun submitの例。
  • Oci-curlを使用したrun submitの例。

Oci-cliの例

データ・フローでのoci-cliを使用したrun submitの例。

基本的なrun submit:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--execute "--conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
--jars--filesおよびpyfilesのオプション構成を使用したRun submit:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--execute "--jars oci://<bucket-name>@<tenancy-name>/a.jar
 --files \"oci://<bucket-name>@<tenancy-name>/b.json\"
 --py-files oci://<bucket-name>@<tenancy-name>/c.py
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
archiveUri--jars--filesおよびpyfilesを使用したRun submit:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--archive-uri  "oci://<bucket-name>@<tenancy-name>/mmlspark_original.zip" \
--execute "--jars local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar
 --files \"local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar\"
 --py-files local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
jarsfilesおよびpyfiles内でのURL検証を使用したRun submit:
oci --profile oci-cli --auth security_token data-flow run submit \
--compartment-id <compartment-id> \
--archive-uri  "oci://<bucket-name>@<tenancy-name>/mmlspark_original.zip" \
--execute "--jars oci://<bucket-name>@<tenancy-name>/fake.jar
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
 oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10"
#result
{'opc-request-id': '<opc-request-id>', 'code': 'InvalidParameter',
 'message': 'Invalid OCI Object Storage uri. The object was not found or you are not authorized to access it.
 {ResultCode: OBJECTSTORAGE_URI_INVALID,
 Parameters: [oci://<bucket-name>@<tenancy-name>/fake.jar]}', 'status': 400}

リソース・プリンシパル認証を有効にするには、Spark submitを使用してconfファイルにSparkプロパティを追加し、実行メソッドに次の構成を追加します:

--execute 
"--conf dataflow.auth=resource_principal
 --conf other-spark-property=other-value" 

Oci-curlの例

データ・フローでのoci-curlを使用したrun submitの例。

oci-curl <IP-Address>:443 POST /Users/<user-name>/workspace/sss/dependency_test/spark-submit-test.json
 /20200129/runs --insecure --noproxy <IP-Address>
 
{
"execute": "--jars local:///opt/dataflow/java/mmlspark_2.11-0.18.1.jar
 --files \"local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar\"
 --py-files local:///opt/spark/conf/spark.properties
 --conf spark.sql.crossJoin.enabled=true
 --class org.apache.spark.examples.SparkPi
     oci://<bucket-name>@<tenancy-name>/spark-examples_2.11-2.3.1-SNAPSHOT-jar-with-dependencies.jar 10",
"displayName": "spark-submit-test",
"sparkVersion": "2.4",
"driverShape": "VM.Standard2.1",
"executorShape": "VM.Standard2.1",
"numExecutors": 1,
"logsBucketUri": "",
"freeformTags": {},
"definedTags": {}
}