Kafka .NET Client and Streaming Quickstart

This quickstart shows you how to use the Kafka .NET client with Oracle Cloud Infrastructure Streaming to publish and consume messages. These examples use C# language.

See Using Streaming with Apache Kafka for more information. Refer to the Overview of Streaming for key concepts and more Streaming details.

Prerequisites

Note

In this quickstart, we create and run a simple .NET console application by using Visual Studio Code and the .NET CLI. Project tasks, such as creating, compiling, and running a project are done by using the .NET CLI. If you prefer, you can follow this tutorial with a different IDE and run commands in a terminal.
  1. To use the Kafka .NET client with Streaming, you must have the following:

    • An Oracle Cloud Infrastructure account.
    • A user created in that account, in a group with a policy that grants the required permissions. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
  2. Collect the following details:

    • Stream OCID
    • Messages endpoint
    • Stream pool OCID
    • Stream pool FQDN
    • Kafka connection settings:
      • Bootstrap servers
      • SASL connection strings
      • Security protocol

    See Listing Streams and Stream Pools for instructions on viewing stream details. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream. Streams correspond to a Kafka topic.

  3. Install .NET 5.0 SDK or later. Ensure that dotnet is set in your PATH environment variable.
  4. Visual Studio Code (recommended) with the C# extension installed. For information about how to install extensions on Visual Studio Code, see VS Code Extension Marketplace.

  5. Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism. Refer to Working with Auth Tokens for auth token generation. If you created the stream and stream pool in OCI, you are already authorized to use this stream according to OCI IAM, so you should create auth tokens for your OCI user.

    Note

    OCI user auth tokens are visible only at the time of creation. Copy it and keep it somewhere safe for future use.
  6. Install the SSL CA root certificates on the host where you are developing and running this quickstart. The client uses CA certificates to verify the broker's certificate.

    For Windows, download the cacert.pem file distributed with curl (download cacert.pm). For other platforms, refer to Configure SSL trust store.

Producing Messages

  1. Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
  2. Open the terminal and cd into the wd directory.
  3. Create a C# .NET console application by running the following command in the terminal:

    dotnet new console

    You should see a message indicating that the application was created:

    The template "Console Application" was created successfully.

    This creates a Program.cs file with C# code for a simple "HelloWorld" application.

  4. To reference the confluent-kafka-dotnet library in your new .NET Core project, run the following command in your project's directory wd:

    dotnet add package Confluent.Kafka
  5. Replace the code in Program.cs in the wd directory with following code. Replace values of variables in the map ProducerConfig and the name of topic with the details you gathered in the prerequisites:

    using System;
    using Confluent.Kafka;
    
    namespace OssProducerWithKafkaApi
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ProducerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Produce("<topic_stream_name>", config); // use the name of the stream you created
    
            }
    
            static void Produce(string topic, ClientConfig config)
            {
                using (var producer = new ProducerBuilder<string, string>(config).Build())
                {
                    int numProduced = 0;
                    int numMessages = 10;
                    for (int i=0; i<numMessages; ++i)
                    {
                        var key = "messageKey" + i;
                        var val = "messageVal" + i;
    
                        Console.WriteLine($"Producing record: {key} {val}");
    
                        producer.Produce(topic, new Message<string, string> { Key = key, Value = val },
                            (deliveryReport) =>
                            {
                                if (deliveryReport.Error.Code != ErrorCode.NoError)
                                {
                                    Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                                }
                                else
                                {
                                    Console.WriteLine($"Produced message to: {deliveryReport.TopicPartitionOffset}");
                                    numProduced += 1;
                                }
                            });
                    }
    
                    producer.Flush(TimeSpan.FromSeconds(10));
    
                    Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
                }
            }
        }
    }
  6. From the wd directory, run the following command:

    dotnet run
  7. Use the Console to see the latest messages sent to the stream to verify that production was successful.

Consuming Messages

  1. First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
  2. Open your favorite editor, such as Visual Studio Code, from the empty working directory wd.
  3. Create a C# .NET console application by running the following command on the terminal:

    dotnet new console

    You should see a message indicating that the application was created:

    The template "Console Application" was created successfully.

    This creates a Program.cs file with C# code for a simple "HelloWorld" application.

  4. To reference the confluent-kafka-dotnet library in your new .NET Core project, run the following command in your project's directory wd:

    dotnet add package Confluent.Kafka
  5. Replace the code in Program.cs in the wd directory with following code. Replace values of variables in the map ProducerConfig and the name of topic with the details you gathered in the prerequisites:

    using System;
    using Confluent.Kafka;
    using System.Threading;
    
    namespace OssKafkaConsumerDotnet
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("Demo for using Kafka APIs seamlessly with OSS");
    
                var config = new ConsumerConfig {
                                BootstrapServers = "<bootstrap_servers_endpoint>", //usually of the form cell-1.streaming.[region code].oci.oraclecloud.com:9092
                                SslCaLocation = "<path\to\root\ca\certificate\*.pem>",
                                SecurityProtocol = SecurityProtocol.SaslSsl,
                                SaslMechanism = SaslMechanism.Plain,
                                SaslUsername = "<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>",
                                SaslPassword = "<your_OCI_user_auth_token>", // use the auth-token you created step 5 of Prerequisites section 
                                };
    
                Consume("<topic_stream_name>", config); // use the name of the stream you created
            }
            static void Consume(string topic, ClientConfig config)
            {
                var consumerConfig = new ConsumerConfig(config);
                consumerConfig.GroupId = "dotnet-oss-consumer-group";
                consumerConfig.AutoOffsetReset = AutoOffsetReset.Earliest;
                consumerConfig.EnableAutoCommit = true;
    
                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    e.Cancel = true; // prevent the process from terminating.
                    cts.Cancel();
                };
    
                using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
                {
                    consumer.Subscribe(topic);
                    try
                    {
                        while (true)
                        {
                            var cr = consumer.Consume(cts.Token);
                            string key = cr.Message.Key == null ? "Null" : cr.Message.Key;
                            Console.WriteLine($"Consumed record with key {key} and value {cr.Message.Value}");
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        //exception might have occurred since Ctrl-C was pressed.
                    }
                    finally
                    {
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        consumer.Close();
                    }
                }
            }
    
        }
    }
  6. From the wd directory, run the following command:

    dotnet run
  7. You should see messages similar to the following:

    Demo for using Kafka APIs seamlessly with OSS
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    
    Note

    If you used the Console to produce a test message, the key for each message is Null