Kafka Python Client and Streaming Quickstart

This quickstart shows you how to use the Kafka Python client with Oracle Cloud Infrastructure Streaming to publish and consume messages.

See Using Streaming with Apache Kafka for more information. Refer to the Overview of Streaming for key concepts and more Streaming details.

Prerequisites

  1. To use the Kafka Python client with Streaming, you must have the following:

    • An Oracle Cloud Infrastructure account.
    • A user created in that account, in a group with a policy that grants the required permissions. For an example of how to set up a new user, group, compartment, and policy, see Adding Users. For a list of typical policies you may want to use, see Common Policies.
  2. Collect the following details:

    • Stream OCID
    • Messages endpoint
    • Stream pool OCID
    • Stream pool FQDN
    • Kafka connection settings:
      • Bootstrap servers
      • SASL connection strings
      • Security protocol

    See Listing Streams and Stream Pools for instructions on viewing stream details. Refer to Creating a Stream and Creating a Stream Pool if you do not have an existing stream. A stream corresponds to a Kafka topic.

  3. Python 3.6 or later, with pip installed and updated.
  4. Visual Studio Code (recommended) or any other integrated development environment (IDE).
  5. Install the confluent-kafka package for Python using the following command:

    pip install confluent-kafka
    Note

    You can install these packages globally, or within a virtualenv. The librdkafka package is used by the confluent-kafka package and embedded in wheels for the latest confluent-kafka release. For more details, refer to the Confluent Python client documentation.
  6. Install the SSL CA root certificates on the host where you are developing and running this quickstart. The client uses CA certificates to verify the broker's certificate.

    For Windows, download the cacert.pem file distributed with curl (download cacert.pem). For other platforms, refer to Configure SSL trust store.

  7. Authentication with the Kafka protocol uses auth tokens and the SASL/PLAIN mechanism. Refer to Working with Auth Tokens for auth token generation. If you created the stream and stream pool in OCI, you are already authorized to use this stream according to OCI IAM, so you should create auth tokens for your OCI user.

    Note

    An OCI user auth token is visible only at the time of creation. Copy it and keep it somewhere safe for future use.
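
The details collected in the prerequisites map onto a small set of Kafka client settings. The following is a minimal sketch, not part of the original quickstart, that assembles those settings from environment variables so that the auth token does not need to be written into a source file. The function name build_kafka_conf and every environment variable name are hypothetical choices made for this illustration; the configuration keys and the tenancy/user/stream pool OCID username format are the ones used in the Producer and Consumer code in this quickstart.

```python
import os

# Hypothetical environment variable names, chosen only for this sketch.
REQUIRED_VARS = (
    "OCI_TENANCY_NAME",
    "OCI_USERNAME",
    "OCI_STREAM_POOL_OCID",
    "OCI_BOOTSTRAP_SERVERS",
    "OCI_AUTH_TOKEN",
    "SSL_CA_LOCATION",
)


def build_kafka_conf(env=None):
    """Build a client config dict from a mapping (defaults to os.environ)."""
    env = os.environ if env is None else env

    # Fail early with one message that names every missing setting.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError("Missing settings: " + ", ".join(missing))

    # SASL username format: <tenancy name>/<user name>/<stream pool OCID>
    username = "/".join(
        [env["OCI_TENANCY_NAME"], env["OCI_USERNAME"], env["OCI_STREAM_POOL_OCID"]]
    )
    return {
        "bootstrap.servers": env["OCI_BOOTSTRAP_SERVERS"],
        "security.protocol": "SASL_SSL",
        "ssl.ca.location": env["SSL_CA_LOCATION"],
        "sasl.mechanism": "PLAIN",
        "sasl.username": username,
        "sasl.password": env["OCI_AUTH_TOKEN"],
    }
```

With the variables exported in your shell, the returned dictionary could be passed to the client in place of a hand-written conf map, for example Producer(build_kafka_conf()).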

Producing Messages

  1. Open your favorite editor, such as Visual Studio Code, from the empty working directory wd. You should already have confluent-kafka packages for Python installed for your current Python environment after you've met the prerequisites.
  2. Create a file named Producer.py in the wd directory with the following code. Replace the config values in the conf map, and set topic to the name of the stream you created.

    from confluent_kafka import Producer

    if __name__ == '__main__':

        topic = "<topic_stream_name>"
        conf = {
            'bootstrap.servers': "<bootstrap_servers_endpoint>",  # usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
            'security.protocol': 'SASL_SSL',

            'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
            # Optionally, instead of giving the path as shown above, you can: 1. pip install certifi, 2. import certifi, and
            # 3. set 'ssl.ca.location': certifi.where()

            'sasl.mechanism': 'PLAIN',
            'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
            'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
        }

        # Create Producer instance
        producer = Producer(**conf)
        delivered_records = 0

        # Optional per-message on_delivery handler (triggered by poll() or flush())
        # when a message has been successfully delivered or permanently failed delivery after retries.
        def acked(err, msg):
            """Delivery report handler called on successful or failed delivery of a message."""
            global delivered_records
            if err is not None:
                print("Failed to deliver message: {}".format(err))
            else:
                delivered_records += 1
                print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))

        for n in range(10):
            record_key = "messageKey" + str(n)
            record_value = "messageValue" + str(n)
            print("Producing record: {}\t{}".format(record_key, record_value))
            producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)
            # producer.poll() serves delivery reports (on_delivery) from previous produce() calls.
            producer.poll(0)

        # Block until all queued messages are delivered and their delivery reports are served.
        producer.flush()
        print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. From the wd directory, run the following command:

    python Producer.py
  4. Use the Console to see the latest messages sent to the stream to verify that production was successful.
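
The producer above counts deliveries through a module-level global that the acked callback updates. As an alternative, here is a minimal sketch of a callable object that keeps the counters itself. The class name DeliveryTracker is hypothetical and not part of confluent-kafka; the sketch relies only on the delivery callback being invoked with the same two arguments, err and msg, that acked receives.

```python
class DeliveryTracker:
    """Callable delivery report handler that counts successes and failures."""

    def __init__(self):
        self.delivered = 0
        self.failed = 0

    def __call__(self, err, msg):
        # err is None when the message was delivered; otherwise it describes the failure.
        if err is not None:
            self.failed += 1
            print("Failed to deliver message: {}".format(err))
        else:
            self.delivered += 1
```

To try it in Producer.py, you could create tracker = DeliveryTracker() before the loop, pass on_delivery=tracker to producer.produce(), and read tracker.delivered and tracker.failed after producer.flush() returns.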

Consuming Messages

  1. First, ensure that the stream you want to consume messages from contains messages. You could use the Console to produce a test message, or use the stream and messages we created in this quickstart.
  2. Open your favorite editor, such as Visual Studio Code, from the working directory wd. You should already have confluent-kafka packages for Python installed for your current Python environment after you've met the prerequisites.
  3. Create a file named Consumer.py in the wd directory with the following code. Replace the config values in the conf map, and set topic to the name of the stream you created.

    from confluent_kafka import Consumer


    if __name__ == '__main__':

        topic = "<topic_stream_name>"
        conf = {
            'bootstrap.servers': "<bootstrap_servers_endpoint>",  # usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092
            'security.protocol': 'SASL_SSL',

            'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
            # Optionally, instead of giving the path as shown above, you can: 1. pip install certifi, 2. import certifi, and
            # 3. set 'ssl.ca.location': certifi.where()

            'sasl.mechanism': 'PLAIN',
            'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
            'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section

            # The confluent-kafka Consumer requires a group.id; choose any consumer group name.
            'group.id': '<consumer_group_name>',
        }

        # Create Consumer instance
        consumer = Consumer(conf)

        # Subscribe to topic
        consumer.subscribe([topic])

        # Process messages
        try:
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                    continue
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                else:
                    # A regular Kafka message: decode its key and value
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key " + record_key + " and value " + record_value)
        except KeyboardInterrupt:
            pass
        finally:
            print("Leave group and commit final offsets")
            consumer.close()
  4. From the wd directory, run the following command:

    python Consumer.py
  5. You should see messages similar to the following:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
    Note

    If you used the Console to produce a test message, the key for each message is Null.
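
The consumer prints Null for a missing key but decodes the value unconditionally, so a message without a value would raise an error. The following minimal sketch treats both fields the same way. The helper name describe_record is hypothetical; it assumes the key and value arrive as bytes or None, which is how the consumer code above handles msg.key() and msg.value().

```python
def describe_record(key, value, encoding="utf-8"):
    """Format a consumed record, showing Null for a missing key or value."""
    record_key = "Null" if key is None else key.decode(encoding)
    record_value = "Null" if value is None else value.decode(encoding)
    return "Consumed record with key {} and value {}".format(record_key, record_value)


# Example with the same data as the sample output above.
print(describe_record(b"messageKey0", b"messageValue0"))
print(describe_record(None, b"Example test message"))
```

In Consumer.py, the three lines that decode and print the record could then be replaced with print(describe_record(msg.key(), msg.value())).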