Create and manage OpenSearch pipelines using Data Prepper to ingest data into an OpenSearch cluster.
Data Prepper is an open source data collector that can filter, enrich, transform, normalize, and aggregate data for downstream analysis and visualization. It's one of the most recommended data ingestion tools for processing large and complex datasets.
We use the PULL model to ingest data into an OpenSearch 2.x cluster and integrate it with the Oracle Cloud Infrastructure Streaming service, self-managed Kafka, and Object Storage.
Note
The OpenSearch Cluster with Data Prepper feature is currently available in the OC1 realm.
Required Policies
Complete the following tasks before proceeding with the steps described in this topic:
If you're a non-administrator in your tenancy, contact your tenancy administrator to grant these permissions to you. The administrator must grant the following permissions so that non-administrator users can manage pipelines and perform CRUD operations on them.
The following policy allows the administrator to grant permission to all users in the respective tenancy:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
The following policy allows the administrator to grant permission for a group in a compartment (recommended):
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
where <group> is the group whose users are granted access to the resource.
The following policies allow OpenSearch pipelines to read secrets from the Oracle Cloud Infrastructure Vault:
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Object Storage Policies
These policies are only required for the Object Storage source:
The following policy allows OpenSearch pipelines to use a bucket from Object Storage as the source coordination persistence:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
The following policy allows OpenSearch pipelines to ingest objects from Object Storage:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
The following policy allows OpenSearch pipelines to read buckets from Object Storage:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
The following policy allows OpenSearch pipelines to read buckets from the source coordination bucket:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
OCI Streaming and Self-Managed Kafka Policies
These policies are required for the OCI Streaming service or self-managed Kafka sources.
Common Network Policies
Note
These policies are not needed for the public OCI Streaming service.
Add the following policies to allow the OpenSearch service to create, read, update, and delete private endpoints in the customer subnet:
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
OCI Streaming Service Policies (Public and Private)
These policies are only required for the OCI Streaming service.
The following policy allows OpenSearch pipelines to consume the records from the OCI Streaming service.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
The following policy allows OpenSearch pipelines to read stream pools from the OCI Streaming service:
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Self-Managed Kafka Permissions
These permissions are only required for self-managed Kafka sources.
Select the following link to add the permissions the OpenSearch pipeline requires to list topics, describe topics, join the consumer group, and consume records from the topic:
This configuration is only required for the private OCI Streaming service and self-managed Kafka. For the public OCI Streaming service, select None.
Add an ingress security rule in the Security List of the subnet or the Network Security Group to allow OpenSearch pipelines to communicate with the private OCI Streaming service running in your subnet.
The following image shows the ingress rules for the Network Security Group.
Creating the Secrets in the Vault
All plain text secrets required by OpenSearch pipelines must be passed through Vault, because OpenSearch pipelines don't accept plain text secrets such as usernames and passwords in the pipeline YAML.
Perform the following tasks:
Create a new user with write permissions in your OpenSearch cluster. For instructions, see the following OpenSearch documentation topics:
Create secrets (username and password) in Vault for the new user you created. For instructions, see the following Oracle Cloud Infrastructure Vault topics:
Add the following resource principal policies in your OpenSearch tenancy to allow OpenSearch pipelines to read the secrets from the Vault.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
The Object Storage source depends on the source coordination configuration. OpenSearch pipelines support source coordination using Object Storage as the persistence store. You must provide the Object Storage bucket details from your tenancy.
The following is an example of source coordination:
source_coordination:
  store:
    oci-object-bucket:
      name: <Object Storage bucket name from your tenancy>
      namespace: <namespace>
The following are sections of the pipeline configuration YAML to be aware of:
OCI Secrets: You can create a secret in Vault with your OpenSearch cluster credentials and use it in the pipeline YAML for connecting to the OpenSearch cluster.
OpenSearch Sink: The sink contains OpenSearch cluster OCIDs with index names for ingestion.
oci-object source: Data Prepper supports scan-based ingestion from Object Storage, with many configuration options. You can configure the source to ingest objects from your Object Storage bucket on a scheduled frequency or without any schedule. You have the following scan options (a configuration sketch follows this list):
One or more time scan: This option lets you configure a pipeline that reads objects in Object Storage buckets one or more times, based on the last-modified time of the objects.
Scheduling based scan: This option lets you schedule a scan at a regular interval after the pipeline is created.
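The following is a minimal sketch of the two scan modes, assuming the oci-object scan block mirrors the shape of Data Prepper's S3 scan options; the scheduling, interval, start_time, end_time, and buckets fields shown here are illustrative assumptions rather than confirmed oci-object options.
# One or more time scan: read objects whose last-modified time falls within a window
source:
  oci-object:
    scan:
      start_time: 2024-01-01T00:00:00
      end_time: 2024-06-30T23:59:59
      buckets:
        - bucket:
            namespace: <namespace>
            name: <bucket-name>
# Scheduling based scan: rescan the bucket at a regular interval after the pipeline is created
source:
  oci-object:
    scan:
      scheduling:
        interval: PT1H
      buckets:
        - bucket:
            namespace: <namespace>
            name: <bucket-name>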
The following options are available to configure the Object Storage source.
Object Storage Configurations
acknowledgments (optional, Boolean): When true, enables the Object Storage source to receive end-to-end acknowledgments when events are received by OpenSearch sinks.
buffer_timeout (optional, Duration): The amount of time allowed for writing events to the Data Prepper buffer before a timeout occurs. Any events that the source can't write to the buffer within the specified time are discarded. Default is 10s.
compression (optional, String): The compression algorithm to apply: none, gzip, snappy, or automatic. Default is none.
delete_oci_objects_on_read (optional, Boolean): When true, the Object Storage source scan attempts to delete objects after all events from an object are successfully acknowledged by all sinks. End-to-end acknowledgments must be enabled for deletion to work. Default is false.
oci (optional, OCI): The OCI configuration. See the following OCI section for more information.
records_to_accumulate (optional, Integer): The number of messages that accumulate before being written to the buffer. Default is 100.
workers (optional, Integer): The number of worker threads the source uses to read data from the Object Storage bucket. Leave this value at the default unless your Object Storage objects are smaller than 1 MB; performance may decrease for larger objects. Default is 1.
The following is an example of the Object Storage pipeline configuration YAML:
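The fields below are an illustrative sketch rather than a verified template: the scan structure and the opensearch sink fields (hosts, index, username, password) are assumptions, while the source coordination store and the oci-object options follow the sections above. In practice, the credentials should reference the Vault secrets described earlier rather than plain text values.
source_coordination:
  store:
    oci-object-bucket:
      name: <source-coordination-bucket-name>
      namespace: <namespace>
object-storage-pipeline:
  source:
    oci-object:
      # Options from the configuration list above
      acknowledgments: true
      compression: none
      workers: 1
      scan:
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        # Placeholder sink fields; replace with the fields required by your pipeline
        hosts: [ "<opensearch-cluster-endpoint>" ]
        username: <username-from-vault-secret>
        password: <password-from-vault-secret>
        index: <index-name>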
The following is an example of filtering files by folder. To read files only from specific folders, use a filter to specify the folders. This example includes files from folder2 within folder1 by using include_prefix.
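A minimal sketch of such a filter, assuming the oci-object scan supports an include_prefix filter in the same shape as Data Prepper's S3 scan; the bucket and folder names are placeholders:
scan:
  buckets:
    - bucket:
        namespace: <namespace>
        name: <bucket-name>
        # Only objects whose names start with folder1/folder2/ are ingested
        filter:
          include_prefix:
            - folder1/folder2/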
You can use the OCI Streaming Service as the Kafka source to ingest into the OpenSearch cluster. For information on how to do this, see Using Kafka APIs.
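As a rough, unverified sketch, a pipeline that consumes an OCI stream through the Kafka APIs might declare a Kafka source like the following; the field names follow Data Prepper's generic kafka source and OCI Streaming's Kafka compatibility settings, and the endpoint, stream, group, and credential values are placeholders (the auth token should come from a Vault secret rather than plain text).
streaming-pipeline:
  source:
    kafka:
      # OCI Streaming Kafka-compatible bootstrap endpoint for your region
      bootstrap_servers:
        - cell-1.streaming.<region>.oci.oraclecloud.com:9092
      topics:
        - name: <stream-name>
          group_id: <consumer-group>
      encryption:
        type: ssl
      authentication:
        sasl:
          plain:
            # OCI Streaming SASL/PLAIN username format: tenancy name, user name, stream pool OCID
            username: <tenancy-name>/<username>/<stream-pool-ocid>
            password: <auth-token-from-vault-secret>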