OpenSearch Pipelines

Créez et gérez des pipelines OpenSearch à l'aide de Data Prepper pour inclure des données dans un cluster OpenSearch.

Data Prepper est un collecteur de données open source qui peut filtrer, enrichir, transformer, normaliser et agréger des données pour l'analyse et la visualisation en aval. Il s'agit de l'un des outils d'assimilation de données les plus recommandés pour le traitement d'ensembles de données volumineux et complexes.

Les pipelines OpenSearch sont compatibles avec les clusters OpenSearch exécutant la version 2.x et les versions ultérieures.

Remarque

La fonctionnalité Cluster OpenSearch avec Data Prepper n'est disponible que dans le domaine OC1.

Stratégies requises

Renseignez les conditions requises décrites dans cette section avant de procéder aux étapes décrites dans cette rubrique.

Si vous n'êtes pas administrateur dans votre location, contactez les administrateurs de votre location pour vous accorder ces droits d'accès. L'administrateur doit mettre à jour les droits d'accès des utilisateurs suivants pour permettre aux utilisateurs non administrateurs de gérer et d'exécuter des opérations CRUD sur les pipelines

La stratégie suivante permet à l'administrateur d'accorder des droits d'accès à tous les utilisateurs de la location concernée :

Allow any-user to manage opensearch-cluster-pipeline in tenancy

La stratégie suivante permet à l'administrateur d'accorder des droits d'accès à un groupe dans un compartiment (recommandé)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
                

<group> correspond à tous les utilisateurs de ce groupe qui peuvent accéder à la ressource.

La stratégie suivante permet aux pipelines OpenSearch de lire les clés secrètes à partir d'Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Remarque

Les noms d'index OpenSearch doivent respecter les règles suivantes :

  • Toutes les lettres doivent être en minuscules.
  • Les noms ne peuvent pas commencer par un trait de soulignement (_) ou un trait d'union (-).
  • Les noms ne peuvent pas contenir d'espaces, de virgules ou l'un des caractères suivants : :, ", *, +, /, \, |, ?, #, >, <
  • Le nom d'index peut contenir une expression de préparateur de données.

Création des clés secrètes dans le coffre

Toutes les clés secrètes en texte brut requises par les pipelines OpenSearch doivent lui être transmises via Vault, car les pipelines OpenSearch n'acceptent pas les clés secrètes en texte brut telles que les noms utilisateur et les mots de passe dans le code YAML des pipelines.

Procédez comme suit :

  • Créez un utilisateur avec des droits d'accès en écriture dans le cluster OpenSearch. Pour des instructions, reportez-vous aux rubriques de documentation OpenSearch suivantes :

    Groupes d'actions par défaut

    Utilisateurs et rôles

    Rechercher avec les stratégies IAM OpenSearch

  • Créez des clés secrètes (nom d'utilisateur et mot de passe) dans Vault pour le nouvel utilisateur que vous avez créé. Pour obtenir des instructions, reportez-vous aux rubriques Oracle Cloud Infrastructure Vault suivantes :

    Gérer les clés secrètes de coffre

    Création d'une clé secrète dans une chambre forte

  • Ajoutez les stratégies de principal de ressource suivantes dans votre location OpenSearch pour autoriser les pipelines OpenSearch à lire les clés secrètes à partir du coffre.
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Pour plus d'informations sur la création de stratégies pour Vault, reportez-vous à Détails du service Vault.

Vous pouvez effectuer les tâches de pipeline OpenSearch suivantes :

Répertoriez les pipelines OpenSearch dans un compartiment.

Créez un pipeline OpenSearch.

Obtenir les détails d'un pipeline OpenSearch.

Modifiez les paramètres d'un pipeline OpenSearch.

Supprimez un pipeline OpenSearch de votre location.

Processeurs pris en charge

Pipelines PULL

Stratégies Object Storage

Ces stratégies sont uniquement requises pour la source Object Storage :

La stratégie suivante permet aux pipelines OpenSearch d'utiliser un bucket d'Object Storage en tant que persistance de coordination source :

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

La stratégie suivante permet aux pipelines OpenSearch d'ingérer des objets à partir d'Object Storage :

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La stratégie suivante permet aux pipelines OpenSearch de lire des buckets à partir d'Object Storage :

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La stratégie suivante permet aux pipelines OpenSearch de lire des buckets à partir du bucket de coordination source.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

Stratégies Kafka OCI Streaming et autogérées

Ces stratégies sont requises pour le service OCI Streaming ou les sources Kafka autogérées.

Stratégies réseau communes

Remarque

Ces stratégies ne sont pas nécessaires pour le service OCI Streaming public.

Stratégies à ajouter pour permettre au service OpenSearch de créer, de lire et de mettre à jour les adresses privées du sous-réseau client.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
                    

Stratégies OCI Streaming Service (publiques et privées)

Ces stratégies sont uniquement requises pour le service OCI Streaming.

La stratégie suivante permet aux pipelines OpenSearch d'utiliser les enregistrements du service OCI Streaming.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

La stratégie suivante permet aux pipelines OpenSearch de lire des pools de flux de données à partir du service OCI Streaming

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Autorisation Kafka autogérée

Ces droits d'accès sont uniquement requis pour les sources Kafka autogérées.

Sélectionnez le lien suivant pour ajouter les droits d'accès requis afin de répertorier les rubriques, de décrire les rubriques, de rejoindre le groupe et d'utiliser les enregistrements du sujet par le pipeline OpenSearch :

https://kafka.apache.org/documentation/#security_authz

Règles de sécurité réseau

Cette configuration est uniquement requise pour le service OCI Streaming privé et Kafka autogéré. Dans le cas du service public OCI Streaming, sélectionnez Aucun.

Ajoutez une règle de sécurité entrante dans la liste de sécurité du sous-réseau ou du groupe de sécurité réseau à tous les pipelines OpenSearch afin de communiquer avec le service OCI Streaming privé exécuté dans votre sous-réseau.

Pour ajouter la règle de sécurité, reportez-vous à Règles de sécurité et à Accès et sécurité.

Pour rechercher le CIDR de votre sous-réseau, reportez-vous à Obtention des détails d'un sous-réseau.

L'image suivante présente les règles entrantes du groupe de sécurité réseau.

Règles entrantes pour les groupes de sécurité réseau

Object Storage et coordination des sources - YAML

La source Object Storage dépend de la configuration de la coordination source. Le pipeline OpenSearch prend en charge la coordination des sources en utilisant Object Storage comme persistance. Vous devez fournir les détails du bucket Object Storage à partir de votre location.

Voici un exemple de coordination source :

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>
                

Pour plus d'informations sur l'obtention de l'espace de noms Object Storage de votre location, reportez-vous à Présentation des espaces de noms Object Storage.

Voici un exemple de coordination source à l'aide de la persistance Object Storage :

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

Object Storage - YAML

Les sections suivantes de la configuration de pipeline YAML doivent être prises en compte :

  • Clés secrètes OCI : vous pouvez créer une clé secrète dans le coffre avec vos informations d'identification de cluster OpenSearch et l'utiliser dans le pipeline YAML pour la connexion au cluster OpenSearch.
  • OpenSearch récepteur : le récepteur contient des OCID de cluster OpenSearch avec des noms d'index pour l'inclusion.
  • Source d'oci-object : le pré-préparateur de données prend en charge l'inclusion basée sur l'analyse à l'aide d'Object Storage, qui prend en charge de nombreuses configurations. Vous pouvez configurer la source afin d'ingérer des objets dans votre bucket Object Storage en fonction de la fréquence programmée ou sans programmation. Vous disposez des options d'analyse suivantes
    • Analyse temporelle : cette option vous permet de configurer le pipeline qui lit les objets dans les buckets Object Storage une ou plusieurs fois en fonction de l'heure de dernière modification des objets.
    • Analyse basée sur la planification : cette option vous permet de programmer une analyse sur un intervalle régulier après la création du pipeline.

Le tableau suivant répertorie les options que vous pouvez utiliser pour configurer la source Object Storage.

Configurations Object Storage
Options Requis Type Description
acknowledgments Non Booléen Lorsque true, permet aux sources d'objet Object Storage de recevoir des accusés de réception de bout en bout lorsque les événements sont reçus par les puits OpenSearch.
buffer_timeout Non Durée Durée d'écriture des événements dans le tampon Data Prepper avant expiration. Tous les événements que la source OCI ne peut pas écrire dans le tampon pendant la durée indiquée sont ignorés. La valeur par défaut est 10s.
codec Oui Codec Le code codec du préparateur de données à appliquer.
compression Non Chaîne Algorithme de compression à appliquer : none, gzip, snappy ou automatic. La valeur par défaut est none.
delete_oci_objects_on_read Non Booléen Lorsque true, l'analyse de source Object Storage tente de supprimer des objets Object Storage une fois que tous les événements de l'objet Object Storage ont fait l'objet d'un accusé de réception par tous les puits. acknowledgments doit être activé lors de la suppression d'objets Object Storage. Par défaut, false est utilisé. La suppression ne fonctionne pas si l'option acknowledgments de bout en bout n'est pas activée.
oci Non OCI Configuration OCI. Pour plus d'informations, reportez-vous à la section OCI suivante.
records_to_accumulate Non Entier Nombre de messages qui s'accumulent avant d'être écrits dans le tampon. La valeur par défaut est 100.
workers Non Entier Configure le nombre de threads actifs que la source utilise pour lire les données à partir du bucket OCI. La valeur par défaut est conservée, sauf si les objets Object Storage sont inférieurs à 1 Mo. Les performances peuvent diminuer pour les objets Object Storage de plus grande taille. La valeur par défaut est 1.

Configuration de pipeline Object Storage - YAML

Voici un exemple de la configuration de pipeline Object Storage YAML :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
                        

Exemples de configuration

Les options d'analyse unique suivantes peuvent être appliquées au niveau d'un bucket Object Storage individuel ou au niveau de l'analyse.

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

Heure de fin

Voici un exemple d'heure de fin :


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Heure de départ et heure de fin

Voici un exemple d'heure de début et d'heure de fin :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Fourchette

Voici un exemple de plage :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

Heure de début, heure de fin et plage

Voici un exemple d'heure de début, d'heure de fin et de plage :

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

Filtre include_prefix

Voici un exemple du filtre include_prefix :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

Filtrer les fichiers du dossier

Voici un exemple de filtre de fichiers à partir d'un dossier. Pour lire des fichiers uniquement à partir de dossiers spécifiques, utilisez le filtre pour spécifier des dossiers. Voici un exemple d'inclusion de fichiers à partir de folder2 dans folder1 à l'aide de include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

Filtre exclude_prefix

Voici un exemple du filtre exclude_prefix :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

Prise en charge de Codec pour JSON

Voici un exemple de prise en charge du codec pour JSON :

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Prise en charge de Codec pour CSV

Voici un exemple de prise en charge du codec pour CSV :


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Prise en charge de Codec pour Newline

Voici un exemple de prise en charge du codec pour la nouvelle ligne :

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Options d'inclusion de planification sans comptage

Voici un exemple de planification des options d'inclusion sans comptage :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Planification des options d'inclusion avec comptage

Voici un exemple d'options d'inclusion de planification avec comptage :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Planifier les options d'inclusion avec l'heure de début

Voici un exemple d'options d'inclusion de planification avec l'heure de début :


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Planifier les options d'inclusion avec l'heure de fin

Voici un exemple d'options d'inclusion de planification avec une heure de fin :

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

La source Kafka ne nécessite aucune coordination de source.

Pour plus d'informations sur toutes les configurations disponibles pour la source Kafka, accédez au lien suivant :

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

Vous pouvez utiliser OCI Streaming Service en tant que source Kafka pour l'inclusion dans le cluster OpenSearch. Pour en savoir plus sur la procédure à suivre, reportez-vous à Utilisation des API Kafka.

Remarque

Le nombre de noeuds ne peut pas dépasser le nombre maximal de partitions défini pour le sujet.

OCI Streaming Public Access YAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Pipelines OCI Streaming Service - Accès privé YAML

Voici un exemple de YAML pour les pipelines OpenSearch dans le service OCI Streaming :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
                    

Kafka YAML autogéré

Voici un exemple de YAML Kafka autogéré pour OpenSearch :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
                    

Pipeline PUSH

Stratégies réseau communes

Stratégies à ajouter pour permettre au service OpenSearch de créer, de lire et de mettre à jour les adresses privées du sous-réseau client.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
                    

Tampon persistant

Les pipelines OpenSearch avec des connecteurs push Data Prepper nécessitent un tampon persistant pour stocker le tampon sur disque afin d'ajouter de la durabilité à vos données. Ceci est obligatoire dans les connecteurs push pour empêcher la perte de données en cas de dysfonctionnement de noeud. Le service OCI Streaming est le seul tampon persistant pris en charge.

Remarque

Chaque pipeline d'inclusion de données doit disposer d'un tampon dédié. Le partage du même tampon sur plusieurs pipelines peut entraîner une corruption des données en raison de chevauchements ou de données mixtes.

Le tableau suivant répertorie les composants source du pipeline d'inclusion de données et indique s'ils nécessitent une mise en mémoire tampon.

OpenSearch Exigences de mise en mémoire tampon persistante du pipeline
Source Mise en mémoire tampon persistante requise
Object Storage Non
Kafka auto-géré Non
Service OCI Streaming (public) Non
Service OCI Streaming (privé) Non
OpenTelemetry (journaux, mesures ou trace) Oui
protocole HTTP Oui
Remarque

Le nombre de noeuds ne peut pas dépasser le nombre maximal de partitions défini pour le sujet.

Stratégies

Utilisez les stratégies suivantes pour appliquer les droits d'accès liés à la mise en mémoire tampon persistante pour OCI Streaming Service avec des adresses publiques :

  • Pour autoriser les pipelines OpenSearch à utiliser et à produire les enregistrements dans le service OCI Streaming, procédez comme suit :
    Allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME, STREAM_PRODUCE} in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
    
  • Pour autoriser les pipelines OpenSearch à lire des pools de flux de données à partir du service OCI Streaming, procédez comme suit :
    Allow any-user to read stream-pools in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
    

Pour plus d'informations, reportez-vous à Détails du service Streaming.

Règles de sécurité réseau

Cette configuration est uniquement requise pour le service OCI Streaming privé et Kafka autogéré. Dans le cas du service public OCI Streaming, sélectionnez none.

Ajoutez une règle de sécurité entrante dans la liste de sécurité du sous-réseau ou du groupe de sécurité réseau à tous les pipelines OpenSearch afin de communiquer avec le service OCI Streaming privé exécuté dans votre sous-réseau.

Pour ajouter la règle de sécurité, reportez-vous à Règles de sécurité et à Accès et sécurité.

Pour rechercher le CIDR de votre sous-réseau, reportez-vous à Obtention des détails d'un sous-réseau.

L'image suivante présente les règles entrantes du groupe de sécurité réseau.

Règles entrantes pour les groupes de sécurité réseau

OpenTelemetry Journaux

OpenTelemetry Les journaux nécessitent une mise en mémoire tampon persistante.

Le port pour les journaux OpenTelemetry est 21892, ce qui ne peut pas être modifié.

Pour plus d'informations sur la configuration, reportez-vous à Source de journaux OTel.

Vous pouvez utiliser le service OCI Streaming comme tampon persistant. Pour plus d'informations, reportez-vous àUtilisation d'API Kafka.

OpenTelemetry Configuration de pipeline de journaux

L'exemple suivant présente une configuration de pipeline pour les journaux OpenTelemetry :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      pipeline-username:
        secret_id: <secret_ocid>
      pipeline-password:
        secret_id: <secret_ocid>
sample-log-pipeline:
  source:
    otel_logs_source:
      authentication:
        http_basic:
          username: '${{oci_secrets:pipeline-username}}'
          password: '${{oci_secrets:pipeline-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
        - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
               stream_pool_id: <target_stream_pool_ocid>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: 'otel-logs-%{yyyy.MM.dd}'

OpenTelemetry Mesures

OpenTelemetry Les mesures nécessitent une mise en mémoire tampon persistante.

Le port pour les mesures OpenTelemetry est 21891 et ne peut pas être modifié.

Pour plus d'informations sur la configuration, reportez-vous à Source de mesures OTel.

Vous pouvez utiliser le service OCI Streaming comme tampon persistant. Pour plus d'informations, reportez-vous àUtilisation d'API Kafka.

OpenTelemetry Configuration de pipeline de mesures

L'exemple suivant présente une configuration de pipeline pour les mesures OpenTelemetry :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      pipeline-username:
        secret_id: <secret_ocid>
      pipeline-password:
        secret_id: <secret_ocid>
sample-metrics-pipeline:
  source:
    otel_metrics_source:
      authentication:
        http_basic:
          username: '${{oci_secrets:pipeline-username}}'
          password: '${{oci_secrets:pipeline-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
        - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
              stream_pool_id: <target_stream_pool_ocid>
  sink:
    - opensearch:
        hosts: [ <cluster_ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: 'otel-metrics-%{yyyy.MM.dd}'

OpenTelemetry Trace

OpenTelemetry La trace nécessite une mise en mémoire tampon persistante.

Le port de la trace OpenTelemetry est 21890, ce qui ne peut pas être modifié.

Pour plus d'informations sur la configuration, reportez-vous à OTel trace source.

Vous pouvez utiliser le service OCI Streaming comme tampon persistant. Pour plus d'informations, reportez-vous à l'utilisation des API Kafka.

OpenTelemetry Configuration de pipeline de trace

L'exemple suivant présente une configuration de pipeline pour la trace OpenTelemetry :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      pipeline-username:
        secret_id: <secret_ocid>
      pipeline-password:
        secret_id: <secret_ocid>
traces-entry-shared-pipeline:
  source:
    otel_trace_source:
      authentication:
        http_basic:
          username: '${{oci_secrets:pipeline-username}}'
          password: '${{oci_secrets:pipeline-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
        - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
              stream_pool_id: <target_stream_pool_ocid>
  sink:
    - pipeline:
        name: traces-pipeline
    - pipeline:
        name: service-map-pipeline
traces-pipeline:
  source:
    pipeline:
      name: traces-entry-shared-pipeline
  processor:
    - otel_traces:
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index_type: trace-analytics-raw
service-map-pipeline:
  source:
    pipeline:
      name: traces-entry-shared-pipeline
  processor:
    - service_map:
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index_type: trace-analytics-service-map

Connecteur Push HTTP

Le connecteur Push HTTP requiert une mise en mémoire tampon persistante.

Le port du connecteur Push HTTP est 2021, ce qui ne peut pas être modifié.

Pour plus d'informations sur la configuration, reportez-vous à Source HTTP.

Vous pouvez utiliser le service OCI Streaming comme tampon persistant. Pour plus d'informations, reportez-vous à l'utilisation des API Kafka.

Configuration de pipeline de connecteur Push HTTP

L'exemple suivant présente une configuration de pipeline pour le connecteur Push HTTP :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      http-username:
        secret_id: <secret_ocid>
      http-password:
        secret_id: <secret_ocid>
simple-sample-pipeline:
  source:
    http:
      authentication:
        http_basic:
          username: '${{oci_secrets:http-username}}'
          password: '${{oci_secrets:http-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
          - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
              stream_pool_id: <target_stream_pool_ocid>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index_name>

Connecteur Push HTTP avec plusieurs processeurs

L'exemple suivant présente une configuration de pipeline pour le connecteur Push HTTP avec plusieurs processeurs :

# Keep caution while adding percentage symbols
version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      http-username:
        secret_id: <secret_ocid>
      http-password:
        secret_id: <secret_ocid>
simple-sample-pipeline:
  source:
    http:
      path: "/logs"
      authentication:
        http_basic:
          username: '${{oci_secrets:http-username}}'
          password: '${{oci_secrets:http-password}}'
  buffer:
    kafka:
      # Idempotence should be 'false' for OCI Streaming Service
      producer_properties:
        enable_idempotence: false
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target_stream_pool_ocid>
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG}" ]
    - date:
        from_time_received: true
        destination: "@timestamp"
    - substitute_string:
        entries:
          - source: "log"
            from: '\.'
            to: "-"
    - uppercase_string:
        with_keys:
          - "log"
    - trim_string:
        with_keys:
          - "log"
    - split_string:
        entries:
          - source: "request"
            delimiter: "?"
    - key_value:
        source: "/request/1"
        field_split_characters: "&"
        value_split_characters: "="
        destination: "query_params"
    - lowercase_string:
        with_keys:
          - "verb"
    - add_entries:
        entries:
          - key: "entry1"
            value: "entry1value"
          - key: "entry2"
            value: "entry2value"
          - key: "entry3"
            value: "entry3value"
    - rename_keys:
        entries:
          - from_key: "entry1"
            to_key: "renameEntry1"
          - from_key: "entry2"
            to_key: "renameEntry2"
          - from_key: "entry3"
            to_key: "renameEntry3"
    - copy_values:
        entries:
          - from_key: "log"
            to_key: "copy_key"
    - delete_entries:
        with_keys: [ "renameEntry1", "renameEntry2", "renameEntry3" ]
  sink:
    - opensearch:
        hosts: [ <cluster_ocid> ]
        username: '${{oci_secrets:opensearch-username}}'
        password: '${{oci_secrets:opensearch-password}}'
        insecure: false
        index: <index_name>