Esta página ha sido traducida por una máquina.

Introducción al flujo de Spark

Para poder utilizar el flujo de Spark con Data Flow, debe configurarlo.

Apache Spark unifica el procesamiento por lotes, el procesamiento de flujos y el Machine Learning en una API. Data Flow ejecuta aplicaciones Spark dentro de un tiempo de ejecución de Apache Spark estándar. Al ejecutar una aplicación de flujo, Data Flow no utiliza un tiempo de ejecución diferente, en su lugar ejecuta la aplicación Spark de forma diferente:
Diferencias entre ejecuciones de flujo y no de flujo
Qué diferentes Ejecución no de flujo Ejecución de flujo
Identificación Utiliza un token en nombre del (OBO) usuario solicitante. Los tokens OBO caducan después de 24 horas, por lo que no son adecuados para trabajos de larga ejecución. Accede a Oracle Cloud Infrastructure Object Storage mediante tokens de sesión vinculados a la entidad de recurso de la ejecución. Es adecuado para trabajos de larga ejecución.
Restaurar política Falla si el código de salida de tiempo de ejecución de Spark es distinto de cero. Reinicia hasta diez veces si el código de salida de tiempo de ejecución de Spark es distinto de cero.
Política de parches No hay ninguna política de aplicación de parches, ya que se espera que los trabajos duren menos de 24 horas. Aplicación automática de parches mensuales.
  1. Cree una aplicación de flujo de Spark.
    Cuando se ejecuta la aplicación, esta utiliza la autenticación de entidad de recurso, la aplicación automática de parches y el reinicio automático.
  2. Configuración de una política para Spark Streaming
    Debido a que las aplicaciones de flujo de Spark utilizan los tokens de sesión de la entidad de recurso para autenticarse en los recursos de Oracle Cloud Infrastructure, debe crear políticas de IAM que autoricen las aplicaciones para que puedan acceder a estos recursos. Las ejecuciones de Data Flow se inician bajo demanda, por lo que no puede utilizar el OCID de ejecución en la política de IAM, porque no se asigna hasta que se inicia la ejecución. En su lugar, conecte los recursos de la ejecución a un recurso permanente y haga referencia a este en la política de IAM. Las dos formas más comunes de hacerlo son:
    Identificador de aplicación principal
    Conexión de la ejecución de Data Flow a la aplicación de Data Flow que la ha creado y colocación del identificador de aplicación de Data Flow en la política de IAM. Para definir permisos para una aplicación concreta, crear un grupo dinámico que coincida con todas las ejecuciones iniciadas desde la aplicación y autorizar al grupo dinámico a acceder a los recursos de IAM. Cada ejecución incluye una etiqueta que la asocia a su aplicación principal. Puede utilizar esta etiqueta en una regla de coincidencia de grupo dinámico.
    Nota

    Esta etiqueta no se puede utilizar en una política "any-user" de IAM, debe crear un grupo dinámico.
    Por ejemplo, si tiene una aplicación de Data Flow con el identificador ocid1.dataflowapplication.oc1.iad.A, cree un grupo dinámico:
    ALL {resource.type='dataflowrun', tag.oci-dataflow.application-id.value='ocid1.dataflowapplication.oc1.iad.A'}
    con las siguientes políticas:
    allow dynamic-group <dynamic_group_name> to manage objects in tenancy where all {
     target.bucket.name='<bucket_name>'
    }
    allow dynamic-group <dynamic_group_name> to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     target.streampool.id='<streampool_id>'
    }
    Identificador de compartimento de destino

    Conexión de la ejecución de Data Flow al compartimento donde se crean ejecuciones y colocación del identificador de compartimento en la política de IAM. Este enfoque es menos específico, porque cualquier ejecución de aplicación Spark en el compartimento obtiene acceso a estos recursos. Si tiene previsto utilizar spark-submit a través de la CLI, debe utilizar este enfoque porque tanto el identificador de aplicación como el identificador de ejecución son a petición.

    Por ejemplo, si tiene una ejecución con el identificador ocid1.dataflowrun.oc1.iad.R2 en un compartimento con el identificador ocid1.tenancy.oc1.C, tendría las siguientes políticas:
    allow any-user to manage objects in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.bucket.name='<bucket_name>'
    }
    allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in tenancy where all {
     request.principal.type='dataflowrun',
     request.compartment.id='ocid1.tenancy.oc1.C',
     target.streampool.id='<streampool_id>'
    }

Conexión a Oracle Cloud Infrastructure Streaming

Descubra cómo conectarse a Oracle Cloud Infrastructure Streaming.

Configuración del flujo:
  • Configure el servicio Oracle Streaming y cree un flujo.
    Nota

    El servicio Oracle Streaming tiene los límites siguientes:
    • Los mensajes de un flujo se conservan durante un mínimo de 24 horas y un máximo de siete días.
    • Todos los mensajes de un flujo se suprimen después de caducar el período de retención, tanto si se han leído como si no.
    • El período de retención de un flujo no se puede cambiar después de crear el flujo.
    • Un arrendamiento tiene un límite por defecto de cero o cinco particiones en función de su licencia. Si necesita más particiones, puede solicitar un aumento del límite de servicio.
    • El número de particiones de un flujo no se puede cambiar después de la creación del flujo.
    • Un único flujo puede soportar hasta 50 grupos de consumidores leyendo de este.
    • Cada partición tiene un total de escritura de datos de 1 MB por segundo. No hay ningún límite en el número de solicitudes PUT, siempre que no se supere el límite de escritura de datos.
    • Cada partición tiene cinco solicitudes GET por segundo por grupo de consumidores. Como único flujo, puede soportar hasta 50 grupos de consumidores, y solo un consumidor de un grupo de consumidores puede leer una única partición de un flujo. Una partición puede soportar hasta 250 solicitudes GET por segundo.
    • Los productores pueden publicar un mensaje de no más de 1 MB en un flujo.
    • Una solicitud no puede tener más de 1 MB. El tamaño de una solicitud es la suma de sus claves y mensajes después de que se hayan descodificado desde Base64.
  • Agregue políticas de flujo a Data Flow.
Conéctese a Kafka utilizando Java o Python. Puede autenticarse de una de estas dos maneras:
  • Utilice una contraseña de texto sin formato o un token de autenticación. Este método es adecuado para pruebas rápidas entre entornos. Por ejemplo, la creación de prototipos de aplicación de flujo estructurado de Spark cuando desea ejecutar de forma local y en Data Flow en el servicio Oracle Streaming.
    Nota

    La codificación o la exposición de la contraseña en argumentos de aplicación no se considera segura, por lo que no debe utilizar este método para las ejecuciones de producción.
  • La autenticación de la entidad de recurso es más segura que la contraseña de texto sin formato o el token de autenticación. Es una forma más flexible de autenticarse con el servicio Oracle Streaming. Configure políticas de flujo para utilizar la autenticación de entidad de recurso.

Dispone de una aplicación Java de ejemplo y una aplicación Python de ejemplo.

  1. Busque el pool de flujos que desea utilizar para conectarse a Kafka.
    1. Haga clic en Página inicial.
    2. Haga clic en Flujo.
    3. Haga clic en Pools de flujos.
    4. Haga clic en el pool de flujos que desea utilizar para ver sus detalles.
    5. Haga clic en Configuración de conexión de Kafka.
    6. Copie la siguiente información:
      • OCID de pool de flujos
      • Servidor de inicialización de datos
      • Cadena de conexión
      • Protocolo de seguridad, por ejemplo, SASL_SSL
      • Mecanismo de seguridad, por ejemplo, PLAIN
      Nota

      Si la contraseña de la cadena de conexión está definida en AUTH_TOKEN, cree un token de autenticación o utilice uno existente (password="<auth_token>") para el usuario especificado en el nombre de usuario (username="<tenancy>/<username>/<stream_pool_id>":
      1. Haga clic en Identidad.
      2. Haga clic en Usuarios.
      3. Para el usuario, muestre los detalles del usuario.
      4. Cree un token de autenticación o utilice uno existente.
  2. Spark no se enlaza a las bibliotecas de integración de Kafka por defecto, por lo que debe agregarlo como parte de las dependencias de aplicación Spark.
    • Para aplicaciones Java o Scala que utilizan definiciones de proyecto de SBT o Maven, enlace la aplicación a este artefacto:
      groupId = org.apache.spark
      artifactId = spark-sql-kafka-0-10_2.12
      version = 3.0.2
      Nota

      Para utilizar la funcionalidad de cabeceras, la versión del cliente de Kafka debe ser al menos 0.11.0.0.
    • Para las aplicaciones Python, agregue las bibliotecas y las dependencias de integración de Kafka al desplegar la aplicación.
    • Si utiliza la autenticación de entidad de recurso de Data Flow, necesita este artefacto:
      groupId = com.oracle.oci.sdk
      artifactId = oci-java-sdk-addons-sasl
      version = 1.36.1
  3. Configure el sistema.
    El comportamiento de las conexiones de Kafka se controla mediante la configuración del sistema; por ejemplo, los servidores, la autenticación, el tema, los grupos, etc. La configuración es potente, con un único cambio de valor que tiene un gran efecto en todo el sistema.
    Configuración común
    subscribe = <Kafka_topic_to_consume>
    kafka.max.partition.fetch.bytes = <Fetch_rate_limit>
    startingOffsets = <Start_point_of_the_first_run_of_the_application> 
    failOnDataLoss = <Failure_streaming_application>
    Para obtener más información sobre el límite de ratio de recuperación, consulte Límites sobre los recursos de Streaming.
    Nota

    Los reinicios posteriores continúan desde el último punto de control, no desde el lugar especificado en startingOffsets. Para ver otras opciones, consulte Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

    failOnDataLoss especifica la aplicación de flujo que se utilizará cuando no se puedan recuperar los datos porque se han eliminado de Oracle Streaming.

    Configuración avanzada

    Consulte la guía de integración de Kafka y el flujo de Spark.

    Configuraciones de ejemplo
    Contraseña de texto:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = PLAIN
    kafka.sasl.jaas.config = org.apache.kafka.common.security.plain.PlainLoginModule required username="<tenancy_name>/<username>/<streampool_ocid>" password="<example-password>";
    Entidad de recurso:
    kafka.bootstrap.servers = <bootstrap_server_name>:<port_number>
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = OCI-RSA-SHA256
    kafka.sasl.jaas.config = com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:<streampool_ocid>";
  4. Conéctese a Kafka.
    Conexiones de ejemplo.
    Java con entidad de recurso para el flujo de Oracle Cloud Infrastructure
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config", "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:<streampool_ocid>\";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
    Java con contraseña de texto sin formato
    // Create DataFrame representing the stream of input lines from Kafka
    Dataset<Row> lines = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrapServers)
        .option("subscribe", topics)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<tenancy_name>/<username>/<streampool_ocid>" password=\"<example-password> \";")
        .option("kafka.max.partition.fetch.bytes", 1024 * 1024) // limit request size to 1MB per partition
        .option("startingOffsets", "latest")
        .load()
    Python
    spark = (
        SparkSession.builder.config("failOnDataLoss", "false")
        .appName("kafka_streaming_aggregate")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
     
    # Configure settings we need to read Kafka.
    if args.ocid is not None:
        jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'
        args.auth_type = "OCI-RSA-SHA256"
    else:
        jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'
     
    # For the raw source stream.
    raw_kafka_options = {
        "kafka.sasl.jaas.config": jaas_template.format(
            username=args.stream_username, password=args.stream_password, ocid=args.ocid
        ),
        "kafka.sasl.mechanism": args.auth_type,
        "kafka.security.protocol": args.encryption,
        "kafka.bootstrap.servers": "{}:{}".format(
            args.bootstrap_server, args.bootstrap_port
        ),
        "group.id": args.raw_stream,
        "subscribe": args.raw_stream,
    }
     
    # The actual reader.
    raw = spark.readStream.format("kafka").options(**raw_kafka_options).load()
    Nota

    Para utilizar Python con la entidad de recurso para el flujo de Oracle Cloud Infrastructure, debe utilizar archive.zip. Hay más información disponible en la sección sobre la Funcionalidad spark-submit en Data Flow.

Conexión a un origen de flujo en una subred privada

Siga estos pasos para conectarse a un origen de transmisión en una subred privada.

Copie el FDQN de origen de flujo para permitir el tráfico entre las VNIC de la subred privada utilizada para crear el punto final privado de Data Flow. Si el origen de flujo está en una subred diferente al punto final privado de Data Flow, permita el tráfico entre la subred de Streaming y la subred del punto final privado de Data Flow.

  1. Cree un pool de flujos con un punto final privado.
    Consulte la documentación de Streaming para obtener más información.
  2. Consulte los detalles del pool de flujos y copie el valor de FDQN.
  3. Edite el punto final privado y sustituya el valor de zonas de DNS para controlar por el valor del FDQN del pool de flujos que ha copiado en el paso anterior.
  4. Asocie el punto final privado a la aplicación de flujo.
  5. Ejecute la aplicación.