Sparkストリーミングの開始
データ・フローでSparkストリーミングを使用する前に、設定が必要です。
Apache Sparkでは、バッチ処理、ストリーム処理および機械学習が1つのAPIに統合されます。データ・フローでは、標準のApache Sparkランタイム内でSparkアプリケーションが実行されます。ストリーミング・アプリケーションを実行すると、データ・フローでは他のランタイムは使用せず、別の方法でSparkアプリケーションが実行されます:
相違 | 非ストリーミング実行 | ストリーミング実行 |
---|---|---|
認証 | リクエスト元のユーザーのOn-Behalf-Of (OBO)トークンが使用されます。OBOトークンは24時間後に期限切れになるため、長時間実行されるジョブには適していません。 | 実行のリソース・プリンシパルに関連付けられたセッション・トークンを使用して、Oracle Cloud Infrastructureオブジェクト・ストレージにアクセスします。長時間実行するジョブに適しています。 |
再起動ポリシー | Sparkランタイムの終了コードが0以外の場合は失敗します。 | Sparkランタイムの終了コードが0以外の場合は、最大10回再起動します。 |
パッチ適用ポリシー | ジョブの存続時間が24時間未満と想定されるため、パッチ適用ポリシーはありません。 | 自動月次パッチ。 |
Oracle Cloud Infrastructure Streamingへの接続
Oracle Cloud Infrastructure Streamingへの接続方法を学習します。
ストリーミングの設定:
- Oracle Streaming Serviceを設定し、ストリームを作成します。ノート
Oracle Streaming Serviceには次の制限があります:- ストリーム内のメッセージは、24時間以上7日間まで保持されます。
- ストリーム内のすべてのメッセージは、保持期間が経過すると、メッセージが読まれたかどうかに関係なく削除されます。
- ストリームの作成後は、ストリームの保持期間を変更できません。
- テナンシのデフォルトの制限は、ライセンスに応じて0または5パーティションです。パーティションを増やす必要がある場合は、サービス制限の引上げをリクエストできます。
- ストリームの作成後にストリームのパーティション数を変更することはできません。
- 1つのストリームで最大50個のコンシューマ・グループを読み取ることができます。
- 各パーティションで書き込めるデータは合計1MB/秒です。データの書込み制限を超えていない場合、PUTリクエスト数に制限はありません。
- 各パーティションのGETリクエストは、コンシューマ・グループごとに1秒当たり5つです。1つのストリームでサポートできるコンシューマ・グループは最大50で、1つのストリーム内の1つのパーティションを読み取れるのは、コンシューマ・グループ内の1つのコンシューマのみであるため、1つのパーティションでサポートできるGETリクエストは1秒当たり最大250です。
- プロデューサがストリームに公開できるのは、1MB以下のメッセージです。
- リクエストは1MB以下にする必要があります。リクエストのサイズは、Base64からデコードされた後のキーとメッセージの合計です。
- ストリーミング・ポリシーをデータ・フローに追加します。
JavaまたはPythonを使用してKafkaに接続します。次の2つのいずれかの方法で認証します:
- プレーン・パスワードまたは認証トークンを使用します。この方法は、環境間のクイック・テストに適しています。たとえば、Spark構造化ストリーミング・アプリケーションのプロトタイピングでは、アプリケーションをローカルで、Oracle Streaming Serviceに対してデータ・フローで実行します。 ノート
アプリケーション引数のパスワードのハードコーディング(公開)は安全ではないため、本番実行にはこの方法を使用しないでください。 - リソース・プリンシパル認証は、プレーン・パスワードや認証トークンよりも安全です。Oracle Streaming Serviceで、より柔軟に認証を行えます。リソース・プリンシパル認証を使用するには、ストリーミング・ポリシーを設定します。
Javaサンプル・アプリケーションおよびPythonサンプル・アプリケーションを使用できます。
サンプルJavaアプリケーション
これは、データ・フロー用のサンプルJavaアプリケーションです。
package example;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.sql.types.TimestampType$;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
public class StructuredKafkaWordCount {
public static void main(String[] args) throws Exception {
Logger log = LogManager.getLogger(StructuredKafkaWordCount.class);
log.info("Started StructuredKafkaWordCount");
Thread.setDefaultUncaughtExceptionHandler((thread, e) -> {
log.error("Exception uncaught: ", e);
});
//Uncomment following line to enable debug log level.
//Logger.getRootLogger().setLevel(Level.DEBUG);
if (args.length < 4) {
printUsage();
}
String bootstrapServers = args[0];
String topics = args[1];
String checkpointLocation = args[2];
String type = args[3];
String outputLocation = null;
switch (type) {
case "console":
System.err.println("Using console output sink");
break;
case "csv":
if (args.length < 5) {
printUsage();
}
outputLocation = args[4];
System.err.println("Using csv output sink, output location = " + outputLocation);
break;
default:
printUsage();
}
SparkSession spark;
SparkConf conf = new SparkConf();
if (conf.contains("spark.master")) {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
} else {
spark = SparkSession.builder()
.appName("StructuredKafkaWordCount")
.master("local[*]")
.config("spark.sql.streaming.minBatchesToRetain", "10")
.config("spark.sql.shuffle.partitions", "1")
.config("spark.sql.streaming.stateStore.maintenanceInterval", "300")
.getOrCreate();
}
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
.option("kafka.sasl.jaas.config",
"com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:ocid1.streampool.oc1.phx.amaaaaaaep4fdfaartcuoi5y72mkrcg7hzogcx2jnygbmgik3hqwssxqa6pq\";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING)");
// Split the lines into timestamp and words
StructType wordsSchema = StructType$.MODULE$.apply(
new StructField[]{
StructField.apply("timestamp", TimestampType$.MODULE$, true, Metadata.empty()),
StructField.apply("value", StringType$.MODULE$, true, Metadata.empty())
}
);
ExpressionEncoder<Row> encoder = RowEncoder.apply(wordsSchema);
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
Dataset<Row> words = lines
.flatMap(
(FlatMapFunction<Row, Row>) row -> {
// parse Kafka record in format: "timestamp(iso8601) text"
String text = row.getString(0);
String timestampString = text.substring(0, 25);
String message = text.substring(26);
Timestamp timestamp = new Timestamp(dateFormat.parse(timestampString).getTime());
return Arrays.asList(message.split(" ")).stream()
.map(word -> RowFactory.create(timestamp, word)).iterator();
}
, encoder);
// Time window aggregation
Dataset<Row> wordCounts = words
.withWatermark("timestamp", "1 minutes")
.groupBy(
functions.window(functions.col("timestamp"), "1 minutes", "1 minutes"),
functions.col("value")
)
.count()
.selectExpr("CAST(window.start AS timestamp) AS START_TIME",
"CAST(window.end AS timestamp) AS END_TIME",
"value AS WORD", "CAST(count AS long) AS COUNT");
wordCounts.printSchema();
// Reducing to a single partition
wordCounts = wordCounts.coalesce(1);
// Start streaming query
StreamingQuery query = null;
switch (type) {
case "console":
query = outputToConsole(wordCounts, checkpointLocation);
break;
case "csv":
query = outputToCsv(wordCounts, checkpointLocation, outputLocation);
break;
default:
System.err.println("Unknown type " + type);
System.exit(1);
}
query.awaitTermination();
}
private static void printUsage() {
System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
"<subscribe-topics> <checkpoint-location> <type> ...");
System.err.println("<type>: console");
System.err.println("<type>: csv <output-location>");
System.err.println("<type>: adw <wallet-path> <wallet-password> <tns-name>");
System.exit(1);
}
private static StreamingQuery outputToConsole(Dataset<Row> wordCounts, String checkpointLocation)
throws TimeoutException {
return wordCounts
.writeStream()
.format("console")
.outputMode("complete")
.option("checkpointLocation", checkpointLocation)
.start();
}
private static StreamingQuery outputToCsv(Dataset<Row> wordCounts, String checkpointLocation,
String outputLocation) throws TimeoutException {
return wordCounts
.writeStream()
.format("csv")
.outputMode("append")
.option("checkpointLocation", checkpointLocation)
.trigger(Trigger.ProcessingTime("1 minutes"))
.option("path", outputLocation)
.start();
}
}
サンプルPythonアプリケーション
これは、データ・フローのサンプルPythonアプリケーションです。
#!/usr/bin/env python3
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import concat, col, current_timestamp, lit, window, \
substring, to_timestamp, explode, split, length
import argparse
import os
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server')
parser.add_argument('--checkpoint-location')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password')
parser.add_argument('--raw-stream')
parser.add_argument('--stream-username')
args = parser.parse_args()
if args.bootstrap_server is None:
args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
args.stream_password = os.environ.get('STREAM_PASSWORD')
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"
spark = (
SparkSession.builder
.appName('StructuredKafkaWordCount')
.config('failOnDataLoss', 'true')
.config('spark.sql.streaming.minBatchesToRetain', '10')
.config('spark.sql.shuffle.partitions', '1')
.config('spark.sql.streaming.stateStore.maintenanceInterval', '300')
.getOrCreate()
)
# Uncomment following line to enable debug log level.
# spark.sparkContext.setLogLevel('DEBUG')
# Configure Kafka connection settings.
if args.ocid is not None:
jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
args.auth_type = 'OCI-RSA-SHA256'
else:
jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
raw_kafka_options = {
'kafka.sasl.jaas.config': jaas_template.format(
username=args.stream_username, password=args.stream_password,
ocid=args.ocid
),
'kafka.sasl.mechanism': args.auth_type,
'kafka.security.protocol': args.encryption,
'kafka.bootstrap.servers': '{}:{}'.format(args.bootstrap_server,
args.bootstrap_port),
'subscribe': args.raw_stream,
'kafka.max.partition.fetch.bytes': 1024 * 1024,
'startingOffsets': 'latest'
}
# Reading raw Kafka stream.
raw = spark.readStream.format('kafka').options(**raw_kafka_options).load()
# Cast raw lines to a string.
lines = raw.selectExpr('CAST(value AS STRING)')
# Split value column into timestamp and words columns.
parsedLines = (
lines.select(
to_timestamp(substring('value', 1, 25))
.alias('timestamp'),
lines.value.substr(lit(26), length('value') - 25)
.alias('words'))
)
# Split words into array and explode single record into multiple.
words = (
parsedLines.select(
col('timestamp'),
explode(split('words', ' ')).alias('word')
)
)
# Do time window aggregation
wordCounts = (
words
.withWatermark('timestamp', '1 minutes')
.groupBy('word', window('timestamp', '1 minute'))
.count()
.selectExpr('CAST(window.start AS timestamp) AS START_TIME',
'CAST(window.end AS timestamp) AS END_TIME',
'word AS WORD',
'CAST(count AS long) AS COUNT')
)
# Reduce partitions to a single.
wordCounts = wordCounts.coalesce(1)
wordCounts.printSchema()
# Output it to the chosen channel.
if args.output_mode == 'console':
print("Writing aggregates to console")
query = (
wordCounts.writeStream
.option('checkpointLocation', args.checkpoint_location)
.outputMode('complete')
.format('console')
.option('truncate', False)
.start()
)
else:
print("Writing aggregates to Object Storage")
query = (
wordCounts.writeStream
.format('csv')
.outputMode('append')
.trigger(processingTime='1 minutes')
.option('checkpointLocation', args.checkpoint_location)
.option('path', args.output_location)
.start()
)
query.awaitTermination()
main()
プライベート・サブネットのストリーミング・ソースへの接続
プライベート・サブネットのストリーミング・ソースに接続するには、次のステップに従います。
-
プライベート・エンドポイントが存在しない場合は作成します。
-
Oracle Cloud Infrastructure Streamingへの接続の説明に従って、Oracle Streaming Serviceを設定し、ストリームを作成します
ストリーミング・ソースFDQNをコピーして、データ・フロー・プライベート・エンドポイントの作成に使用されるプライベート・サブネット内のVNIC間のトラフィックを許可します。ストリーミング・ソースがデータ・フロー・プライベート・エンドポイントとは異なるサブネットにある場合は、ストリーミング・サブネットとデータ・フロー・プライベート・エンドポイント・サブネット間のトラフィックを許可します。