Esta página ha sido traducida por una máquina.

OpenSearch Pipelines

Cree y gestione pipelines OpenSearch mediante Data Prepper para ingerir datos en un cluster OpenSearch.

Data Prepper es un recopilador de datos de código abierto que puede filtrar, enriquecer, transformar, normalizar y agregar datos para análisis y visualización descendentes. Es una de las herramientas de ingestión de datos más recomendadas para procesar conjuntos de datos grandes y complejos.

Utilizamos el modelo PULL para ingerir datos en el cluster OpenSearch 2.x e integrarlos con el servicio Oracle Cloud Infrastructure Streaming, Kafka autogestionado y Object Storage.
Nota

La función Cluster OpenSearch con preparador de datos está disponible actualmente en el dominio OC1.

Políticas necesarias

Complete las siguientes tareas antes de continuar con los pasos descritos en este tema:

Si no es administrador de su arrendamiento, póngase en contacto con los administradores de su arrendamiento para que le otorguen estos permisos. El administrador debe actualizar el permiso de los siguientes usuarios para permitir que los usuarios que no sean administradores gestionen y realicen operaciones CRUD en los pipelines

La siguiente política permite al administrador otorgar permisos a todos los usuarios del arrendamiento respectivo:

Allow any-user to manage opensearch-cluster-pipeline in tenancy

La siguiente política permite al administrador otorgar permiso para un grupo en un compartimento (recomendado)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

donde <group> es todos los usuarios dentro de ese grupo que pueden acceder al recurso.

La siguiente política permite a los pipelines OpenSearch leer los secretos de Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

Políticas de Object Storage

Estas políticas solo son necesarias para el origen de Object Storage:

La siguiente política permite a los pipelines OpenSearch utilizar un cubo de Object Storage como persistencia de coordinación de origen:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

La siguiente política permite a los pipelines OpenSearch ingerir objetos de Object Storage:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La siguiente política permite a los pipelines OpenSearch leer cubos de Object Storage:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La siguiente política permite a los pipelines OpenSearch leer cubos del cubo de coordinación de origen.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

OCI Streaming y políticas de Kafka autogestionadas

Estas políticas son necesarias para el servicio OCI Streaming u orígenes de Kafka autogestionados.

Políticas de red comunes

Nota

Estas políticas no son necesarias para el servicio público OCI Streaming.

Políticas que se van a agregar para permitir que el servicio OpenSearch cree, lea y actualice la supresión de los puntos finales privados en la subred del cliente.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

Políticas del servicio OCI Streaming (públicas y privadas)

Estas políticas solo son necesarias para el servicio OCI Streaming.

La siguiente política permite a los pipelines OpenSearch consumir los registros del servicio OCI Streaming.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

La siguiente política permite a los pipelines OpenSearch leer pools de flujos del servicio de flujo de OCI

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Permiso de Kafka autogestionado

Estos permisos solo son necesarios para los orígenes de Kafka autogestionados.

Seleccione el siguiente enlace para agregar el permiso necesario para mostrar los temas, describir los temas, unirse al grupo y consumir los registros del tema por el pipeline OpenSearch:

https://kafka.apache.org/documentation/#security_authz

Reglas de seguridad de red

Esta configuración solo es necesaria para el servicio OCI Streaming privado y Kafka autogestionado. En el caso del servicio Public OCI Streaming, seleccione ninguno.

Agregue una regla de seguridad de entrada en la lista de seguridad de la subred o el grupo de seguridad de red a todos los pipelines OpenSearch para comunicarse con el servicio privado de OCI Streaming que se ejecuta en la subred.

Para agregar la regla de seguridad, consulte Reglas de seguridad y Acceso y seguridad.

Para buscar el CIDR de la subred, consulte Obtención de detalles de una subred.

En la siguiente imagen se muestran las reglas de entrada para el grupo de seguridad de red.

Reglas de entrada para grupos de seguridad de red

Creación de los secretos en Vault

Todos los secretos de texto sin formato que necesitan los pipelines OpenSearch se deben transferir a él a través de Vault, ya que los pipelines OpenSearch no aceptan secretos de texto sin formato como nombres de usuario y contraseñas en el código yaml de los pipelines.

Realice las siguientes tareas:

  • Cree un nuevo usuario con permisos de escritura en el cluster OpenSearch. Para obtener instrucciones, consulte los siguientes temas de documentación de OpenSearch:

    Grupos de acción por defecto

    Usuarios y Roles

    Buscar con políticas de IAM OpenSearch

  • Cree secretos (nombre de usuario y contraseña) en Vault para el nuevo usuario que ha creado. Para obtener instrucciones, consulte los siguientes temas de Oracle Cloud Infrastructure Vault:

    Gestión de secretos de almacén

    Creación de un secreto en un almacén

  • Agregue las siguientes políticas de entidad de recurso en su arrendamiento OpenSearch para permitir que los pipelines OpenSearch lean los secretos del almacén.

    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Para obtener más información sobre la creación de políticas para Vault, consulte Detalles del servicio Vault.

Puede realizar las siguientes tareas de pipeline de OpenSearch:

Muestre los pipelines OpenSearch en un compartimento.

Crear un nuevo pipeline OpenSearch.

Obtención de detalles de un pipeline de OpenSearch.

Edite la configuración de un pipeline de OpenSearch.

Suprima un pipeline de OpenSearch de su arrendamiento.

Procesadores soportados

Almacenamiento de objetos y coordinación de origen YAML

El origen de Object Storage depende de la configuración de coordinación del origen. El pipeline OpenSearch soporta la coordinación de origen mediante el almacenamiento de objetos como persistencia. Debe proporcionar los detalles del cubo de Object Storage desde su arrendamiento.

El siguiente es un ejemplo de coordinación de origen:

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

Para obtener información sobre cómo obtener el espacio de nombres de Object Storage de su arrendamiento, consulte Descripción de los espacios de nombres de Object Storage.

A continuación, se muestra un ejemplo de coordinación de origen mediante la persistencia de Object Storage:

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

Almacenamiento de objetos YAML

A continuación se muestran las secciones de la configuración de pipeline de las que YAML debe tener en cuenta:

  • Secretos de OCI: puede crear un secreto en Vault con sus credenciales de cluster OpenSearch y utilizarlo en el YAML de pipeline para conectarse al cluster OpenSearch.

  • Disipador OpenSearch: el disipador contiene OCID de cluster OpenSearch con nombres de índice para la ingestión.

  • Origen ooci-object: el preparador de datos soporta la ingestión basada en exploración mediante Object Storage, que tiene muchas configuraciones soportadas. Puede configurar el origen para que ingiera objetos en el bloque de Object Storage según la frecuencia programada, o bien no según cualquier programa. Tiene las siguientes opciones de exploración

    • Una o más exploraciones de tiempo: esta opción permite configurar el pipeline que lee objetos en cubos de Object Storage una o más veces según la hora de la última modificación de los objetos.

    • Exploración basada en programación: esta opción permite programar una exploración en un intervalo regular después de crear el pipeline.

En la siguiente tabla se muestran las opciones que puede utilizar para configurar el origen de Object Storage.

Configuraciones de Almacenamiento de objetos
Opciones necesario Escribir Descripción
acknowledgments Número Booleano Cuando true, permite a los orígenes de objetos de Object Storage recibir confirmaciones de extremo a extremo cuando los eventos los reciben los sumideros de OpenSearch.
buffer_timeout Número Duración Cantidad de tiempo permitida para escribir eventos en el buffer de preparador de datos antes de que se produzca un timeout. Se descartan todos los eventos que el origen de OCI no pueda escribir en el buffer durante el período de tiempo especificado. El valor por defecto es 10s.
codec Codec El codec del preparador de datos que se va a aplicar.
compression Número Cadena Algoritmo de compresión que se va a aplicar: none, gzip, snappy o automatic. El valor por defecto es none.
delete_oci_objects_on_read Número Booleano Cuando true, la exploración de origen de Object Storage intenta suprimir objetos de Object Storage después de que todos los eventos del objeto de Object Storage hayan sido confirmados correctamente por todos los sumideros. acknowledgments debe estar activado al suprimir objetos de Object Storage. El valor por defecto es false. La supresión no funciona si acknowledgments de extremo a extremo no está activado.
oci Número OCIO Configuración de OCI. Consulte la siguiente sección de OCI para obtener más información.
records_to_accumulate Número Entero Número de mensajes que se acumulan antes de escribirse en el buffer. El valor por defecto es 100.
workers Número Entero Configura el número de threads de trabajo que utiliza el origen para leer datos del cubo de OCI. Deje este valor por defecto a menos que los objetos de Object Storage tengan menos de 1 MB. El rendimiento puede disminuir en el caso de objetos de Object Storage de mayor tamaño. El valor por defecto es 1.

A continuación, se muestra un ejemplo de la configuración del pipeline de Object Storage YAML:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

configuraciones de ejemplo

A continuación se muestran las opciones de exploración de un solo uso que puede aplicar en el nivel de cubo individual de Object Storage o en el nivel de exploración

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

A continuación se muestra un ejemplo de la hora de finalización:


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

A continuación se muestra un ejemplo de la hora de inicio y la hora de finalización:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

El siguiente es un ejemplo de rango:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

A continuación se muestra un ejemplo de hora de inicio, hora de finalización e intervalo:

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

El siguiente es un ejemplo del filtro include_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

A continuación se muestra un ejemplo de los archivos de filtro de la carpeta. Para leer archivos solo de carpetas específicas, utilice el filtro para especificar carpetas. A continuación, se muestra un ejemplo de cómo incluir archivos de folder2 en folder1 mediante include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

El siguiente es un ejemplo del filtro exclude_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

A continuación se muestra un ejemplo de soporte de códec para JSON:

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

A continuación, se muestra un ejemplo de compatibilidad con códec para CSV:


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

A continuación se muestra un ejemplo de compatibilidad con códec para nueva línea:

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

A continuación, se muestra un ejemplo de programación de opciones de ingestión sin recuento:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

A continuación, se muestra un ejemplo de programación de opciones de ingestión con recuento:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

A continuación, se muestra un ejemplo de opciones de programación de ingestión con hora de inicio:


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

A continuación se muestra un ejemplo de opciones de programación de ingestión con hora de finalización:

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

La fuente de Kafka no requiere ninguna coordinación de fuentes.

Para obtener información sobre todas las configuraciones disponibles para el origen de Kafka, acceda al siguiente enlace:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

Puede utilizar el servicio OCI Streaming como origen de Kafka para realizar la ingesta en el cluster OpenSearch. Para obtener más información sobre cómo hacerlo, consulte Uso de API de Kafka.

YAML de acceso público a OCI Streaming

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Pipelines de OCI Streaming Service Private Access YAML

A continuación se muestra un ejemplo de YAML para los pipelines OpenSearch en el servicio OCI Streaming:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Kafka YAML autogestionado

A continuación, se muestra un ejemplo de Kafka YAML autogestionado para OpenSearch:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>