データ・フローとDelta Lake
データ・フローでは、アプリケーションでSpark 3.2.1以降を実行すると、デフォルトでDelta Lakeがサポートされます。
Delta Lakeでは、データ・レイク上にレイクハウス・アーキテクチャを構築できます。Delta Lakeでは、ACIDトランザクションとスケーラブルなメタデータ処理が実現されており、既存のデータ・レイク上にストリーミングとバッチ・データ処理が統合されます。Delta Lake 3.1.0は、データ・フローSpark 3.5.0処理エンジンでサポートされており、データ・フローSpark 3.2.1処理エンジンではDelta Lake 2.0.1および1.2.1がサポートされています。
- データ・フローのSparkバージョンは3.2.1 (以降)である必要があります。
- delta形式を使用します。
ロード・デルタ・レイク
データ・フローで使用するDelta Lakeをロードするには、次のステップに従います。
spark.oracle.deltalake.version
を使用して、使用するDelta Lakeのバージョンを指定します。これを次のいずれかの値に設定します。Sparkバージョン | spark.oracle.deltalake.versionの値 | バイナリがロードされました |
---|---|---|
3.5.0 | 3.1.0 |
Delta Lake 3.1.0 |
3.2.1 | 2.0.1 |
デルタレイク2.0.1 |
3.2.1 | 1.2.1 |
デルタ レイク1.2.1 |
3.5.0, 3.2.1 | none |
Delta Lakeバイナリはロードされません。指定する必要があります。 |
spark.oracle.deltalake.version
の値を設定しない場合、Delta Lake 1.2.1バイナリはデフォルトでロードされます。spark.oracle.deltalake.version
をnone
に設定した場合は、アプリケーションJARの一部としてDelta Lake依存性ライブラリを指定する必要があります。詳細は、Delta Lakeのパブリック・ドキュメンテーションを参照してください。
- delta-storage-3.1.0.jar
- delta-spark_2.12-3.1.0.jar
- delta-contribs_2.12-3.1.0.jar
- JavaまたはScalaアプリケーションの場合は、mavenリポジトリからのDelta Lake 3.1.0依存性を指定します。または、Pythonアプリケーションの場合、Delta Lakeライブラリをパッケージ化し、アプリケーションに提供します。
<dependency> <groupId>io.delta</groupId> <artifactId>delta-spark_2.12</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>3.1.0</version> </dependency>
- Spark構成を設定して、Delta Lakeを有効にします:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
- delta-core_2.12-2.0.1.jar
- delta-contribs_2.12-2.0.1.jar
- delta-storage-2.0.1.jar
- JavaまたはScalaアプリケーションの場合、mavenリポジトリからDelta Lake 2.0.1依存関係を指定します。または、Pythonアプリケーションの場合、Delta Lakeライブラリをパッケージ化し、アプリケーションに提供します。
<dependency> <groupId>io.delta</groupId> <artifactId>delta-core_2.12</artifactId> <version>2.0.1</version> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-contribs_2.12</artifactId> <version>2.0.1</version> </dependency>
- Delta Lakeを有効にするには、Spark構成を設定します:
spark.delta.logStore.oci.impl -> io.delta.storage.OracleCloudLogStore spark.sql.extensions -> io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog -> org.apache.spark.sql.delta.catalog.DeltaCatalog
Delta Lake APIの使用例
データ・フローでのDelta Lake APIの使用例。
データ・フローSparkエンジンでは、デフォルトでdelta
形式がサポートされています。Delta Lake APIは、Java、PythonおよびScala言語で使用できます。Delta Lake Python APIを使用している場合は、「データ・フローでのSpark-Submit機能」の説明に従って、カスタムのarchive.zip依存関係パッケージャを使用して、delta-sparkパッケージを使用します。
使用例
- JavaまたはScala
-
spark.read().format("delta").load(<path_to_Delta_table>) df.write().format("delta").save(<path_to_Delta_table>) val deltaTable = io.delta.tables.DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum()
- Python
-
spark.read.format("delta").option("versionAsOf", 1).load(<path_to_Delta_table>) from delta.tables import * deltaTable = DeltaTable.forPath(spark, <path_to_Delta_table>) deltaTable.vacuum() deltaTable.history()
- SQL
spark.sql("CONVERT TO DELTA parquet.`" + <path_to_Parquet_table> + "`"); spark.sql("DESCRIBE HISTORY delta.`" + <path_to_Delta_table> + "`");
例
ここでは、データ・フローでDelta Lakeの使用を開始する上で役立つコード例をいくつか示します
GitHubのoracle-samplesで、例を参照できます。