Before you can use Spark streaming with Data Flow, you
must set it up.
Apache Spark unifies Batch Processing, Stream Processing and Machine Learning in one
API. Data Flow runs Spark applications within a
standard Apache Spark runtime. When you run a streaming Application, Data Flow doesn't use a different runtime; instead,
it runs the Spark application in a different way:
Differences between streaming and non-streaming runs

Authentication
Non-Streaming Run: Uses an On-Behalf-Of (OBO) token of the requesting user. OBO tokens expire after 24 hours, so this isn't suitable for long-running jobs.
Streaming Run: Accesses Oracle Cloud Infrastructure Object Storage using session tokens tied to the Run's Resource Principal, which is suitable for long-running jobs.

Restart Policy
Non-Streaming Run: Fails if the Spark runtime exit code is non-zero.
Streaming Run: Restarts up to ten times if the Spark runtime exit code is non-zero.

Patch Policy
Non-Streaming Run: No patching policy, because jobs are expected to last fewer than 24 hours.
Streaming Run: Automatic monthly patches.
Create a Spark Streaming Application.
When the application is run, it uses Resource Principal authentication,
auto-patching, and auto-restart.
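If you prefer to create the streaming Application programmatically rather than in the Console, the following is a minimal sketch using the OCI Python SDK. It's an illustration under assumptions: the OCIDs, bucket, namespace, shapes, and file URI are placeholders, and the exact attribute names of CreateApplicationDetails can vary by SDK version.
import oci

# Load the default SDK configuration (~/.oci/config).
config = oci.config.from_file()
data_flow = oci.data_flow.DataFlowClient(config)

# Placeholders: replace the OCIDs, bucket, namespace, and shapes with your own values.
details = oci.data_flow.models.CreateApplicationDetails(
    compartment_id="ocid1.compartment.oc1..<unique_id>",
    display_name="example-streaming-app",
    type="STREAMING",  # marks the Application as a streaming Application (assumed attribute name)
    language="PYTHON",
    file_uri="oci://<bucket>@<namespace>/streaming_app.py",
    spark_version="3.0.2",
    driver_shape="VM.Standard2.1",
    executor_shape="VM.Standard2.1",
    num_executors=1,
)

app = data_flow.create_application(details).data
print(app.id)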
Because your Spark Streaming Application uses the Resource Principal session
tokens to authenticate to Oracle Cloud Infrastructure resources,
you must create IAM policies authorizing
your applications before they can access these resources. Data Flow Runs are started on-demand so you
can't use the Run OCID in your IAM policy,
because it's not allocated until the Run starts. Instead, connect the Run's
resources to a permanent resource and reference it in your IAM policy. The two most common ways of
doing this are:
Parent Application ID
Connect the Data Flow Run to the
Data Flow Application that
created it, and put the Data Flow Application ID in the IAM Policy. To set permissions for a particular Application,
create a dynamic group that matches all Runs started from the
Application, and authorize the Dynamic Group to access IAM resources. Each Run
includes a tag associating it with its parent Application. You
can use this tag in a Dynamic Group matching rule.
Note
This tag can't be used in an IAM "any-user" policy;
you must create a Dynamic Group.
For example, if you have a Data Flow Application with ID of
ocid1.dataflowapplication.oc1.iad.A, then
you create a dynamic
group:
ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
with the following
policies:
allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
  target.bucket.name='<bucket_name>'
}
allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
  target.streampool.id='<streampool_id>'
}
Target Compartment ID
Connect the Data Flow Run to the
Compartment where Runs are created, and put the Compartment ID
in the IAM Policy. This
approach is less specific, because any Spark application run in
the Compartment gets access to these resources. If you plan to
use spark-submit via the CLI, you must use this approach, because
both the Application ID and the Run ID are allocated on demand.
For example, if you have a Run with ID
ocid1.dataflowrun.oc1.iad.R2 in a
compartment with the ID ocid1.tenancy.oc1.C,
then you would have the following policies:
allow any-user to manage objects in tenancy where all {
  request.principal.type='dataflowrun',
  request.compartment.id='ocid1.tenancy.oc1.C',
  target.bucket.name='<bucket_name>'
}
allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
  request.principal.type='dataflowrun',
  request.compartment.id='ocid1.tenancy.oc1.C',
  target.streampool.id='<streampool_id>'
}
Connecting to Oracle Cloud Infrastructure Streaming
Learn how to connect to Oracle Cloud Infrastructure
Streaming.
Messages in a stream are retained for no less than 24 hours, and
no more than seven days.
All messages in a stream are deleted after the retention period
has expired, whether they have been read or not.
The retention period of a stream can't be changed after the
stream has been created.
A tenancy has a default limit of zero or five partitions
depending on your license. If you require more partitions you
can request a service limit
increase.
The number of partitions for a stream can't be changed after
creation of the stream.
A single stream can support up to 50 consumer groups reading
from it.
Each partition has a total data write of 1 MB per second. There
is no limit to the number of PUT requests, provided the data
write limit isn't exceeded.
Each partition has five GET requests per second per consumer
group. As a single stream can support up to 50 consumer groups,
and a single partition in a stream can be read by only one
consumer in a consumer group, a partition can support up to 250
GET requests per second.
Producers can publish a message of no more than 1 MB to a
stream.
A request can be no bigger than 1 MB. A request's size is the
sum of its keys and messages after they have been decoded from
Base64.
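These write and message-size limits map onto ordinary Kafka client options, which Spark passes through with the kafka. prefix. The sketch below is illustrative only: the bootstrap servers, topics, and checkpoint location are placeholders, and the SASL authentication options shown later in this section are omitted for brevity.
from pyspark.sql import SparkSession

ONE_MB = 1024 * 1024
spark = SparkSession.builder.appName("streaming_limits_example").getOrCreate()

# Read side: keep each fetch within the 1 MB per-partition limit.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap_servers>")  # placeholder
    .option("subscribe", "<topic>")                            # placeholder
    .option("kafka.max.partition.fetch.bytes", ONE_MB)
    .load()
)

# Write side: keep each produce request within the 1 MB request limit.
query = (
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap_servers>")  # placeholder
    .option("topic", "<output_topic>")                         # placeholder
    .option("kafka.max.request.size", ONE_MB)
    .option("checkpointLocation", "oci://<bucket>@<namespace>/checkpoints")  # placeholder
    .start()
)
query.awaitTermination()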
Connect to Kafka either using Java or Python. Authenticate in one of two ways:
Use a plain password or auth token. This method is suitable for quick
cross-environment testing, for example, prototyping a Spark Structured Streaming
application that you want to run both locally and on Data Flow against the Oracle Streaming
Service.
Note
Hardcoding, or exposing, the password in application
arguments isn't considered secure, so don't use this method for
production runs.
Resource principal authentication is more secure than plain password or auth
token. It's a more flexible way to authenticate with Oracle Streaming
Service. Set up streaming policies to use resource
principal authentication.
Find the stream pool you want to use to connect to Kafka.
Click Home.
Click Streaming.
Click Stream Pools.
Click the stream pool you want to use to see its details.
Click Kafka Connection Settings.
Copy the following information:
Stream pool OCID
Bootstrap server
Connection string
Security protocol, for example, SASL_SSL
Security mechanism, for example, PLAIN
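As an alternative to copying these values from the Console, you can read them programmatically. The following is a minimal sketch using the OCI Python SDK's stream admin client; the stream pool OCID is a placeholder, and attribute names may differ slightly between SDK versions.
import oci

config = oci.config.from_file()
stream_admin = oci.streaming.StreamAdminClient(config)

# Placeholder OCID: the stream pool you located above.
pool = stream_admin.get_stream_pool("ocid1.streampool.oc1.iad.<unique_id>").data

print(pool.id)                                # stream pool OCID
print(pool.kafka_settings.bootstrap_servers)  # bootstrap server list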
Note
If the password in the connection string is set to
AUTH_TOKEN, create an auth token or use an
existing one (password="<auth_token>")
for the user specified in username
(username="<tenancy>/<username>/<stream_pool_id>"):
Click Identity.
Click Users.
For your user, display the user details.
Create an auth token, or use an existing one.
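If you prefer to create the auth token programmatically, a minimal sketch with the OCI Python SDK identity client follows; the user OCID and description are placeholders. The token value is returned only at creation time, so store it securely.
import oci

config = oci.config.from_file()
identity = oci.identity.IdentityClient(config)

# Placeholder user OCID: the user named in the Kafka username connection setting.
token = identity.create_auth_token(
    oci.identity.models.CreateAuthTokenDetails(description="dataflow-streaming"),
    user_id="ocid1.user.oc1..<unique_id>",
).data

# Use the returned token value as the Kafka password.
print(token.token)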
Spark doesn't bind to the Kafka integration libraries by default, so you must add
them as part of the Spark application dependencies.
For Java or Scala applications using SBT or Maven project definitions,
link your application with this artifact:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.2
Note
To use the headers functionality, your
Kafka client version must be at least 0.11.0.0.
For Python applications, add the Kafka integration libraries and
dependencies when deploying your application.
If you use Data Flow Resource Principal
authentication, you need this
artifact:
groupId = com.oracle.oci.sdk
artifactId = oci-java-sdk-addons-sasl
version = 1.36.1
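For quick local prototyping in Python, one way to pull in both artifacts is the spark.jars.packages setting, as in the sketch below. This is an assumption for local runs only; on Data Flow you normally package dependencies with your application (for example, in the dependency archive) rather than downloading them at start-up.
from pyspark.sql import SparkSession

# Maven coordinates from above; Spark downloads them when the session starts.
packages = ",".join(
    [
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2",
        "com.oracle.oci.sdk:oci-java-sdk-addons-sasl:1.36.1",
    ]
)

spark = (
    SparkSession.builder.appName("kafka_local_prototype")
    .config("spark.jars.packages", packages)
    .getOrCreate()
)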
Configure the system.
How Kafka connections behave is controlled through the system configuration, for
example, the servers, authentication, topic, groups, and so on. Configuration is
powerful; a single value change can have a big effect on the whole system.
Java with resource principal for Oracle Cloud Infrastructure streaming
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
    .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
    .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit fetch size to 1 MB per partition
    .option("startingOffsets", "latest")
    .load();
Java with a plain password
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>\" password=\"<example-password>\";")
    .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit fetch size to 1 MB per partition
    .option("startingOffsets", "latest")
    .load();
Python
from pyspark.sql import SparkSession

# `args` is assumed to be parsed command-line arguments (for example, from argparse)
# providing ocid, auth_type, encryption, bootstrap_server, bootstrap_port,
# stream_username, stream_password, and raw_stream.
spark = (
    SparkSession.builder.config("failOnDataLoss", "false")
    .appName("kafka_streaming_aggregate")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")

# Configure settings we need to read Kafka.
if args.ocid is not None:
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
    args.auth_type = "OCI-RSA-SHA256"
else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'

# For the raw source stream.
raw_kafka_options = {
    "kafka.sasl.jaas.config": jaas_template.format(
        username=args.stream_username, password=args.stream_password, ocid=args.ocid
    ),
    "kafka.sasl.mechanism": args.auth_type,
    "kafka.security.protocol": args.encryption,
    "kafka.bootstrap.servers": "{}:{}".format(
        args.bootstrap_server, args.bootstrap_port
    ),
    "group.id": args.raw_stream,
    "subscribe": args.raw_stream,
}

# The actual reader.
raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
Note
To use Python with resource principal for Oracle Cloud Infrastructure streaming, you must
use archive.zip. More information is available in the section on
Spark-Submit Functionality in Data Flow.
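The Python example above only defines the raw source DataFrame; nothing is consumed until a streaming query is started on it. A minimal sketch follows, writing to the console sink with a placeholder checkpoint location; in a real Run you would more likely write to Object Storage or back to a stream.
# Start a streaming query on the `raw` DataFrame defined above.
query = (
    raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .writeStream.format("console")
    .option("checkpointLocation", "oci://<bucket>@<namespace>/checkpoints/raw")  # placeholder
    .outputMode("append")
    .start()
)
query.awaitTermination()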
Copy the streaming source FQDN to allow traffic between VNICs within the private
subnet used to create the Data Flow Private Endpoint. If
the streaming source is in a different subnet from the Data Flow Private Endpoint, allow traffic between
the Streaming subnet and the Data Flow Private Endpoint
subnet.