データ・フローとDelta Lake

データ・フローでは、アプリケーションでSpark 3.2.1以降を実行すると、デフォルトでDelta Lakeがサポートされます。

Delta Lakeでは、データ・レイク上にレイクハウス・アーキテクチャを構築できます。Delta Lakeでは、ACIDトランザクションとスケーラブルなメタデータ処理が実現されており、既存のデータ・レイク上にストリーミングとバッチ・データ処理が統合されます。Delta Lake 3.1.0は、データ・フローSpark 3.5.0処理エンジンでサポートされており、データ・フローSpark 3.2.1処理エンジンではDelta Lake 2.0.1および1.2.1がサポートされています。

データ・フローでDelta Lakeを使用するには:
  • データ・フローのSparkバージョンは3.2.1 (以降)である必要があります。
  • delta形式を使用します。
Delta Lakeおよびその使用方法の詳細は、Delta Lakeのリリース・ノートおよびDelta Lakeのドキュメントを参照してください。

ロード・デルタ・レイク

データ・フローで使用するDelta Lakeをロードするには、次のステップに従います。

Spark構成プロパティspark.oracle.deltalake.versionを使用して、使用するDelta Lakeのバージョンを指定します。これを次のいずれかの値に設定します。
Spark.oracle.deltalake.version値
Sparkバージョン spark.oracle.deltalake.versionの値 バイナリがロードされました
3.5.0 3.1.0 Delta Lake 3.1.0
3.2.1 2.0.1 デルタレイク2.0.1
3.2.1 1.2.1 デルタ レイク1.2.1
3.5.0, 3.2.1 none Delta Lakeバイナリはロードされません。指定する必要があります。
ノート

spark.oracle.deltalake.versionの値を設定しない場合、Delta Lake 1.2.1バイナリはデフォルトでロードされます。

spark.oracle.deltalake.versionnoneに設定した場合は、アプリケーションJARの一部としてDelta Lake依存性ライブラリを指定する必要があります。詳細は、Delta Lakeのパブリック・ドキュメンテーションを参照してください。

たとえば、Delta Lake 3.1.0をロードするには、次のライブラリをパッケージ化します。
  • delta-storage-3.1.0.jar
  • delta-spark_2.12-3.1.0.jar
  • delta-contribs_2.12-3.1.0.jar
次のステップに従います:
  1. JavaまたはScalaアプリケーションの場合は、mavenリポジトリからのDelta Lake 3.1.0依存性を指定します。
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-spark_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-contribs_2.12</artifactId>
        <version>3.1.0</version>
    </dependency>
    または、Pythonアプリケーションの場合、Delta Lakeライブラリをパッケージ化し、アプリケーションに提供します。
  2. Spark構成を設定して、Delta Lakeを有効にします:
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
たとえば、Delta Lake 2.0.1をロードするには、次のライブラリをパッケージ化します。
  • delta-core_2.12-2.0.1.jar
  • delta-contribs_2.12-2.0.1.jar
  • delta-storage-2.0.1.jar
また、次のステップを実行します。
  1. JavaまたはScalaアプリケーションの場合、mavenリポジトリからDelta Lake 2.0.1依存関係を指定します。
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.12</artifactId>
      <version>2.0.1</version>
    </dependency> 
    <dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-contribs_2.12</artifactId>
      <version>2.0.1</version>
      </dependency>
    または、Pythonアプリケーションの場合、Delta Lakeライブラリをパッケージ化し、アプリケーションに提供します。
  2. Delta Lakeを有効にするには、Spark構成を設定します:
    spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore
    spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
ノート

アプリケーションの作成またはアプリケーションの実行時に、コンソールで提供される拡張オプションでDelta Lakeを有効にすることもできます。

Delta Lake APIの使用例

データ・フローでのDelta Lake APIの使用例。

データ・フローSparkエンジンでは、デフォルトでdelta形式がサポートされています。Delta Lake APIは、Java、PythonおよびScala言語で使用できます。Delta Lake Python APIを使用している場合は、「データ・フローでのSpark-Submit機能」の説明に従って、カスタムのarchive.zip依存関係パッケージャを使用して、delta-sparkパッケージを使用します。

使用例

JavaまたはScala
spark.read().format("delta").load(<path_to_Delta_table>)
df.write().format("delta").save(<path_to_Delta_table>)
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
Python
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>)
deltaTable.vacuum()
deltaTable.history()
SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`");
spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");

ここでは、データ・フローでDelta Lakeの使用を開始する上で役立つコード例をいくつか示します

GitHubのoracle-samplesで、例を参照できます。