データ・フローとデータ・サイエンスの統合

データ・フローを使用すると、データ・フローに対して対話形式でアプリケーションを実行するよう、データ・サイエンス・ノートブックを構成できます。

データ・フローでは、完全管理型のJupyter Notebookを使用して、データ・サイエンティストやデータ・エンジニアが、データ・エンジニアリング・アプリケーションおよびデータ・サイエンス・アプリケーションの作成、ビジュアル化、コラボレーション、デバッグを実行できるようにしています。これらのアプリケーションは、Python、ScalaおよびPySparkで記述できます。また、データ・サイエンス・ノートブック・セッションをデータ・フローに接続して、アプリケーションを実行することも可能です。データ・フロー・カーネルおよびアプリケーションは、Oracle Cloud Infrastructureデータ・フローで実行されます。

Apache Sparkは、データを大規模に処理するよう設計された分散コンピューティング・システムです。大規模なSQL、バッチ、ストリーム処理および機械学習のタスクがサポートされています。Spark SQLでは、データベースのようなサポートが行われます。構造化データを問い合せるには、Spark SQLを使用します。ANSI標準のSQL実装です。

データ・フロー・セッションでは、データ・フロー・クラスタの自動スケーリング機能がサポートされています。詳細は、データ・フローのドキュメントの「自動スケーリング」を参照してください。

データ・フロー・セッションでは、カスタマイズ可能なSparkランタイム環境として、conda環境の使用がサポートされています。

図1. データ・フローに対するデータ・サイエンス・ノートブック
データ・サイエンス・ノートブックは、データ・フロー・マジックを使用してデータ・フローにリクエストを送信し、NotebookSession APIを使用してデータ・フロー・サーバーでSparkコードを実行します。
制限事項
  • データ・フロー・セッションの存続期間は、最大7日間または10,080分(maxDurationInMinutes)です。

  • データ・フロー・セッションのデフォルトのアイドル・タイムアウト値(idleTimeoutInMinutes)は480分(8時間)です。別の値を構成できます。
  • データ・フロー・セッションを使用できるのは、データ・サイエンス・ノートブック・セッションを介した場合のみです。
  • サポートされているのは、Sparkバージョン3.5.0および3.2.1のみです。
ヒント

データ・フロー・スタジオでのデータ・サイエンスの使用に関するチュートリアル・ビデオをご覧ください。また、データ・サイエンスとデータ・フローの統合の詳細は、Oracle Accelerated Data Science SDKのドキュメントも参照してください。

Conda環境のインストール

データ・フロー・マジックとデータ・フローを使用するには、次のステップを実行します。

  1. データ・サイエンスでノートブック・セッションを作成するか開きます
    • ノートブック・セッションは、テナンシでサービスが有効になっているリージョンに存在する必要があります。
    • ノートブック・セッションは、ノートブック・セッションの動的グループのコンパートメントに存在する必要があります。
  2. ノートブック・セッションでpyspark32_p38_cpu_v3 conda環境をインストールしてアクティブ化します:
    copy odsc conda install -s pyspark32_p38_cpu_v3
    ノート

    更新されたconda環境pyspark32_p38_cpu_v3をインストールすることをお薦めします。この環境には、最大タイムアウト期間を超えたために停止したセッションを自動的に再起動するサポートが含まれているためです。

    以前の安定したconda環境はpyspark32_p38_cpu_v2でした。

  3. pyspark32_p38_cpu_v3 conda環境をアクティブ化します。
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

データ・フローおよびデータ・サイエンスの使用

データ・フローとデータ・サイエンスを組み合せて使用してアプリケーションを実行するには、次のステップを実行します。

  • データ・フローでノートブックを使用するようポリシーが設定されていることを確認します。

  • データ・サイエンス・ポリシーが正しく設定されていることを確認します。

  • サポートされているすべてのコマンドのリストを表示するには、%helpコマンドを使用します。
  • 次のステップのコマンドは、Spark 3.5.0とSpark 3.2.1の両方に適用されます。例では、Spark 3.5.0が使用されています。使用するSparkのバージョンに従って、sparkVersionの値を設定します。
  1. ADSで認証を設定します。
    • ADS SDKは、データ・フロー・マジックで使用される認証タイプを制御するために使用されます。
    • デフォルトでは、API KEY認証が使用されます。認証タイプを変更するには、ads.set_auth("resource_principal")コマンドを使用します:
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. データ・フロー・マジック拡張のロード
    %load_ext dataflow.magics
  3. (オプション)magicコマンド%create_sessionを使用して、データ・フロー・セッションを作成します:
    共通セッション
    次の例は、フレキシブル・シェイプで新しいセッションを作成する方法を示しています:
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    アーカイブURIを使用したセッション
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    カスタムconda環境を使用したセッション
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    メタストアを使用したセッション
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (オプション)既存のデータ・フロー・セッションを使用します:
    %use_sessionコマンドを使用します。コンソールからOCIDをコピーします。
    %use_session -s <session_OCID> -r <region_name>
  5. %config_sessionコマンドを使用してセッションを構成します:
    • 現在の構成を確認するには:
      %config
    • ドライバおよびエグゼキュータを変更するには:
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • 自動スケーリングを適用するには:
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      セッションの作成時に、自動スケーリング・ポリシーを適用することもできます。
  6. (オプション)既存のセッションをアクティブ化するには、%activate_sessionコマンドを使用します:
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. セッションを停止するには、%stop_sessionコマンドを使用します:
    %stop_session

Conda環境でのデータ・フローSpark環境のカスタマイズ

公開されているconda環境をランタイム環境として使用できます。

  1. ノートブック・セッションに、Spark 3.5.0およびデータ・フローをインストールします:
    odsc conda install -s pyspark32_p38_cpu_v3
  2. condaを使用してライブラリをインストールします。
  3. conda環境を公開します。
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. 次のようにして、データ・フロー・クラスタを起動します:
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    ノート

    セッション構成には、次のパラメータが含まれている必要があります:
    • "sparkVersion":"3.5.0"
    • "language":"PYTHON"
    • オブジェクト・ストレージのconda環境へのspark.archivesパスを含む"configuration"

データ・フローでのspark-nlpの実行

Spark-nlpをインストールし、データ・フローで実行するには、次のステップに従います。

Conda環境を使用したデータ・フローSpark環境のカスタマイズのステップ1および2を完了している必要があります。spark-nlpライブラリは、pyspark32_p38_cpu_v2 conda環境に事前にインストールされています。

  1. 事前にトレーニング済のspark-nlpモデルおよびパイプラインをインストールします。

    事前にトレーニング済のspark-nlpモデルが必要な場合は、モデルをダウンロードして、conda環境フォルダに解凍します。データ・フローでは、パブリック・インターネットへのエグレスがまだサポートされていません。データ・フローのAWS S3から、事前にトレーニング済のモデルを動的にダウンロードすることはできません。

    事前にトレーニング済のモデルは、zipアーカイブとしてモデル・ハブからダウンロードできます。モデルはconda環境フォルダに解凍されます。サンプル・モデルは、https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.htmlです:
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. 「Conda環境でのデータ・フローSpark環境のカスタマイズ」のステップ3を参照して、conda環境を公開します。
  3. データ・フロー・クラスタを起動します。

    ノートブック・セッションpyspark30_p37_cpu_v5カーネルで実行されているノートブック・セルで、次を再確認してください

    spark.jars.packagesパラメータ。インストールしたspark-nlpのバージョンが反映されます。
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. spark-nlp GitHubリポジトリのコードのスニペットを使用してテストします。
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    ノートブック・セルには次の出力があります。
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

次に、データFlowMagicの使用例を示します。

PySpark

変数scはSparkを表し、%%spark magicコマンドを使用すると利用可能になります。次のセルは、データFlowMagicセルでscを使用する方法を大まかに表した例です。セルでは、番号のリストからRDDnumbersを作成する.parallelize()メソッドがコールされています。RDDに関する情報が出力されます。.toDebugString()メソッドにより、RDDの説明が返されます。
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

-c SQLオプションを使用すると、セルでSpark SQLコマンドを実行できます。この項では、citi bikeデータセットを使用します。次のセルでは、データセットをSparkデータフレームに読み取り、表として保存します。この例は、Spark SQLの表示に使用されます。

Citibikeデータセットが、オブジェクト・ストレージにアップロードされます。レルムでも同様の操作が必要な場合があります。
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

次の例では、-c sqlオプションを使用して、セルのコンテンツがSparkSQLであることをデータFlowMagicに通知します。-o <variable>オプションは、Spark SQL操作の結果を取得し、定義された変数に格納します。この場合、

df_bike_tripsは、ノートブックで使用できるPandasデータフレームです。
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
データの最初の数行を出力します:
df_bike_trips.head()
同様に、sqlContextを使用して表を問い合せることができます:
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
最後に、表を説明します:
%%spark -c sql
SHOW TABLES

自動ビジュアライゼーション・ウィジェット

データFlowMagicには、パンダ・データフレームをビジュアル化できるautovizwidgetが付属しています。display_dataframe()関数では、Pandasデータフレームがパラメータとして取得され、ノートブックに対話型のGUIが生成されます。このタブには、データの可視化が表、円グラフ、散布図、棒グラフなど様々な形式で表示されます。

次のセルでは、ノートブックのSpark SQLセクションで作成されたdf_peopleデータフレームを使用してdisplay_dataframe()がコールされます:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

データ・サイエンティストが行う一般的なタスクは、データのビジュアル化です。データセットが大きい場合、通常は不可能であるため、ほとんどの場合、データ・フローSparkクラスタからノートブック・セッションにデータをプルすることはお薦めしません。この例では、サーバー側のリソースを使用してプロットを生成し、それをノートブックに含める方法を証明します。

df_bike_tripsデータフレームはセッションで定義され、再利用されます。Matplotlibを生成するには、必要なライブラリを含めてプロットを生成します。%matplot plt magicコマンドを使用すると、サーバー側でレンダリングされていても、ノートブックのプロットを表示できます:
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

詳細な例

データ・フローおよびデータ・サイエンス・サンプルに関するその他の例は、GitHubを参照してください。