Kafka Pythonクライアントおよびストリーミング・クイックスタート
このクイックスタートでは、Oracle Cloud Infrastructure StreamingでKafka Pythonクライアントを使用して、メッセージを公開および消費する方法を示します。
詳細は、Apache Kafkaでのストリーミングの使用を参照してください。主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。
前提条件
-
ストリーミングでKafka Pythonクライアントを使用するには、次が必要です:
-
次の詳細を収集します:
- ストリームOCID
- メッセージ・エンドポイント
- ストリーム・プールOCID
- ストリーム・プールFQDN
- Kafka接続設定:
- ブートストラップ・サーバー
- SASL接続文字列
- セキュリティ・プロトコル
ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。ストリームは、Kafkaトピックに対応します。
- Python 3.6以降(PIPがインストールおよび更新済)。
- Visual Code Studio (推奨)またはその他の統合開発環境(IDE)。
-
次のコマンドを使用して、Python用の
Confluent-Kafka
パッケージをインストールします:pip install confluent-kafka
ノート
これらのパッケージは、グローバルに、またはvirtualenv内にインストールできます。librdkafka
パッケージは、confluent-kafka
パッケージによって使用され、最新のconfluent-kafka
リリースのwheelに埋め込まれています。詳細は、Confluent Pythonクライアントのドキュメントを参照してください。 -
このクイックスタートを開発および実行しているホストにSSL CAルート証明書をインストールします。クライアントは、CA証明書を使用してブローカの証明書を検証します。
Windowsの場合、curlとともに配布された
cacert.pem
ファイルをダウンロードします(cacert.pmのダウンロード)。他のプラットフォームの場合、SSLトラスト・ストアの構成を参照してください。 -
Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。認証トークンの生成については、認証トークンの作業を参照してください。OCIでストリームおよびストリーム・プールを作成した場合は、OCI IAMに従ってこのストリームを使用する権限がすでに付与されているため、OCIユーザーの認証トークンを作成する必要があります。
ノート
OCIユーザーの認証トークンは、作成時にのみ表示されます。それをコピーして、将来の使用に備えて安全な場所に保管してください。
メッセージの生成
- 空の作業ディレクトリ
wd
から、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のconfluent-kafka
パッケージがすでにインストールされている必要があります。 -
次のコードを使用して、
wd
ディレクトリにProducer.py
という名前のファイルを作成します。マップconf
の構成値を置き換えてください。トピックの名前は、作成したストリームの名前です。from confluent_kafka import Producer, KafkaError if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Producer instance producer = Producer(**conf) delivered_records = 0 # Optional per-message on_delivery handler (triggered by poll() or flush()) # when a message has been successfully delivered or permanently failed delivery after retries. def acked(err, msg): global delivered_records """Delivery report handler called on successful or failed delivery of message """ if err is not None: print("Failed to deliver message: {}".format(err)) else: delivered_records += 1 print("Produced record to topic {} partition [{}] @ offset {}".format(msg.topic(), msg.partition(), msg.offset())) for n in range(10): record_key = "messageKey" + str(n) record_value = "messageValue" + str(n) print("Producing record: {}\t{}".format(record_key, record_value)) producer.produce(topic, key=record_key, value=record_value, on_delivery=acked) # p.poll() serves delivery reports (on_delivery) from previous produce() calls. producer.poll(0) producer.flush() print("{} messages were produced to topic {}!".format(delivered_records, topic))
-
wd
ディレクトリから、次のコマンドを実行します:python Producer.py
- コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。
メッセージの消費
- 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
- 空の作業ディレクトリ
wd
から、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、現在のPython環境にPython用のconfluent-kafka
パッケージがすでにインストールされている必要があります。 -
次のコードを使用して、
wd
ディレクトリにConsumer.py
という名前のファイルを作成します。マップconf
の構成値を置き換えてください。トピックの名前は、作成したストリームの名前です。from confluent_kafka import Consumer if __name__ == '__main__': topic = "<topic_stream_name>" conf = { 'bootstrap.servers': "<bootstrap_servers_endpoint>", #usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 'security.protocol': 'SASL_SSL', 'ssl.ca.location': '</path/on/your/host/to/your/cert.pem/>', # from step 6 of Prerequisites section # optionally instead of giving path as shown above, you can do 1. pip install certifi 2. import certifi and # 3. 'ssl.ca.location': certifi.where() 'sasl.mechanism': 'PLAIN', 'sasl.username': '<OCI_tenancy_name>/<your_OCI_username>/<stream_pool_OCID>', # from step 2 of Prerequisites section 'sasl.password': '<your_OCI_user_auth_token>', # from step 7 of Prerequisites section } # Create Consumer instance consumer = Consumer(conf) # Subscribe to topic consumer.subscribe([topic]) # Process messages try: while True: msg = consumer.poll(1.0) if msg is None: # No message available within timeout. # Initial message consumption may take up to # `session.timeout.ms` for the consumer group to # rebalance and start consuming print("Waiting for message or event/error in poll()") continue elif msg.error(): print('error: {}'.format(msg.error())) else: # Check for Kafka message record_key = "Null" if msg.key() is None else msg.key().decode('utf-8') record_value = msg.value().decode('utf-8') print("Consumed record with key "+ record_key + " and value " + record_value) except KeyboardInterrupt: pass finally: print("Leave group and commit final offsets") consumer.close()
-
wd
ディレクトリから、次のコマンドを実行します:python Consumer.py
-
次のようなメッセージが表示されます:
Waiting for message or event/error in poll() Waiting for message or event/error in poll() Consumed record with key messageKey0 and value messageValue0 Consumed record with key messageKey1 and value messageValue1 Consumed record with key Null and value Example test message
次のステップ
詳細は、次のリソースを参照してください: