Kafka Pythonクライアントおよびストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure StreamingでKafka Pythonクライアントを使用して、メッセージを公開および消費する方法を示します。

詳細は、Apache Kafkaでのストリーミングの使用を参照してください。主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。


  1. ストリーミングでKafka Pythonクライアントを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
  2. 次の詳細を収集します:

    • ストリームOCID
    • メッセージ・エンドポイント
    • ストリーム・プールOCID
    • ストリーム・プールFQDN
    • Kafka接続設定:
      • ブートストラップ・サーバー
      • SASL接続文字列
      • セキュリティ・プロトコル


  3. Python 3.6以降(PIPがインストールおよび更新済)。
  4. Visual Code Studio (推奨)またはその他の統合開発環境(IDE)。
  5. 次のコマンドを使用して、Python用のConfluent-Kafkaパッケージをインストールします:

    pip install confluent-kafka

    これらのパッケージは、グローバルに、またはvirtualenv内にインストールできます。librdkafkaパッケージは、confluent-kafkaパッケージによって使用され、最新のconfluent-kafkaリリースのwheelに埋め込まれています。詳細は、Confluent Pythonクライアントのドキュメントを参照してください。
  6. このクイックスタートを開発および実行しているホストにSSL CAルート証明書をインストールします。クライアントは、CA証明書を使用してブローカの証明書を検証します。


  7. Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。認証トークンの生成については、認証トークンの作業を参照してください。OCIでストリームおよびストリーム・プールを作成した場合は、OCI IAMに従ってこのストリームを使用する権限がすでに付与されているため、OCIユーザーの認証トークンを作成する必要があります。




  1. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のconfluent-kafkaパッケージがすでにインストールされている必要があります。
  2. 次のコードを使用して、wdディレクトリにProducer.pyという名前のファイルを作成します。マップconfの構成値を置き換えてください。トピックの名前は、作成したストリームの名前です。

    from confluent_kafka import Producer, KafkaError  
    if __name__ == '__main__':  
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
       # Create Producer instance  
       producer = Producer(**conf)  
       delivered_records = 0  
       # Optional per-message on_delivery handler (triggered by poll() or flush())  
       # when a message has been successfully delivered or permanently failed delivery after retries.  
       def acked(err, msg):  
           global delivered_records  
           """Delivery report handler called on  
               successful or failed delivery of message """  
           if err is not None:  
               print("Failed to deliver message: {}".format(err))  
               delivered_records += 1  
               print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset()))  
      for n in range(10):  
          record_key = "messageKey" + str(n)  
          record_value = "messageValue" + str(n)  
          print("Producing record: {}\t{}".format(record_key, record_value))  
          producer.produce(topic, key=record_key, value=record_value, on_delivery=acked)  
          # p.poll() serves delivery reports (on_delivery) from previous produce() calls.  
      print("{} messages were produced to topic {}!".format(delivered_records, topic))
  3. wdディレクトリから、次のコマンドを実行します:

    python Producer.py
  4. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。


  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. 空の作業ディレクトリwdから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のconfluent-kafkaパッケージがすでにインストールされている必要があります。
  3. 次のコードを使用して、wdディレクトリにConsumer.pyという名前のファイルを作成します。マップconfの構成値を置き換えてください。トピックの名前は、作成したストリームの名前です。

    from confluent_kafka import Consumer
    if __name__ == '__main__':
      topic = "<topic_stream_name>"  
      conf = {  
        'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092  
        'security.protocol': 'SASL_SSL',  
        'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>',  # from step 6 of Prerequisites section
         # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and
         # 3. 'ssl.ca.location': certifi.where()
        'sasl.mechanism': 'PLAIN',
        'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>',  # from step 2 of Prerequisites section
        'sasl.password': '<your_OCI_user_auth_token>',  # from step 7 of Prerequisites section
        # Create Consumer instance
        consumer = Consumer(conf)
        # Subscribe to topic
        # Process messages
            while True:
                msg = consumer.poll(1.0)
                if msg is None:
                    # No message available within timeout.
                    # Initial message consumption may take up to
                    # `session.timeout.ms` for the consumer group to
                    # rebalance and start consuming
                    print("Waiting for message or event/error in poll()")
                elif msg.error():
                    print('error: {}'.format(msg.error()))
                    # Check for Kafka message
                    record_key = "Null" if msg.key() is None else msg.key().decode('utf-8')
                    record_value = msg.value().decode('utf-8')
                    print("Consumed record with key "+ record_key + " and value " + record_value)
        except KeyboardInterrupt:
            print("Leave group and commit final offsets")
  4. wdディレクトリから、次のコマンドを実行します:

    python Consumer.py
  5. 次のようなメッセージが表示されます:

    Waiting for message or event/error in poll()
    Waiting for message or event/error in poll()
    Consumed record with key messageKey0 and value messageValue0
    Consumed record with key messageKey1 and value messageValue1
    Consumed record with key Null and value Example test message
