このページは機械翻訳したものです。

Sparkストリーミングの開始

データ・フローでSparkストリーミングを使用する前に、設定が必要です。

Apache Sparkでは、バッチ処理、ストリーム処理および機械学習が1つのAPIに統合されます。データ・フローでは、標準のApache Sparkランタイム内でSparkアプリケーションが実行されます。ストリーミング・アプリケーションを実行すると、データ・フローは別のランタイムを使用せず、別の方法でSparkアプリケーションを実行します:
ストリーミング実行と非ストリーミング実行の違い
相違 非ストリーミング実行 ストリーミング実行
認証 リクエスト元のユーザーのOn-Behalf-Of (OBO)トークンが使用されます。OBOトークンは24時間後に期限切れになるため、長時間実行されるジョブには適していません。 実行のリソース・プリンシパルに関連付けられたセッション・トークンを使用して、Oracle Cloud Infrastructureオブジェクト・ストレージにアクセスします。長時間実行するジョブに適しています。
再起動ポリシー Sparkランタイムの終了コードが0以外の場合は失敗します。 Sparkランタイムの終了コードが0以外の場合は、最大10回再起動します。
パッチ適用ポリシー ジョブの存続時間が24時間未満と想定されるため、パッチ適用ポリシーはありません。 自動月次パッチ。
  1. Sparkストリーミング・アプリケーションを作成します。
    アプリケーションが実行されると、リソース・プリンシパル認証が使用され、パッチ適用と再起動は自動的に行われます。
  2. Sparkストリーミングのポリシーの設定
    Sparkストリーミング・アプリケーションでは、Oracle Cloud Infrastructureリソースに対して、リソース・プリンシパル・セッション・トークンを使用して認証されるため、これらのリソースにアクセスするには、アプリケーションを認可するIAMポリシーを作成する必要があります。データ・フロー実行はオンデマンドで開始されるため、実行が開始されるまで割り当てられないため、IAMポリシーで実行OCIDを使用できません。かわりに、実行のリソースを永続リソースに接続し、IAMポリシーでそれを参照します。これを実行する最も一般的な方法は、次の2つです:
    親アプリケーションID
    データ・フロー実行を作成したデータ・フロー・アプリケーションに接続し、データ・フロー・アプリケーションIDをIAMポリシーに指定します。特定のアプリケーションの権限を設定するには、そのアプリケーションから開始されたすべての実行に一致する動的グループを作成し、動的グループがIAMリソースにアクセスすることを認可します。各実行には、親アプリケーションとその実行を関連付けるタグがあります。このタグは、動的グループの一致ルールに使用できます。
    ノート

    このタグはIAMの任意のユーザー・ポリシーでは使用できないため、動的グループを作成する必要があります。
    たとえば、IDがocid1.dataflowapplication.oc1.iad.Aのデータ・フロー・アプリケーションがある場合は、次のような動的グループを作成します:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    ポリシーは次のとおりです:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    ターゲット・コンパートメントID

    データ・フロー実行を作成したコンパートメントに接続し、IAMポリシーにコンパートメントIDを指定します。このアプローチは、コンパートメントで実行されているすべてのSparkアプリケーションが、これらのリソースにアクセスできるため、あまり限定的ではありません。CLIを介してspark-submitを使用する場合は、アプリケーションIDと実行IDの両方がオンデマンドであるため、この方法を使用する必要があります。

    たとえば、IDがocid1.tenancy.oc1.Cのコンパートメントに、IDがocid1.dataflowrun.oc1.iad.R2の実行がある場合、ポリシーは次のようになります:
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Oracle Cloud Infrastructure Streamingへの接続

Oracle Cloud Infrastructure Streamingへの接続方法を学習します。

ストリーミングの設定:
  • Oracle Streaming Serviceを設定し、ストリームを作成します。
    ノート

    Oracle Streaming Serviceには次の制限があります:
    • ストリーム内のメッセージは、24時間以上7日間まで保持されます。
    • ストリーム内のすべてのメッセージは、保持期間が経過すると、メッセージが読まれたかどうかに関係なく削除されます。
    • ストリームは、ストリームの作成後は変更できません。
    • テナンシのデフォルトの制限は、ライセンスに応じて0または5パーティションです。パーティションを増やす必要がある場合は、サービス制限の引上げをリクエストできます。
    • ストリームの作成後にストリームのパーティション数を変更することはできません。
    • 1つのストリームで最大50個のコンシューマ・グループを読み取ることができます。
    • 各パーティションで書き込めるデータは合計1MB/秒です。データの書込み制限を超えていない場合、PUTリクエスト数に制限はありません。
    • 各パーティションのGETリクエストは、コンシューマ・グループごとに1秒当たり5つです。1つのストリームでサポートできるコンシューマ・グループは最大50で、1つのストリーム内の1つのパーティションを読み取れるのは、コンシューマ・グループ内の1つのコンシューマのみであるため、1つのパーティションでサポートできるGETリクエストは1秒当たり最大250です。
    • プロデューサがストリームに公開できるのは、1MB以下のメッセージです。
    • リクエストは1MB以下にする必要があります。リクエストのサイズは、Base64からデコードされた後のキーとメッセージの合計です。
  • ストリーミング・ポリシーをデータ・フローに追加します。
JavaまたはPythonを使用してKafkaに接続します。次の2つのいずれかの方法で認証します:
  • プレーン・パスワードまたは認証トークンを使用します。この方法は、環境間のクイック・テストに適しています。たとえば、Spark構造化ストリーミング・アプリケーションのプロトタイピングでは、アプリケーションをローカルで、Oracle Streaming Serviceに対してデータ・フローで実行します。
    ノート

    アプリケーション引数のパスワードのハードコーディング(公開)は安全ではないため、本番実行にはこの方法を使用しないでください。
  • リソース・プリンシパル認証は、プレーン・パスワードや認証トークンよりも安全です。Oracle Streaming Serviceで、より柔軟に認証を行えます。リソース・プリンシパル認証を使用するには、ストリーミング・ポリシーを設定します。

Javaサンプル・アプリケーションおよびPythonサンプル・アプリケーションを使用できます。

  1. Kafkaへの接続に使用するストリーム・プールを探します。
    1. 「ホーム」をクリックします。
    2. 「ストリーミング」をクリックします。
    3. 「ストリーム・プール」をクリックします。
    4. 使用するストリーム・プールをクリックして、詳細を確認します。
    5. 「Kafka接続設定」をクリックします。
    6. 次の情報をコピーします:
      • ストリーム・プールOCID
      • ブートストラップ・サーバー
      • 接続文字列
      • セキュリティ・プロトコル(例: SASL_SSL)
      • セキュリティ・メカニズム(例: PLAIN)
      ノート

      接続文字列のパスワードがAUTH_TOKENに設定されている場合、認証トークンを作成するか、ユーザー名(username="<tenancy>/<username>/<stream_pool_id>")で指定されたユーザーの既存の認証トークン(password="<auth_token>")を使用します:
      1. 「アイデンティティ」をクリックします。
      2. 「ユーザー」をクリックします。
      3. ユーザーについて、ユーザーの詳細を表示します。
      4. 認証トークンを作成するか、既存の認証トークンを使用します。
  2. Sparkは、デフォルトではKafka統合ライブラリにバインドされないため、Sparkアプリケーションの依存関係の一部として追加する必要があります。
    • SBTまたはMavenプロジェクト定義を使用するJavaやScalaのアプリケーションの場合は、アプリケーションと次のアーティファクトをリンクします:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      ノート

      ヘッダー機能を使用するには、Kafkaクライアントのバージョンが0.11.0.0以上であることが必要です。
    • Pythonアプリケーションの場合は、アプリケーションのデプロイ時に、Kafka統合ライブラリと依存関係を追加します。
    • データ・フロー・リソース・プリンシパル認証を使用する場合は、次のアーティファクトが必要です:
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. システムを構成します。
    Kafka接続の動作は、サーバー、認証、トピック、グループなどのシステムを構成することで制御されます。構成は強力であり、1つの値変更がシステム全体に大きく影響します。
    共通構成
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    フェッチ・レート制限の詳細は、ストリーミング・リソースの制限を参照してください。
    ノート

    後で再起動すると、startingOffsetsで指定された場所ではなく、最後のチェックポイントから続行されます。その他のオプションについては、『Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)』を参照してください。

    failOnDataLossは、Oracle Streamingから削除されたためにデータをフェッチできないときに使用するストリーミング・アプリケーションを指定します。

    詳細構成

    SparkストリーミングKafka統合ガイドを参照してください。

    サンプル構成
    プレーン・パスワード:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    リソース・プリンシパル:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Kafkaに接続します。
    サンプル接続。
    Oracle Cloud Infrastructure Streamingのリソース・プリンシパルを使用したJava
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    プレーン・パスワードを使用したJava
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    ノート

    Oracle Cloud Infrastructureストリーミングのリソース・プリンシパルでPythonを使用するには、archive.zipを使用する必要があります。詳細は、「データ・フローでのSpark-Submit機能」の項を参照してください。

プライベート・サブネットのストリーミング・ソースへの接続

プライベート・サブネットのストリーミング・ソースに接続するには、次のステップに従います。

ストリーミング・ソースFDQNをコピーして、データ・フロー・プライベート・エンドポイントの作成に使用されるプライベート・サブネット内のVNIC間のトラフィックを許可します。ストリーミング・ソースがデータ・フロー・プライベート・エンドポイントとは異なるサブネットにある場合は、ストリーミング・サブネットとデータ・フロー・プライベート・エンドポイント・サブネット間のトラフィックを許可します。

  1. プライベート・エンドポイントを含むストリーミング・プールを作成します。
    詳細は、ストリーミングのドキュメントを参照してください。
  2. ストリーム・プールの詳細を表示し、FDQNの値をコピーします。
  3. プライベート・エンドポイントを編集し、制御するDNSゾーンの値を前のステップでコピーしたストリーム・プールFDQNの値に置き換えます。
  4. ストリーミング・アプリケーションにプライベート・エンドポイントをアタッチします。
  5. アプリケーションを実行します。