Sparkストリーミングの開始

データ・フローでSparkストリーミングを使用する前に、設定が必要です。

Apache Sparkでは、バッチ処理、ストリーム処理および機械学習が1つのAPIに統合されます。データ・フローでは、標準のApache Sparkランタイム内でSparkアプリケーションが実行されます。ストリーミング・アプリケーションを実行すると、データ・フローでは他のランタイムは使用せず、別の方法でSparkアプリケーションが実行されます:
ストリーミング実行と非ストリーミング実行の違い
相違 非ストリーミング実行 ストリーミング実行
認証 リクエスト元のユーザーのOn-Behalf-Of (OBO)トークンが使用されます。OBOトークンは24時間後に期限切れになるため、長時間実行されるジョブには適していません。 実行のリソース・プリンシパルに関連付けられたセッション・トークンを使用して、Oracle Cloud Infrastructureオブジェクト・ストレージにアクセスします。長時間実行するジョブに適しています。
再起動ポリシー Sparkランタイムの終了コードが0以外の場合は失敗します。 Sparkランタイムの終了コードが0以外の場合は、最大10回再起動します。
パッチ適用ポリシー ジョブの存続時間が24時間未満と想定されるため、パッチ適用ポリシーはありません。 自動月次パッチ。
  1. Sparkストリーミング・アプリケーションを作成します。
    アプリケーションが実行されると、リソース・プリンシパル認証が使用され、パッチ適用と再起動は自動的に行われます。
  2. Sparkストリーミングのポリシーの設定
    Sparkストリーミング・アプリケーションでは、Oracle Cloud Infrastructureリソースに対して、リソース・プリンシパル・セッション・トークンを使用した認証が行われるため、OCIリソースにアクセスするには、アプリケーションを認可するIAMポリシーを作成する必要があります。データ・フロー実行はオンデマンドで起動されるため、実行が開始されるまで割り当てられないIAMポリシーで、実行OCIDを使用することはできません。かわりに、実行のリソースを永続リソースに接続し、IAMポリシーでそれを参照します。これを実行する最も一般的な方法は、次の2つです:
    親アプリケーションID
    データ・フロー実行を作成したデータ・フロー・アプリケーションに接続し、データ・フロー・アプリケーションIDをIAMポリシーに指定します。特定のアプリケーションの権限を設定する場合は、そのアプリケーションから起動されたすべての実行に一致する動的グループを作成し、動的グループがIAMリソースにアクセスすることを認可します。各実行には、親アプリケーションとその実行を関連付けるタグがあります。このタグは、動的グループの一致ルールに使用できます。
    ノート

    このタグはIAMの任意のユーザー・ポリシーでは使用できないため、動的グループを作成する必要があります。
    たとえば、IDがocid1.dataflowapplication.oc1.iad.Aのデータ・フロー・アプリケーションがある場合は、次のような動的グループを作成します:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    ポリシーは次のとおりです:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    ターゲット・コンパートメントID

    データ・フロー実行を実行が作成されたコンパートメントに接続し、IAMポリシーにコンパートメントIDを指定します。コンパートメントで実行されているすべてのSparkアプリケーションが、これらのリソースにアクセスできるため、この方法はあまり限定的ではありません。CLIを介してspark-submitを使用する場合は、アプリケーションIDと実行IDの両方がオンデマンドであるため、この方法を使用する必要があります。

    たとえば、IDがocid1.tenancy.oc1.Cのコンパートメントに、IDがocid1.dataflowrun.oc1.iad.R2の実行がある場合、ポリシーは次のようになります:
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Oracle Cloud Infrastructure Streamingへの接続

Oracle Cloud Infrastructure Streamingへの接続方法を学習します。

ストリーミングの設定:
  • Oracle Streaming Serviceを設定し、ストリームを作成します。
    ノート

    Oracle Streaming Serviceには次の制限があります:
    • ストリーム内のメッセージは、24時間以上7日間まで保持されます。
    • ストリーム内のすべてのメッセージは、保持期間が経過すると、メッセージが読まれたかどうかに関係なく削除されます。
    • ストリームの作成後は、ストリームの保持期間を変更できません。
    • テナンシのデフォルトの制限は、ライセンスに応じて0または5パーティションです。パーティションを増やす必要がある場合は、サービス制限の引上げをリクエストできます。
    • ストリームの作成後にストリームのパーティション数を変更することはできません。
    • 1つのストリームで最大50個のコンシューマ・グループを読み取ることができます。
    • 各パーティションで書き込めるデータは合計1MB/秒です。データの書込み制限を超えていない場合、PUTリクエスト数に制限はありません。
    • 各パーティションのGETリクエストは、コンシューマ・グループごとに1秒当たり5つです。1つのストリームでサポートできるコンシューマ・グループは最大50で、1つのストリーム内の1つのパーティションを読み取れるのは、コンシューマ・グループ内の1つのコンシューマのみであるため、1つのパーティションでサポートできるGETリクエストは1秒当たり最大250です。
    • プロデューサがストリームに公開できるのは、1MB以下のメッセージです。
    • リクエストは1MB以下にする必要があります。リクエストのサイズは、Base64からデコードされた後のキーとメッセージの合計です。
  • ストリーミング・ポリシーをデータ・フローに追加します。
JavaまたはPythonを使用してKafkaに接続します。次の2つのいずれかの方法で認証します:
  • プレーン・パスワードまたは認証トークンを使用します。この方法は、環境間のクイック・テストに適しています。たとえば、Spark構造化ストリーミング・アプリケーションのプロトタイピングでは、アプリケーションをローカルで、Oracle Streaming Serviceに対してデータ・フローで実行します。
    ノート

    アプリケーション引数のパスワードのハードコーディング(公開)は安全ではないため、本番実行にはこの方法を使用しないでください。
  • リソース・プリンシパル認証は、プレーン・パスワードや認証トークンよりも安全です。Oracle Streaming Serviceで、より柔軟に認証を行えます。リソース・プリンシパル認証を使用するには、ストリーミング・ポリシーを設定します。

Javaサンプル・アプリケーションおよびPythonサンプル・アプリケーションを使用できます。

  1. Kafkaへの接続に使用するストリーム・プールを探します。
    1. 「ホーム」をクリックします。
    2. 「ストリーミング」をクリックします。
    3. 「ストリーム・プール」をクリックします。
    4. 使用するストリーム・プールをクリックして、詳細を確認します。
    5. 「Kafka接続設定」をクリックします。
    6. 次の情報をコピーします:
      • ストリーム・プールOCID
      • ブートストラップ・サーバー
      • 接続文字列
      • セキュリティ・プロトコル(例: SASL_SSL)
      • セキュリティ・メカニズム(例: PLAIN)
      ノート

      接続文字列のパスワードがAUTH_TOKENに設定されている場合、認証トークンを作成するか、ユーザー名(username="<tenancy>/<username>/<stream_pool_id>")で指定されたユーザーの既存の認証トークン(password="<auth_token>")を使用します:
      1. 「アイデンティティ」をクリックします。
      2. 「ユーザー」をクリックします。
      3. ユーザーについて、ユーザーの詳細を表示します。
      4. 認証トークンを作成するか、既存の認証トークンを使用します。
  2. Sparkは、デフォルトではKafka統合ライブラリにバインドされないため、Sparkアプリケーションの依存関係の一部として追加する必要があります。
    • SBTまたはMavenプロジェクト定義を使用するJavaやScalaのアプリケーションの場合は、アプリケーションと次のアーティファクトをリンクします:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      ノート

      ヘッダー機能を使用するには、Kafkaクライアントのバージョンが0.11.0.0以上であることが必要です。
    • Pythonアプリケーションの場合は、アプリケーションのデプロイ時に、Kafka統合ライブラリと依存関係を追加します。
    • データ・フロー・リソース・プリンシパル認証を使用する場合は、次のアーティファクトが必要です:
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. システムを構成します。
    Kafka接続の動作は、サーバー、認証、トピック、グループなどのシステムを構成することで制御されます。構成はシンプルで強力であり、1つの値変更がシステム全体に大きな影響を及ぼします。
    共通構成
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    フェッチ・レート制限の詳細は、ストリーミング・リソースの制限を参照してください。
    ノート

    後続の再起動は、startingOffsetsで指定された場所ではなく、最後のチェックポイントから続行されます。その他のオプションについては、『Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)』を参照してください。

    failOnDataLossは、Oracle Streamingから削除されたためにデータをフェッチできないときに使用するストリーミング・アプリケーションを指定します。

    詳細構成

    SparkストリーミングKafka統合ガイドを参照してください。

    サンプル構成
    プレーン・パスワード:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    リソース・プリンシパル:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Kafkaに接続します。
    サンプル接続。
    Oracle Cloud Infrastructure Streamingのリソース・プリンシパルを使用したJava
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    プレーン・パスワードを使用したJava
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    ノート

    Oracle Cloud Infrastructure Streamingのリソース・プリンシパルでPythonを使用するには、archive.zipを使用する必要があります。詳細は、「データ・フローでのSpark-Submit機能」の項を参照してください。
サンプルJavaアプリケーション

これは、データ・フロー用のサンプルJavaアプリケーションです。

package example;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;

import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class StructuredKafkaWordCount {

  public static void main(String[] args) throws Exception {

    Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
    log.info("Started StructuredKafkaWordCount");

    Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
      log.error("Exception uncaught: ", e);
    });

    //Uncomment following line to enable debug log level.
    //Logger.getRootLogger().setLevel(Level.DEBUG);

    if (args.length < 4) {
      printUsage();
    }

    String bootstrapServers = args[0];
    String topics = args[1];
    String checkpointLocation = args[2];
    String type = args[3];
    String outputLocation = null;

    switch (type) {
      case "console":
        System.err.println("Using console output sink");
        break;

      case "csv":
        if (args.length < 5) {
          printUsage();
        }
        outputLocation = args[4];
        System.err.println("Using csv output sink, output location = " + outputLocation);
        break;

      default:
        printUsage();
    }

    SparkSession spark;

    SparkConf conf = new SparkConf();
    if (conf.contains("spark.master")) {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    } else {
      spark = SparkSession.builder()
          .appName("StructuredKafkaWordCount")
          .master("local[*]")
          .config("spark.sql.streaming.minBatchesToRetain", "10")
          .config("spark.sql.shuffle.partitions", "1")
          .config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
          .getOrCreate();
    }

    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
            "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
        .selectExpr("CAST(value AS STRING)");

    // Split the lines into timestamp and words
    StructType wordsSchema = StructType$.MODULE$.apply(
        new StructField[]{
            StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
            StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
        }
    );
    ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
    final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");

    Dataset<Row> words = lines
        .flatMap(
            (FlatMapFunction<Row, Row>) row -> {
              // parse Kafka record in format: "timestamp(iso8601) text"
              String text = row.getString(0);
              String timestampString = text.substring(0, 25);
              String message = text.substring(26);
              Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());

              return Arrays.asList(message.split(" ")).stream()
                  .map(word -> RowFactory.create(timestamp, word)).iterator();
            }
            , encoder);

    // Time window aggregation
    Dataset<Row> wordCounts = words
        .withWatermark("timestamp", "1 minutes")
        .groupBy(
            functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
            functions.col("value")
        )
        .count()
        .selectExpr("CAST(window.start AS timestamp) AS START_TIME",
            "CAST(window.end AS timestamp) AS END_TIME",
            "value AS WORD", "CAST(count AS long) AS COUNT");

    wordCounts.printSchema();

    // Reducing to a single partition
    wordCounts = wordCounts.coalesce(1);

    // Start streaming query
    StreamingQuery query = null;
    switch (type) {
      case "console":
        query = outputToConsole(wordCounts, checkpointLocation);
        break;
      case "csv":
        query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
        break;
      default:
        System.err.println("Unknown type " + type);
        System.exit(1);
    }

    query.awaitTermination();
  }

  private static void printUsage() {
    System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
        "<subscribe-topics> <checkpoint-location> <type> ...");
    System.err.println("<type>: console");
    System.err.println("<type>: csv <output-location>");
    System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
    System.exit(1);
  }

  private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
      throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", checkpointLocation)
        .start();
  }

  private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
      String outputLocation) throws TimeoutException {
    return wordCounts
        .writeStream()
        .format("csv")
        .outputMode("append")
        .option("checkpointLocation", checkpointLocation)
        .trigger(Trigger.ProcessingTime("1 minutes"))
        .option("path", outputLocation)
        .start();
  }
}
サンプルPythonアプリケーション

これは、データ・フローのサンプルPythonアプリケーションです。

#!/usr/bin/env python3
 
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
  substring, to_timestamp, explode, split, length
 
import argparse
import os
 
 
def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('--auth-type', default='PLAIN')
  parser.add_argument('--bootstrap-port', default='9092')
  parser.add_argument('--bootstrap-server')
  parser.add_argument('--checkpoint-location')
  parser.add_argument('--encryption', default='SASL_SSL')
  parser.add_argument('--ocid')
  parser.add_argument('--output-location')
  parser.add_argument('--output-mode', default='file')
  parser.add_argument('--stream-password')
  parser.add_argument('--raw-stream')
  parser.add_argument('--stream-username')
  args = parser.parse_args()
 
  if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
  if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
  if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
  if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')
 
  assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
  assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
  assert args.output_location is not None, "Output location (--output-location) must be set"
  assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
 
  spark = (
    SparkSession.builder
      .appName('StructuredKafkaWordCount')
      .config('failOnDataLoss', 'true')
      .config('spark.sql.streaming.minBatchesToRetain', '10')
      .config('spark.sql.shuffle.partitions', '1')
      .config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
      .getOrCreate()
  )
 
  # Uncomment following line to enable debug log level.
  # spark.sparkContext.setLogLevel('DEBUG')
 
  # Configure Kafka connection settings.
  if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = 'OCI-RSA-SHA256'
  else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
 
  raw_kafka_options = {
    'kafka.sasl.jaas.config': jaas_template.format(
      username=args.stream_username, password=args.stream_password,
      ocid=args.ocid
    ),
    'kafka.sasl.mechanism': args.auth_type,
    'kafka.security.protocol': args.encryption,
    'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
                                              args.bootstrap_port),
    'subscribe': args.raw_stream,
    'kafka.max.partition.fetch.bytes': 1024 * 1024,
    'startingOffsets': 'latest'
  }
 
  # Reading raw Kafka stream.
  raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
 
  # Cast raw lines to a string.
  lines = raw.selectExpr('CAST(value AS STRING)')
 
  # Split value column into timestamp and words columns.
  parsedLines = (
    lines.select(
      to_timestamp(substring('value', 1, 25))
        .alias('timestamp'),
      lines.value.substr(lit(26), length('value') - 25)
        .alias('words'))
  )
 
  # Split words into array and explode single record into multiple.
  words = (
    parsedLines.select(
      col('timestamp'),
      explode(split('words', ' ')).alias('word')
    )
  )
 
  # Do time window aggregation
  wordCounts = (
    words
      .withWatermark('timestamp', '1 minutes')
      .groupBy('word', window('timestamp', '1 minute'))
      .count()
      .selectExpr('CAST(window.start AS timestamp) AS START_TIME',
                  'CAST(window.end AS timestamp) AS END_TIME',
                  'word AS WORD',
                  'CAST(count AS long) AS COUNT')
  )
 
  # Reduce partitions to a single.
  wordCounts = wordCounts.coalesce(1)
 
  wordCounts.printSchema()
 
  # Output it to the chosen channel.
  if args.output_mode == 'console':
    print("Writing aggregates to console")
    query = (
      wordCounts.writeStream
        .option('checkpointLocation', args.checkpoint_location)
        .outputMode('complete')
        .format('console')
        .option('truncate', False)
        .start()
    )
  else:
    print("Writing aggregates to Object Storage")
    query = (
      wordCounts.writeStream
        .format('csv')
        .outputMode('append')
        .trigger(processingTime='1 minutes')
        .option('checkpointLocation', args.checkpoint_location)
        .option('path', args.output_location)
        .start()
    )
 
  query.awaitTermination()
 
 
main()

プライベート・サブネットのストリーミング・ソースへの接続

プライベート・サブネットのストリーミング・ソースに接続するには、次のステップに従います。

ストリーミング・ソースFDQNをコピーして、データ・フロー・プライベート・エンドポイントの作成に使用されるプライベート・サブネット内のVNIC間のトラフィックを許可します。ストリーミング・ソースがデータ・フロー・プライベート・エンドポイントとは異なるサブネットにある場合は、ストリーミング・サブネットとデータ・フロー・プライベート・エンドポイント・サブネット間のトラフィックを許可します。

  1. プライベート・エンドポイントを含むストリーミング・プールを作成します。
    詳細は、ストリーミングのドキュメントを参照してください。
  2. ストリーム・プールの詳細を表示し、FDQNの値をコピーします。
  3. プライベート・エンドポイントを編集し、制御するDNSゾーンの値を、前のステップでコピーしたストリーム・プールFDQNの値に置き換えます。
  4. ストリーミング・アプリケーションにプライベート・エンドポイントをアタッチします。
  5. アプリケーションを実行します。