OpenSearch Pipelines
Erstellen und verwalten Sie OpenSearch-Pipelines mit Data Prepper, um Daten in ein OpenSearch-Cluster aufzunehmen.
Data Prepper ist ein Open-Source-Datensammler, der Daten für Downstreamanalysen und -visualisierungen filtern, anreichern, transformieren, normalisieren und aggregieren kann. Es ist eines der am meisten empfohlenen Tools für die Datenaufnahme zur Verarbeitung großer und komplexer Datasets.
Die OpenSearch-Pipelines sind mit OpenSearch-Clustern kompatibel, auf denen Version 2.x und höher ausgeführt werden.
Das Feature "Cluster mit Daten-Prepper" OpenSearch ist nur in der Realm OC1 verfügbar.
Erforderliche Policys
Führen Sie die in diesem Abschnitt beschriebenen Richtlinienanforderungen durch, bevor Sie mit den in diesem Thema beschriebenen Schritten fortfahren.
Wenn Sie kein Administrator in Ihrem Mandanten sind, bitten Sie Ihre Mandantenadministratoren, Ihnen diese Berechtigungen zu erteilen. Der Administrator muss die folgenden Benutzerberechtigungen aktualisieren, damit Nicht-Administratorbenutzer die Pipelines verwalten und CRUD-Vorgänge ausführen können
Mit der folgenden Policy kann der Administrator allen Benutzern im jeweiligen Mandanten Berechtigungen erteilen:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
Mit der folgenden Policy kann der Administrator eine Berechtigung für eine Gruppe in einem Compartment erteilen (empfohlen)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
wobei <group>
alle Benutzer innerhalb dieser Gruppe auf die Ressource zugreifen können.
Mit der folgenden Policy können OpenSearch-Pipelines die Secrets aus Oracle Cloud Infrastructure Vault lesen.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
OpenSearch-Indexnamen müssen den folgenden Regeln entsprechen:
- Alle Buchstaben müssen Kleinbuchstaben sein.
- Namen dürfen nicht mit einem Unterstrich ( _ ) oder Bindestrich (-) beginnen.
- Namen dürfen keine Leerzeichen, Kommas oder eines der folgenden Zeichen enthalten: :, ", *, +, /, \, |, ?, #, >, <
- Der Indexname kann einen Data Prepper-Ausdruck enthalten.
Secrets im Vault erstellen
Alle Klartext-Secrets, die für die OpenSearch-Pipelines erforderlich sind, müssen über Vault an sie übergeben werden, da OpenSearch-Pipelines keine Klartext-Secrets wie Benutzernamen und Kennwörter im YAML-Code der Pipelines akzeptieren.
Führen Sie die folgenden Aufgaben aus:
- Erstellen Sie einen neuen Benutzer mit Schreibberechtigungen im OpenSearch-Cluster. Anweisungen finden Sie in den folgenden Dokumentationsthemen zu OpenSearch:
- Erstellen Sie Secrets (Benutzername und Kennwort) in Vault für den neu erstellten Benutzer. Anweisungen finden Sie in den folgenden Oracle Cloud Infrastructure Vault-Themen:
- Fügen Sie die folgenden Resource Principal-Policys im OpenSearch-Mandanten hinzu, damit OpenSearch-Pipelines die Secrets aus dem Vault lesen können.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Weitere Informationen zum Erstellen von Policys für Vault finden Sie unter Details zum Vault-Service.
Sie können die folgenden OpenSearch-Pipelineaufgaben ausführen:
Listen Sie die OpenSearch-Pipelines in einem Compartment auf.
Neue OpenSearch-Pipeline erstellen.
Details einer OpenSearch-Pipeline abrufen.
Unterstützte Prozessore
In der folgenden Tabelle werden diese Prozessoren aufgeführt, die in der Pipeline unterstützt werden.
PULL Pipelines
Object Storage-Policys
Diese Policys sind nur für die Object Storage-Quelle erforderlich:
Mit der folgenden Policy können OpenSearch-Pipelines einen Bucket aus Object Storage als Quellkoordinationspersistenz verwenden:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
Mit der folgenden Policy können OpenSearch-Pipelines Objekte aus Object Storage aufnehmen:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Mit der folgenden Policy können OpenSearch-Pipelines Buckets aus Object Storage lesen:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Mit der folgenden Policy können OpenSearch-Pipelines Buckets aus dem Quellkoordinations-Bucket lesen.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
OCI Streaming und selbstverwaltete Kafka-Policys
Diese Policys sind für OCI Streaming-Service oder selbstverwaltete Kafka-Quellen erforderlich.
Allgemeine Netzwerk-Policys
Diese Policys sind für den öffentlichen OCI Streaming-Service nicht erforderlich.
Policys, die hinzugefügt werden, damit der OpenSearch-Service die privaten Endpunkte im Kundensubnetz erstellen, lesen und aktualisieren kann.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
OCI Streaming Service-Richtlinien (öffentlich und privat)
Diese Policys sind nur für den OCI Streaming-Service erforderlich.
Mit der folgenden Policy können OpenSearch-Pipelines die Datensätze aus dem OCI Streaming-Service konsumieren.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
Mit der folgenden Policy können OpenSearch-Pipelines Streampools aus dem OCI-Streamingservice lesen
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Selbstverwaltete Kafka-Berechtigung
Diese Berechtigungen sind nur für selbstverwaltete Kafka-Quellen erforderlich.
Wählen Sie den folgenden Link aus, um die erforderliche Berechtigung hinzuzufügen, um die Themen aufzulisten, die Themen zu beschreiben, der Gruppe beizutreten und die Datensätze aus dem Thema von der Pipeline OpenSearch zu konsumieren:
Netzwerksicherheitsregeln
Diese Konfiguration ist nur für den privaten OCI Streaming-Service und das selbstverwaltete Kafka erforderlich. Wählen Sie im Falle eines öffentlichen OCI-Streaming-Service keine Option aus.
Fügen Sie in der Sicherheitsliste des Subnetzes oder der Netzwerksicherheitsgruppe allen OpenSearch-Pipelines eine Ingress-Sicherheitsregel hinzu, um mit dem privaten OCI-Streaming-Service zu kommunizieren, der in Ihrem Subnetz ausgeführt wird.
Informationen zum Hinzufügen der Sicherheitsregel finden Sie unter Sicherheitsregeln und Zugriff und Sicherheit.
Informationen zum Suchen des CIDR Ihres Subnetzes finden Sie unter Details eines Subnetzes abrufen.
Die folgende Abbildung zeigt die Ingress-Regeln für die Netzwerksicherheitsgruppe.

Object Storage und Quellkoordination YAML
Die Object Storage-Quelle hängt von der Konfiguration der Quellkoordination ab. Die OpenSearch-Pipeline unterstützt die Quellkoordination mit Object Storage als Persistenz. Sie müssen die Details des Objektspeicher-Buckets aus Ihrem Mandanten angeben.
Im Folgenden finden Sie ein Beispiel für die Quellkoordination:
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Informationen zum Abrufen des Object Storage-Namespace Ihres Mandanten finden Sie unter Object Storage-Namespaces.
Im Folgenden finden Sie ein Beispiel für die Quellkoordination mit Object Storage-Persistence:
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
Objektspeicher-YAML
Die folgenden Abschnitte der Pipelinekonfiguration YAML sind zu beachten:
- OCI-Secrets: Sie können ein Secret in Vault mit den Zugangsdaten des OpenSearch-Clusters erstellen und es in der Pipeline-YAML für die Verbindung zum Cluster OpenSearch verwenden.
- OpenSearch Sink: Die Sink-Datei enthält OpenSearch-Cluster-OCIDs mit Indexnamen für die Aufnahme.
-
oci-object source: Data Prepper unterstützt die scanbasierte Aufnahme mit Object Storage, für die zahlreiche Konfigurationen unterstützt werden. Sie können die Quelle so konfigurieren, dass Objekte in Ihrem Objektspeicher-Bucket basierend auf der geplanten Häufigkeit oder nicht basierend auf einem Zeitplan aufgenommen werden. Sie haben folgende Scan-Optionen
- Ein oder mehrere Zeitscans: Mit dieser Option können Sie eine Pipeline konfigurieren, die Objekte in Object Storage-Buckets ein- oder mehrmals basierend auf der letzten Änderungszeit von Objekten liest.
- Planungsbasierter Scan: Mit dieser Option können Sie einen Scan in einem regulären Intervall planen, nachdem die Pipeline erstellt wurde.
In der folgenden Tabelle sind die Optionen aufgeführt, mit denen Sie die Object Storage-Quelle konfigurieren können.
Optionen | Erforderlich | Typ | Beschreibung |
---|---|---|---|
acknowledgments
|
Nein | Boolescher Wert | Wenn true verwendet wird, können Object Storage-Objektquellen End-to-End-Bestätigungen empfangen, wenn Ereignisse von OpenSearch-Senken empfangen werden. |
buffer_timeout
|
Nein | Zeitraum | Die zulässige Zeit für das Schreiben von Ereignissen in den Data Prepper-Puffer, bevor ein Timeout auftritt. Alle Ereignisse, die die OCI-Quelle während der angegebenen Zeit nicht in den Puffer schreiben kann, werden verworfen. Der Standardwert ist 10s . |
codec
|
Ja | Codec | Der codec vom anzuwendenden Data-Prepper. |
compression
|
Nein | Zeichenfolge | Der anzuwendende Komprimierungsalgorithmus: none , gzip , snappy oder automatic . Der Standardwert ist none . |
delete_oci_objects_on_read
|
Nein | Boolescher Wert | Wenn true verwendet wird, versucht der Object Storage-Quellscan, Object Storage-Objekte zu löschen, nachdem alle Ereignisse aus dem Object Storage-Objekt erfolgreich von allen Senken bestätigt wurden. acknowledgments muss beim Löschen von Object Storage-Objekten aktiviert sein. Der Standardwert ist false . Löschen funktioniert nicht, wenn End-to-End-acknowledgments nicht aktiviert ist. |
oci
|
Nein | OCI | Die OCI-Konfiguration. Weitere Informationen finden Sie im folgenden Abschnitt zu OCI. |
records_to_accumulate
|
Nein | Ganzzahl | Die Anzahl der Nachrichten, die sich ansammeln, bevor sie in den Puffer geschrieben werden. Der Standardwert ist 100 . |
workers
|
Nein | Ganzzahl | Konfiguriert die Anzahl der Worker-Threads, mit denen die Quelle Daten aus dem OCI-Bucket liest. Dieser Wert wird standardmäßig beibehalten, es sei denn, Ihre Object Storage-Objekte sind weniger als 1 MB. Bei größeren Object Storage-Objekten kann sich die Performance verringern. Der Standardwert ist 1 . |
Object Storage-Pipelinekonfiguration - YAML
Im Folgenden finden Sie ein Beispiel für die YAML-Konfiguration der Object Storage-Pipeline:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Beispielkonfigurationen
Im Folgenden finden Sie Optionen zum einmaligen Scannen, die Sie auf Ebene des einzelnen Object Storage-Buckets oder auf Scanebene anwenden können.
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Endzeit
Im Folgenden finden Sie ein Beispiel für die Endzeit:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Start- und Enduhrzeit
Im Folgenden finden Sie ein Beispiel für Start- und Endzeit:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
Range
Im Folgenden finden Sie ein Beispiel für einen Bereich:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Startzeit, Endzeit und Bereich
Im Folgenden finden Sie ein Beispiel für Startzeit, Endzeit und Bereich:
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
include_prefix
Filter
Im Folgenden finden Sie ein Beispiel für den include_prefix
-Filter:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Dateien aus Ordner filtern
Im Folgenden finden Sie ein Beispiel für Filterdateien aus dem Ordner. Um Dateien nur aus bestimmten Ordnern zu lesen, verwenden Sie den Filter, um Ordner anzugeben. Im Folgenden finden Sie ein Beispiel für das Einschließen von Dateien aus folder2 in folder1 mit include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
exclude_prefix
Filter
Im Folgenden finden Sie ein Beispiel für den Filter exclude_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Codec-Unterstützung für JSON
Im Folgenden finden Sie ein Beispiel für die Codec-Unterstützung für JSON:
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Codec-Unterstützung für CSV
Im Folgenden finden Sie ein Beispiel für die Codec-Unterstützung für CSV:
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Codec-Unterstützung für Newline
Im Folgenden finden Sie ein Beispiel für die Codec-Unterstützung für Newline:
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Aufnahmeoptionen ohne Inventur planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen ohne Zählung:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Aufnahmeoptionen mit Anzahl planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen mit der Anzahl:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Aufnahmeoptionen mit Startzeit planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen mit der Startzeit:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Aufnahmeoptionen mit Endzeit planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen mit Endzeit:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
Für die Kafka-Quelle ist keine Quellkoordination erforderlich.
Informationen zu allen verfügbaren Konfigurationen für die Kafka-Quelle finden Sie unter folgendem Link:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
Sie können den OCI Streaming Service als Kafka-Quelle für die Aufnahme in das OpenSearch-Cluster verwenden. Weitere Informationen hierzu finden Sie unter Kafka-APIs verwenden.
Die Anzahl der Knoten darf die maximale Anzahl der Partitionen nicht überschreiten, die für das Thema definiert sind.
OCI Streaming - Öffentlicher Zugriff - YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Pipelines OCI Streaming Service - Privater Zugriff YAML
Im Folgenden finden Sie ein Beispiel für die YAML für die OpenSearch-Pipelines im OCI Streaming-Service:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Selbstverwalteter Kafka YAML
Im Folgenden finden Sie ein Beispiel für die selbstverwaltete Kafka-YAML für die OpenSearch:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
PUSH Pipeline
Allgemeine Netzwerk-Policys
Policys, die hinzugefügt werden, damit der OpenSearch-Service die privaten Endpunkte im Kundensubnetz erstellen, lesen und aktualisieren kann.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Persistenter Puffer
OpenSearch-Pipelines mit Data Prepper-Push-Connectors erfordern einen persistenten Puffer, um den datenträgerbasierten Puffer zu speichern und Dauerhaftigkeit zu Ihren Daten hinzuzufügen. Dies ist in Push-Connectors erforderlich, um Datenverlust bei einem Ausfall des Knotens zu verhindern. OCI Streaming Service ist der einzige unterstützte persistente Puffer.
Jede Datenaufnahme-Pipeline muss über einen dedizierten Puffer verfügen. Die gemeinsame Nutzung desselben Puffers über mehrere Pipelines hinweg kann zu Datenbeschädigungen aufgrund von sich überschneidenden oder gemischten Daten führen.
In der folgenden Tabelle sind die Quellkomponenten der Datenaufnahme-Pipeline und deren Pufferung aufgeführt.
Quelle | Persistentes Puffering erforderlich |
---|---|
Objektspeicher | Nein |
Selbstverwaltete Kafka | Nein |
OCI Streaming-Service (öffentlich) | Nein |
OCI Streaming-Service (privat) | Nein |
OpenTelemetry (Logs, Metriken oder Trace) | Ja |
HTTP | Ja |
Die Anzahl der Knoten darf die maximale Anzahl der Partitionen nicht überschreiten, die für das Thema definiert sind.
Policys
Verwenden Sie die folgenden Policys, um Berechtigungen im Zusammenhang mit dem persistenten Puffering für OCI Streaming Service mit öffentlichen Endpunkten anzuwenden:
- So lassen Sie zu, dass OpenSearch-Pipelines die Datensätze im OCI Streaming Service konsumieren und produzieren:
Allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME, STREAM_PRODUCE} in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
- So lassen Sie zu, dass OpenSearch-Pipelines Streampools aus dem OCI Streaming Service lesen:
Allow any-user to read stream-pools in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
Weitere Informationen finden Sie unter Details zum Streaming-Service.
Netzwerksicherheitsregeln
Diese Konfiguration ist nur für den privaten OCI Streaming-Service und das selbstverwaltete Kafka erforderlich. Wählen Sie bei einem öffentlichen OCI-Streaming-Service keine Option aus.
Fügen Sie in der Sicherheitsliste des Subnetzes oder der Netzwerksicherheitsgruppe allen OpenSearch-Pipelines eine Ingress-Sicherheitsregel hinzu, um mit dem privaten OCI-Streaming-Service zu kommunizieren, der in Ihrem Subnetz ausgeführt wird.
Informationen zum Hinzufügen der Sicherheitsregel finden Sie unter Sicherheitsregeln und Zugriff und Sicherheit.
Informationen zum Suchen des CIDR Ihres Subnetzes finden Sie unter Details eines Subnetzes abrufen.
Die folgende Abbildung zeigt die Ingress-Regeln für die Netzwerksicherheitsgruppe.

OpenTelemetry Logs
Für OpenTelemetry-Logs ist eine persistente Pufferung erforderlich.
Der Port für OpenTelemetry-Logs ist 21892, der nicht geändert werden kann.
Konfigurationsinformationen finden Sie unter OTel-Logquelle.
Sie können den OCI Streaming Service als persistenten Puffer verwenden. Weitere Informationen finden Sie unter Kafka-APIs verwenden.
OpenTelemetry Logpipelinekonfiguration
Das folgende Beispiel zeigt eine Pipelinekonfiguration für OpenTelemetry-Logs:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
pipeline-username:
secret_id: <secret_ocid>
pipeline-password:
secret_id: <secret_ocid>
sample-log-pipeline:
source:
otel_logs_source:
authentication:
http_basic:
username: '${{oci_secrets:pipeline-username}}'
password: '${{oci_secrets:pipeline-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: 'otel-logs-%{yyyy.MM.dd}'
OpenTelemetry-Metriken
Für OpenTelemetry-Metriken ist eine persistente Pufferung erforderlich.
Der Port für OpenTelemetry-Metriken ist 21891, der nicht geändert werden kann.
Konfigurationsinformationen finden Sie unter OTel-Metrikquelle.
Sie können den OCI Streaming Service als persistenten Puffer verwenden. Weitere Informationen finden Sie unter Kafka-APIs verwenden.
OpenTelemetry Metrikpipelinekonfiguration
Das folgende Beispiel zeigt eine Pipelinekonfiguration für OpenTelemetry-Metriken:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
pipeline-username:
secret_id: <secret_ocid>
pipeline-password:
secret_id: <secret_ocid>
sample-metrics-pipeline:
source:
otel_metrics_source:
authentication:
http_basic:
username: '${{oci_secrets:pipeline-username}}'
password: '${{oci_secrets:pipeline-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- opensearch:
hosts: [ <cluster_ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: 'otel-metrics-%{yyyy.MM.dd}'
OpenTelemetry-Trace
Für das OpenTelemetry-Trace ist eine persistente Pufferung erforderlich.
Der Port für OpenTelemetry Trace ist 21890, der nicht geändert werden kann.
Konfigurationsinformationen finden Sie unter OTel Tracequelle.
Sie können den OCI Streaming Service als persistenten Puffer verwenden. Weitere Informationen finden Sie unter Kafka-APIs verwenden.
OpenTelemetry Trace-Pipelinekonfiguration
Das folgende Beispiel zeigt eine Pipelinekonfiguration für OpenTelemetry-Trace:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
pipeline-username:
secret_id: <secret_ocid>
pipeline-password:
secret_id: <secret_ocid>
traces-entry-shared-pipeline:
source:
otel_trace_source:
authentication:
http_basic:
username: '${{oci_secrets:pipeline-username}}'
password: '${{oci_secrets:pipeline-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- pipeline:
name: traces-pipeline
- pipeline:
name: service-map-pipeline
traces-pipeline:
source:
pipeline:
name: traces-entry-shared-pipeline
processor:
- otel_traces:
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index_type: trace-analytics-raw
service-map-pipeline:
source:
pipeline:
name: traces-entry-shared-pipeline
processor:
- service_map:
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index_type: trace-analytics-service-map
HTTP-Push-Connector
HTTP-Push Connector erfordert persistente Pufferung.
Der Port für HTTP Push Connector ist 2021, der nicht geändert werden kann.
Konfigurationsinformationen finden Sie unter HTTP-Quelle.
Sie können den OCI Streaming Service als persistenten Puffer verwenden. Weitere Informationen finden Sie unter Kafka-APIs verwenden.
Konfiguration der HTTP-Push-Connector-Pipeline
Das folgende Beispiel zeigt eine Pipelinekonfiguration für HTTP-Push-Connector:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
http-username:
secret_id: <secret_ocid>
http-password:
secret_id: <secret_ocid>
simple-sample-pipeline:
source:
http:
authentication:
http_basic:
username: '${{oci_secrets:http-username}}'
password: '${{oci_secrets:http-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index_name>
HTTP Push Connector mit mehreren Prozessoren
Das folgende Beispiel zeigt eine Pipelinekonfiguration für HTTP-Push-Connector mit mehreren Prozessoren:
# Keep caution while adding percentage symbols
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
http-username:
secret_id: <secret_ocid>
http-password:
secret_id: <secret_ocid>
simple-sample-pipeline:
source:
http:
path: "/logs"
authentication:
http_basic:
username: '${{oci_secrets:http-username}}'
password: '${{oci_secrets:http-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
processor:
- grok:
match:
log: [ "%{COMMONAPACHELOG}" ]
- date:
from_time_received: true
destination: "@timestamp"
- substitute_string:
entries:
- source: "log"
from: '\.'
to: "-"
- uppercase_string:
with_keys:
- "log"
- trim_string:
with_keys:
- "log"
- split_string:
entries:
- source: "request"
delimiter: "?"
- key_value:
source: "/request/1"
field_split_characters: "&"
value_split_characters: "="
destination: "query_params"
- lowercase_string:
with_keys:
- "verb"
- add_entries:
entries:
- key: "entry1"
value: "entry1value"
- key: "entry2"
value: "entry2value"
- key: "entry3"
value: "entry3value"
- rename_keys:
entries:
- from_key: "entry1"
to_key: "renameEntry1"
- from_key: "entry2"
to_key: "renameEntry2"
- from_key: "entry3"
to_key: "renameEntry3"
- copy_values:
entries:
- from_key: "log"
to_key: "copy_key"
- delete_entries:
with_keys: [ "renameEntry1", "renameEntry2", "renameEntry3" ]
sink:
- opensearch:
hosts: [ <cluster_ocid> ]
username: '${{oci_secrets:opensearch-username}}'
password: '${{oci_secrets:opensearch-password}}'
insecure: false
index: <index_name>