Using Kafka Connect
This topic describes how to use Kafka Connect with Oracle Cloud Infrastructure Streaming.
Managing Kafka Connect Configurations provides steps for using the Console, CLI, and API. Steps for using the OCI SDKs are provided later in this topic.
To use your Kafka connectors with Oracle Cloud Infrastructure Streaming, create a Kafka Connect configuration using the Console or the command line interface (CLI). The Streaming API calls these configurations harnesses.
Kafka Connect configurations created in a given compartment work only for streams in the same compartment.
You can use multiple Kafka connectors with the same Kafka Connect configuration. Create additional Kafka Connect configurations when you need to produce or consume streams in separate compartments, or when you need more capacity to avoid hitting the throttle limits of a single Kafka Connect configuration (for example, because it has too many connectors, or connectors with too many workers).
For more information on managing Kafka Connect configurations using the Console and Streaming API, see Managing Kafka Connect Configurations.
Kafka Connectors
Streaming's Kafka Connect compatibility means that you can take advantage of the many existing first- and third-party connectors to move data from your sources to your targets.
Kafka connectors for Oracle products:
- Oracle Cloud Infrastructure Object Storage (Using Kafka Connect for S3)
  - Kafka Connect Amazon S3 source connector, for producers
  - Kafka Connect Amazon S3 sink connector, for consumers
- Oracle Integration Cloud
- Oracle Database (Using Kafka Connect JDBC)
- Oracle GoldenGate
For a complete list of third-party Kafka source and sink connectors, refer to the official Confluent Hub.
Kafka Connect Topics
The Streaming service automatically creates the three topics (config, offset, and status) that are required to use Kafka Connect when you create the Kafka Connect configuration. These topics contain the OCID of the Kafka Connect configuration in their names.
Place these topic names in the connect-distributed.properties file of the Kafka connector that you want to use with Streaming.
For example:
# Relevant Kafka Connect settings
config.storage.topic=<connect_configuration_OCID>-config
offset.storage.topic=<connect_configuration_OCID>-offset
status.storage.topic=<connect_configuration_OCID>-status
These three compacted topics store configuration and state management data for Kafka Connect and Streaming; do not use them to store your own data. To ensure that connectors use these topics only for their intended purpose, hard throttle limits of 50 kb/s and 50 requests per second (rps) apply to them.
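If you generate worker configuration programmatically, the three topic names can be derived from the Kafka Connect configuration OCID. The following is a minimal Java sketch, assuming the `<OCID>-config`, `<OCID>-offset`, and `<OCID>-status` naming shown in the example above; `ConnectTopics` and `storageTopics` are hypothetical helpers, not part of any OCI or Kafka library.

```java
import java.util.Properties;

/**
 * Hypothetical helper: derives the Kafka Connect internal topic names from a
 * Kafka Connect configuration (harness) OCID, assuming the
 * "<OCID>-config", "<OCID>-offset", "<OCID>-status" naming convention.
 */
final class ConnectTopics {
    private ConnectTopics() {}

    /** Returns worker properties with the config, offset, and status topic names set. */
    static Properties storageTopics(String connectConfigurationOcid) {
        if (connectConfigurationOcid == null || connectConfigurationOcid.isBlank()) {
            throw new IllegalArgumentException("Kafka Connect configuration OCID is required");
        }
        Properties props = new Properties();
        props.setProperty("config.storage.topic", connectConfigurationOcid + "-config");
        props.setProperty("offset.storage.topic", connectConfigurationOcid + "-offset");
        props.setProperty("status.storage.topic", connectConfigurationOcid + "-status");
        return props;
    }
}
```

The returned `Properties` object can be merged into the rest of the worker configuration before the worker starts.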
Bootstrap Server
Set the bootstrap server in your Kafka connector properties file to the endpoint for Streaming on port 9092. For example:
streaming.us-phoenix-1.oci.oraclecloud.com:9092
For a list of endpoints for Streaming, see the Streaming section in API Reference and Endpoints.
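A minimal sketch of building the bootstrap server value from a region identifier follows. It assumes that the endpoint host follows the `streaming.<region>.oci.oraclecloud.com` pattern of the example above, which you should confirm against API Reference and Endpoints; `StreamingEndpoints` and `bootstrapServer` are hypothetical names, and the region-identifier check is an assumption of this sketch.

```java
/**
 * Hypothetical helper: builds a Kafka bootstrap server string for Streaming.
 * ASSUMPTION: the host follows the "streaming.<region>.oci.oraclecloud.com"
 * pattern; verify the endpoint for your region before relying on this.
 */
final class StreamingEndpoints {
    /** Kafka-compatible port used by Streaming. */
    static final int KAFKA_PORT = 9092;

    private StreamingEndpoints() {}

    static String bootstrapServer(String regionIdentifier) {
        // Assumed format for region identifiers such as "us-phoenix-1":
        // lowercase letters, digits, and hyphens only.
        if (regionIdentifier == null || !regionIdentifier.matches("[a-z0-9-]+")) {
            throw new IllegalArgumentException("Unexpected region identifier: " + regionIdentifier);
        }
        return "streaming." + regionIdentifier + ".oci.oraclecloud.com:" + KAFKA_PORT;
    }
}
```

For example, `StreamingEndpoints.bootstrapServer("us-phoenix-1")` produces the same value as the example endpoint above.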
Authentication
Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism. You can generate tokens on the user details page in the Console. See Working with Auth Tokens for more information.
It's a good idea to create a dedicated group and user, and grant that group permission to manage streams in the appropriate compartment or tenancy. You can then generate an auth token for that user and use it in your Kafka client configuration.
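The auth token ends up in the `sasl.jaas.config` value of the worker properties. The sketch below assembles that value for the SASL/PLAIN mechanism in the same form as the example properties file in this topic; `SaslPlainConfig` and `jaasConfig` are hypothetical helpers, and rejecting double quotes and backslashes (instead of escaping them) is a simplifying assumption of this sketch.

```java
/**
 * Hypothetical helper: builds a SASL/PLAIN JAAS configuration value from a
 * username and an auth token. Values containing double quotes or backslashes
 * are rejected rather than escaped, to keep the quoted JAAS syntax intact.
 */
final class SaslPlainConfig {
    private SaslPlainConfig() {}

    static String jaasConfig(String username, String authToken) {
        requireQuotable("username", username);
        requireQuotable("auth token", authToken);
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" password=\"" + authToken + "\";";
    }

    private static void requireQuotable(String name, String value) {
        if (value == null || value.isEmpty() || value.indexOf('"') >= 0 || value.indexOf('\\') >= 0) {
            throw new IllegalArgumentException(name + " must be non-empty and contain no quotes or backslashes");
        }
    }
}
```

Keep the token itself out of source control, for example by reading it from an environment variable or a secrets store when the worker starts.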
Example Kafka Connector Properties File
The following shows an example Kafka connector connect-distributed.properties file:
bootstrap.servers=<streaming_endpoint>:9092
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
producer.sasl.mechanism=PLAIN
producer.security.protocol=SASL_SSL
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<userid>" password="<authToken>";
config.storage.topic=<connect_configuration_OCID>-config
offset.storage.topic=<connect_configuration_OCID>-offset
status.storage.topic=<connect_configuration_OCID>-status
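Because the worker, its embedded producer, and its embedded consumer each need their own SASL settings, it is easy to miss one. The following hypothetical `WorkerConfigCheck` class sketches a pre-flight check: it loads a properties body with `java.util.Properties` (which accepts both `key=value` and `key:value` lines) and reports missing keys and mismatched security protocols. The key list is assumed from the example above and is not an exhaustive list of worker settings.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for the connect-distributed.properties keys shown above. */
final class WorkerConfigCheck {
    private static final String[] PREFIXES = {"", "producer.", "consumer."};
    private static final String[] SASL_KEYS = {"sasl.mechanism", "security.protocol", "sasl.jaas.config"};
    private static final String[] TOPIC_KEYS = {"config.storage.topic", "offset.storage.topic", "status.storage.topic"};

    private WorkerConfigCheck() {}

    /** Returns a list of problems; an empty list means every expected key is present and consistent. */
    static List<String> problems(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        List<String> problems = new ArrayList<>();
        if (props.getProperty("bootstrap.servers") == null) {
            problems.add("missing bootstrap.servers");
        }
        // The worker, producer, and consumer each need their own SASL settings.
        for (String prefix : PREFIXES) {
            for (String key : SASL_KEYS) {
                if (props.getProperty(prefix + key) == null) {
                    problems.add("missing " + prefix + key);
                }
            }
        }
        // The producer and consumer should use the same security protocol as the worker.
        String workerProtocol = props.getProperty("security.protocol");
        for (String prefix : new String[] {"producer.", "consumer."}) {
            String protocol = props.getProperty(prefix + "security.protocol");
            if (workerProtocol != null && protocol != null && !workerProtocol.equals(protocol)) {
                problems.add(prefix + "security.protocol differs from security.protocol");
            }
        }
        for (String key : TOPIC_KEYS) {
            if (props.getProperty(key) == null) {
                problems.add("missing " + key);
            }
        }
        return problems;
    }
}
```

Running the check before starting the worker turns a missing or inconsistent setting into an explicit message instead of an authentication failure at runtime.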
Required IAM Policy
To use Oracle Cloud Infrastructure, you must be granted security access in a policy by an administrator. This access is required whether you're using the Console or the REST API with an SDK, CLI, or other tool. If you get a message that you don't have permission or are unauthorized, verify with your administrator what type of access you have and which compartment to work in.
To allow a group to manage Kafka Connect configurations, you need to create the correct policy in your tenancy. For example:
allow group <identity_domain_name>/<group_name> to manage connect-harnesses in tenancy
For administrators: The policy in Let streaming admins manage streaming resources lets the specified group do everything with streaming and related Streaming service resources.
If you're new to policies, see Getting Started with Policies and Common Policies. If you want to dig deeper into writing policies for the Streaming service, see Details for the Streaming Service in the IAM policy reference.
Using the SDKs for Managing Kafka Connect Configurations
To use Kafka Connect with Streaming, you need a Kafka Connect configuration, also called a Kafka Connect harness. You can get the OCID of a harness when you create a new one, or retrieve it from an existing harness. For more information, see Using Kafka Connect.
The following code example shows how to create a Kafka Connect harness using the OCI SDK for Java:
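import com.oracle.bmc.streaming.model.ConnectHarness;
import com.oracle.bmc.streaming.model.CreateConnectHarnessDetails;
import com.oracle.bmc.streaming.requests.CreateConnectHarnessRequest;
import com.oracle.bmc.streaming.requests.GetConnectHarnessRequest;
import com.oracle.bmc.streaming.responses.CreateConnectHarnessResponse;
// Assumes streamAdminClient (a StreamAdminClient) and compartment (a compartment OCID) are defined, and that the enclosing method throws InterruptedException.
CreateConnectHarnessDetails createConnectHarnessDetails = CreateConnectHarnessDetails.builder()
        .compartmentId(compartment) // compartment in which to create the connect harness
        .name("myConnectHarness")   // connect harness name
        .build();
CreateConnectHarnessRequest connectHarnessRequest = CreateConnectHarnessRequest.builder()
        .createConnectHarnessDetails(createConnectHarnessDetails)
        .build();
CreateConnectHarnessResponse createConnectHarnessResponse = streamAdminClient.createConnectHarness(connectHarnessRequest);
ConnectHarness connectHarness = createConnectHarnessResponse.getConnectHarness();
// Poll until the harness is Active or Failed, pausing between requests to avoid a tight loop.
while (connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Active
        && connectHarness.getLifecycleState() != ConnectHarness.LifecycleState.Failed) {
    Thread.sleep(1000);
    GetConnectHarnessRequest getConnectHarnessRequest = GetConnectHarnessRequest.builder().connectHarnessId(connectHarness.getId()).build();
    connectHarness = streamAdminClient.getConnectHarness(getConnectHarnessRequest).getConnectHarness();
}
if (connectHarness.getLifecycleState() == ConnectHarness.LifecycleState.Failed) {
    throw new IllegalStateException("Connect harness creation failed: " + connectHarness.getId());
}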
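Waiting for a newly created harness to leave its transitional state is a general polling pattern. The following is a minimal sketch of a reusable poller with capped exponential backoff and a maximum attempt count; `LifecyclePoller` is a hypothetical helper that uses only the JDK, and any delay values you pass are illustrative assumptions, not documented Streaming limits.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical helper: polls a resource until its lifecycle state is one of the
 * given terminal states, sleeping with capped exponential backoff between calls.
 */
final class LifecyclePoller {
    private LifecyclePoller() {}

    static <R, S> R waitForTerminalState(
            Supplier<R> fetch,          // fetches the current version of the resource
            Function<R, S> stateOf,     // extracts the lifecycle state from the resource
            Set<S> terminalStates,      // states that end the wait, e.g. Active and Failed
            Duration initialDelay,
            Duration maxDelay,
            int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delayMillis = initialDelay.toMillis();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            R resource = fetch.get();
            if (terminalStates.contains(stateOf.apply(resource))) {
                return resource;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for callers
                    throw new IllegalStateException("Interrupted while polling", e);
                }
                // Double the delay for the next attempt, but never beyond maxDelay.
                delayMillis = Math.min(delayMillis * 2, maxDelay.toMillis());
            }
        }
        throw new IllegalStateException("No terminal state after " + maxAttempts + " attempts");
    }
}
```

To use it with the SDK example above, `fetch` could wrap the `streamAdminClient.getConnectHarness(...)` call, `stateOf` could be `ConnectHarness::getLifecycleState`, and the terminal states could be `Active` and `Failed`. Bounding the number of attempts keeps a harness that never settles from blocking the caller indefinitely.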
The following code example shows how to list Kafka Connect harnesses using the OCI SDK for Java:
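import java.util.List;
import com.oracle.bmc.streaming.model.ConnectHarnessSummary;
import com.oracle.bmc.streaming.requests.ListConnectHarnessesRequest;
import com.oracle.bmc.streaming.responses.ListConnectHarnessesResponse;
ListConnectHarnessesRequest listConnectHarnessesRequest = ListConnectHarnessesRequest.builder()
        .compartmentId(compartment) // OCID of the compartment whose connect harnesses you want to list
        .lifecycleState(ConnectHarnessSummary.LifecycleState.Active) // return only active harnesses
        .build();
ListConnectHarnessesResponse listConnectHarnessesResponse = streamAdminClient.listConnectHarnesses(listConnectHarnessesRequest);
// getItems() returns a single page of results; use getOpcNextPage() to request any remaining pages.
List<ConnectHarnessSummary> items = listConnectHarnessesResponse.getItems();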
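A list call returns results one page at a time. The following sketch shows the general pattern of following next-page tokens until none remain; `PageCollector` and its `Page` type are hypothetical and use only the JDK. In the OCI SDK for Java, list requests accept a page token through the request builder's `page` method and responses expose the next token through `getOpcNextPage()`; verify these names against the SDK reference for your version.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper: collects every item from a paginated list call. */
final class PageCollector {
    /** One page of results plus the token for the next page (null on the last page). */
    static final class Page<T> {
        final List<T> items;
        final String nextPageToken;

        Page(List<T> items, String nextPageToken) {
            this.items = items;
            this.nextPageToken = nextPageToken;
        }
    }

    private PageCollector() {}

    /** fetchPage receives the page token (null for the first page) and returns that page. */
    static <T> List<T> collectAll(Function<String, Page<T>> fetchPage) {
        List<T> all = new ArrayList<>();
        String token = null;
        do {
            Page<T> page = fetchPage.apply(token);
            all.addAll(page.items);
            token = page.nextPageToken; // null means there are no more pages
        } while (token != null);
        return all;
    }
}
```

With the SDK, `fetchPage` would build a `ListConnectHarnessesRequest` with the given token, call `listConnectHarnesses`, and wrap `getItems()` and `getOpcNextPage()` in a `Page`.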