Para poder utilizar el flujo de Spark con Data Flow, debe configurarlo.
Apache Spark unifica el procesamiento por lotes, el procesamiento de flujos y el Machine Learning en una API. Data Flow ejecuta aplicaciones Spark dentro de un tiempo de ejecución de Apache Spark estándar. Al ejecutar una aplicación de flujo, Data Flow no utiliza un tiempo de ejecución diferente, en su lugar ejecuta la aplicación Spark de forma diferente:
Diferencias entre ejecuciones de flujo y no de flujo
Qué diferentes
Ejecución no de flujo
Ejecución de flujo
Identificación
Utiliza un token en nombre del (OBO) usuario solicitante. Los tokens OBO caducan después de 24 horas, por lo que no son adecuados para trabajos de larga ejecución.
Accede a Oracle Cloud Infrastructure Object Storage mediante tokens de sesión vinculados a la entidad de recurso de la ejecución. Es adecuado para trabajos de larga ejecución.
Restaurar política
Falla si el código de salida de tiempo de ejecución de Spark es distinto de cero.
Reinicia hasta diez veces si el código de salida de tiempo de ejecución de Spark es distinto de cero.
Política de parches
No hay ninguna política de aplicación de parches, ya que se espera que los trabajos duren menos de 24 horas.
Aplicación automática de parches mensuales.
Cree una aplicación de flujo de Spark.
Cuando se ejecuta la aplicación, esta utiliza la autenticación de entidad de recurso, la aplicación automática de parches y el reinicio automático.
Debido a que las aplicaciones de flujo de Spark utilizan los tokens de sesión de la entidad de recurso para autenticarse en los recursos de Oracle Cloud Infrastructure, debe crear políticas de IAM que autoricen las aplicaciones para que puedan acceder a estos recursos. Las ejecuciones de Data Flow se inician bajo demanda, por lo que no puede utilizar el OCID de ejecución en la política de IAM, porque no se asigna hasta que se inicia la ejecución. En su lugar, conecte los recursos de la ejecución a un recurso permanente y haga referencia a este en la política de IAM. Las dos formas más comunes de hacerlo son:
Identificador de aplicación principal
Conexión de la ejecución de Data Flow a la aplicación de Data Flow que la ha creado y colocación del identificador de aplicación de Data Flow en la política de IAM. Para definir permisos para una aplicación concreta, crear un grupo dinámico que coincida con todas las ejecuciones iniciadas desde la aplicación y autorizar al grupo dinámico a acceder a los recursos de IAM. Cada ejecución incluye una etiqueta que la asocia a su aplicación principal. Puede utilizar esta etiqueta en una regla de coincidencia de grupo dinámico.
Nota
Esta etiqueta no se puede utilizar en una política "any-user" de IAM, debe crear un grupo dinámico.
Por ejemplo, si tiene una aplicación de Data Flow con el identificador ocid1.dataflowapplication.oc1.iad.A, cree un grupo dinámico:
ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
con las siguientes políticas:
allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
target.bucket.name='<bucket_name>'
}
allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
target.streampool.id='<streampool_id>'
}
Identificador de compartimento de destino
Conexión de la ejecución de Data Flow al compartimento donde se crean ejecuciones y colocación del identificador de compartimento en la política de IAM. Este enfoque es menos específico, porque cualquier ejecución de aplicación Spark en el compartimento obtiene acceso a estos recursos. Si tiene previsto utilizar spark-submit a través de la CLI, debe utilizar este enfoque porque tanto el identificador de aplicación como el identificador de ejecución son a petición.
Por ejemplo, si tiene una ejecución con el identificador ocid1.dataflowrun.oc1.iad.R2 en un compartimento con el identificador ocid1.tenancy.oc1.C, tendría las siguientes políticas:
allow any-user to manage objects in tenancy where all {
request.principal.type='dataflowrun',
request.compartment.id='ocid1.tenancy.oc1.C',
target.bucket.name='<bucket_name>'
}
allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
request.principal.type='dataflowrun',
request.compartment.id='ocid1.tenancy.oc1.C',
target.streampool.id='<streampool_id>'
}
Conexión a Oracle Cloud Infrastructure Streaming
Descubra cómo conectarse a Oracle Cloud Infrastructure Streaming.
Los mensajes de un flujo se conservan durante un mínimo de 24 horas y un máximo de siete días.
Todos los mensajes de un flujo se suprimen después de caducar el período de retención, tanto si se han leído como si no.
El período de retención de un flujo no se puede cambiar después de crear el flujo.
Un arrendamiento tiene un límite por defecto de cero o cinco particiones en función de su licencia. Si necesita más particiones, puede solicitar un aumento del límite de servicio.
El número de particiones de un flujo no se puede cambiar después de la creación del flujo.
Un único flujo puede soportar hasta 50 grupos de consumidores leyendo de este.
Cada partición tiene un total de escritura de datos de 1 MB por segundo. No hay ningún límite en el número de solicitudes PUT, siempre que no se supere el límite de escritura de datos.
Cada partición tiene cinco solicitudes GET por segundo por grupo de consumidores. Como único flujo, puede soportar hasta 50 grupos de consumidores, y solo un consumidor de un grupo de consumidores puede leer una única partición de un flujo. Una partición puede soportar hasta 250 solicitudes GET por segundo.
Los productores pueden publicar un mensaje de no más de 1 MB en un flujo.
Una solicitud no puede tener más de 1 MB. El tamaño de una solicitud es la suma de sus claves y mensajes después de que se hayan descodificado desde Base64.
Conéctese a Kafka utilizando Java o Python. Puede autenticarse de una de estas dos maneras:
Utilice una contraseña de texto sin formato o un token de autenticación. Este método es adecuado para pruebas rápidas entre entornos. Por ejemplo, la creación de prototipos de aplicación de flujo estructurado de Spark cuando desea ejecutar de forma local y en Data Flow en el servicio Oracle Streaming.
Nota
La codificación o la exposición de la contraseña en argumentos de aplicación no se considera segura, por lo que no debe utilizar este método para las ejecuciones de producción.
La autenticación de la entidad de recurso es más segura que la contraseña de texto sin formato o el token de autenticación. Es una forma más flexible de autenticarse con el servicio Oracle Streaming. Configure políticas de flujo para utilizar la autenticación de entidad de recurso.
Busque el pool de flujos que desea utilizar para conectarse a Kafka.
Haga clic en Página inicial.
Haga clic en Flujo.
Haga clic en Pools de flujos.
Haga clic en el pool de flujos que desea utilizar para ver sus detalles.
Haga clic en Configuración de conexión de Kafka.
Copie la siguiente información:
OCID de pool de flujos
Servidor de inicialización de datos
Cadena de conexión
Protocolo de seguridad, por ejemplo, SASL_SSL
Mecanismo de seguridad, por ejemplo, PLAIN
Nota
Si la contraseña de la cadena de conexión está definida en AUTH_TOKEN, cree un token de autenticación o utilice uno existente (password="<auth_token>") para el usuario especificado en el nombre de usuario (username="<tenancy>/<username>/<stream_pool_id>":
Haga clic en Identidad.
Haga clic en Usuarios.
Para el usuario, muestre los detalles del usuario.
Cree un token de autenticación o utilice uno existente.
Spark no se enlaza a las bibliotecas de integración de Kafka por defecto, por lo que debe agregarlo como parte de las dependencias de aplicación Spark.
Para aplicaciones Java o Scala que utilizan definiciones de proyecto de SBT o Maven, enlace la aplicación a este artefacto:
Copiar
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.0.2
Nota
Para utilizar la funcionalidad de cabeceras, la versión del cliente de Kafka debe ser al menos 0.11.0.0.
Para las aplicaciones Python, agregue las bibliotecas y las dependencias de integración de Kafka al desplegar la aplicación.
Si utiliza la autenticación de entidad de recurso de Data Flow, necesita este artefacto:
Copiar
groupId = com.oracle.oci.sdk
artifactId = oci-java-sdk-addons-sasl
version = 1.36.1
Configure el sistema.
El comportamiento de las conexiones de Kafka se controla mediante la configuración del sistema; por ejemplo, los servidores, la autenticación, el tema, los grupos, etc. La configuración es potente, con un único cambio de valor que tiene un gran efecto en todo el sistema.
Java con entidad de recurso para el flujo de Oracle Cloud Infrastructure
Copiar
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
.option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
Java con contraseña de texto sin formato
Copiar
// Create DataFrame representing the stream of input lines from Kafka
Dataset<Row> lines = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
.option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
.option("startingOffsets", "latest")
.load()
Python
Copiar
spark = (
SparkSession.builder.config("failOnDataLoss", "false")
.appName("kafka_streaming_aggregate")
.getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
# Configure settings we need to read Kafka.
if args.ocid is not None:
jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
args.auth_type = "OCI-RSA-SHA256"
else:
jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
# For the raw source stream.
raw_kafka_options = {
"kafka.sasl.jaas.config": jaas_template.format(
username=args.stream_username, password=args.stream_password, ocid=args.ocid
),
"kafka.sasl.mechanism": args.auth_type,
"kafka.security.protocol": args.encryption,
"kafka.bootstrap.servers": "{}:{}".format(
args.bootstrap_server, args.bootstrap_port
),
"group.id": args.raw_stream,
"subscribe": args.raw_stream,
}
# The actual reader.
raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
Nota
Para utilizar Python con la entidad de recurso para el flujo de Oracle Cloud Infrastructure, debe utilizar archive.zip. Hay más información disponible en la sección sobre la Funcionalidad spark-submit en Data Flow.
Copie el FDQN de origen de flujo para permitir el tráfico entre las VNIC de la subred privada utilizada para crear el punto final privado de Data Flow. Si el origen de flujo está en una subred diferente al punto final privado de Data Flow, permita el tráfico entre la subred de Streaming y la subred del punto final privado de Data Flow.
Cree un pool de flujos con un punto final privado.
Edite el punto final privado y sustituya el valor de zonas de DNS para controlar por el valor del FDQN del pool de flujos que ha copiado en el paso anterior.