Kafka Javaクライアントおよびストリーミング・クイックスタート
このクイックスタートでは、Oracle Cloud Infrastructure StreamingでKafka Javaクライアントを使用して、メッセージを公開および消費する方法を示します。
詳細は、Apache Kafkaでのストリーミングの使用を参照してください。主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。
前提条件
-
ストリーミングでKafka Javaクライアントを使用するには、次が必要です:
-
次の詳細を収集します:
- ストリームOCID
- メッセージ・エンドポイント
- ストリーム・プールOCID
- ストリーム・プールFQDN
- Kafka接続設定:
- ブートストラップ・サーバー
- SASL接続文字列
- セキュリティ・プロトコル
ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。ストリームは、Kafkaトピックに対応します。
- JDK 8以上がインストール済。JavaがPATH内にあることを確認します。
- Maven 3.0がインストール済。MavenがPATH内にあることを確認します。
- Intellij (推奨)またはその他の統合開発環境(IDE)。
-
Kafka Java SDKの最新バージョンのMaven依存性またはjarを
pom.xml
に次のように追加します:<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency>
-
この例のJavaプロジェクトの作業ディレクトリとして
wd
を使用すると、pom.xml
は次のようになります:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>oci.example</groupId> <artifactId>StreamsExampleWithKafkaApis</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </dependencies> </project>
-
Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。認証トークンの生成については、認証トークンの作業を参照してください。OCIでストリームおよびストリーム・プールを作成した場合は、OCI IAMに従ってこのストリームを使用する権限がすでに付与されているため、OCIユーザーの認証トークンを作成する必要があります。
ノート
OCIユーザーの認証トークンは、作成時にのみ表示されます。それをコピーして、将来の使用に備えて安全な場所に保管してください。
メッセージの生成
wd
ディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、Maven Javaプロジェクトのpom.xml
の一部として、JavaのKafka SDK依存性がすでに存在する必要があります。-
次のコードを使用して、パス
/src/main/java/kafka/sdk/oss/example/
以下のディレクトリwd
にProducer.java
という名前の新しいファイルを作成します。コード・コメントの指示に従って、コード内の変数の値を置き換えます(bootstrapServers
からstreamOrKafkaTopicName
まで)。これらの変数は、前提条件で収集したKafka接続設定用のものです。package kafka.sdk.oss.example; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class Producer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section private static Properties getKafkaProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); properties.put("security.protocol", "SASL_SSL"); properties.put("sasl.mechanism", "PLAIN"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; properties.put("sasl.jaas.config", value); properties.put("retries", 3); // retries on transient errors and load balancing disconnection properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB return properties; } public static void main(String args[]) { try { Properties properties = getKafkaProperties(); KafkaProducer producer = new KafkaProducer<>(properties); for(int i=0;i<10;i++) { ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i); producer.send(record, (md, ex) -> { if (ex != null) { System.err.println("exception occurred in producer for review :" + record.value() + ", exception is " + ex); ex.printStackTrace(); } else { System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp()); } }); } // producer.send() is async, to make sure all messages are sent we use producer.flush() producer.flush(); producer.close(); } catch (Exception e) { System.err.println("Error: exception " + e); } } }
-
wd
ディレクトリから、次のコマンドを実行します:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
- コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。
メッセージの消費
- 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
- パス
/src/main/java/kafka/sdk/oss/example/
以下のwd
ディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、Maven Javaプロジェクトのpom.xml
の一部として、JavaのKafka SDK依存性がすでに存在する必要があります。 -
次のコードを使用して、ディレクトリ
wd
にConsumer.java
という名前の新しいファイルを作成します。コード・コメントの指示に従って、コード内の変数の値を置き換えます(bootstrapServers
からconsumerGroupName
まで)。これらの変数は、前提条件で収集したKafka接続設定用のものです。package kafka.sdk.oss.example; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ; static String tenancyName = "<OCI_tenancy_name>"; static String username = "<your_OCI_username>"; static String streamPoolId = "<stream_pool_OCID>"; static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section static String consumerGroupName = "<consumer_group_name>"; private static Properties getKafkaProperties(){ Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", consumerGroupName); props.put("enable.auto.commit", "false"); props.put("session.timeout.ms", "30000"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put("security.protocol", "SASL_SSL"); props.put("sasl.mechanism", "PLAIN"); props.put("auto.offset.reset", "earliest"); final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"" + tenancyName + "/" + username + "/" + streamPoolId + "\" " + "password=\"" + authToken + "\";"; props.put("sasl.jaas.config", value); return props; } public static void main(String[] args) { final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());; consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName)); ConsumerRecords<Integer, String> records = consumer.poll(10000); System.out.println("size of records polled is "+ records.count()); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); } consumer.commitSync(); consumer.close(); } }
-
wd
ディレクトリから、次のコマンドを実行します:mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
-
次のようなメッセージが表示されます:
[INFO related maven compiling and building the Java code] size of records polled is 3 Received message: (messageKey0, message value) at offset 1284 Received message: (messageKey0, message value) at offset 1285 Received message: (null, message produced using oci console) at offset 1286
次のステップ
詳細は、次のリソースを参照してください: