Kafka Javaクライアントおよびストリーミング・クイックスタート

このクイックスタートでは、Oracle Cloud Infrastructure StreamingでKafka Javaクライアントを使用して、メッセージを公開および消費する方法を示します。

詳細は、Apache Kafkaでのストリーミングの使用を参照してください。主要概念およびストリーミングの詳細は、ストリーミングの概要を参照してください。

前提条件

  1. ストリーミングでKafka Javaクライアントを使用するには、次が必要です:

    • Oracle Cloud Infrastructureアカウント。
    • そのアカウントで作成され、必要な権限を付与するポリシーがあるグループに含まれるユーザー。新しいユーザー、グループ、コンパートメントおよびポリシーの設定方法の例は、ユーザーの追加を参照してください。使用する一般的なポリシーのリストは、共通ポリシーを参照してください。
  2. 次の詳細を収集します:

    • ストリームOCID
    • メッセージ・エンドポイント
    • ストリーム・プールOCID
    • ストリーム・プールFQDN
    • Kafka接続設定:
      • ブートストラップ・サーバー
      • SASL接続文字列
      • セキュリティ・プロトコル

    ストリームの詳細を表示する手順は、ストリームおよびストリーム・プールのリストを参照してください。既存のストリームがない場合は、ストリームの作成およびストリーム・プールの作成を参照してください。ストリームは、Kafkaトピックに対応します。

  3. JDK 8以上がインストール済。JavaがPATH内にあることを確認します。
  4. Maven 3.0がインストール済。MavenがPATH内にあることを確認します。
  5. Intellij (推奨)またはその他の統合開発環境(IDE)。
  6. Kafka Java SDKの最新バージョンのMaven依存性またはjarをpom.xmlに次のように追加します:

    	<dependency>
    		<groupId>org.apache.kafka</groupId>
    		<artifactId>kafka-clients</artifactId>
    		<version>2.8.0</version>
    	</dependency>
    
  7. この例のJavaプロジェクトの作業ディレクトリとしてwdを使用すると、pom.xmlは次のようになります:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>oci.example</groupId>
        <artifactId>StreamsExampleWithKafkaApis</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>2.8.0</version>
    		</dependency>
        </dependencies>
    </project>
    
  8. Kafkaプロトコルを使用した認証では、認証トークンとSASL/PLAINメカニズムが使用されます。認証トークンの生成については、認証トークンの作業を参照してください。OCIでストリームおよびストリーム・プールを作成した場合は、OCI IAMに従ってこのストリームを使用する権限がすでに付与されているため、OCIユーザーの認証トークンを作成する必要があります。

    ノート

    OCIユーザーの認証トークンは、作成時にのみ表示されます。それをコピーして、将来の使用に備えて安全な場所に保管してください。

メッセージの生成

  1. wdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、Maven Javaプロジェクトのpom.xmlの一部として、JavaのKafka SDK依存性がすでに存在する必要があります。
  2. 次のコードを使用して、パス/src/main/java/kafka/sdk/oss/example/以下のディレクトリwdProducer.javaという名前の新しいファイルを作成します。コード・コメントの指示に従って、コード内の変数の値を置き換えます(bootstrapServersからstreamOrKafkaTopicNameまで)。これらの変数は、前提条件で収集したKafka接続設定用のものです。

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    public class Producer {
    
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
    
        private static Properties getKafkaProperties() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);
            properties.put("security.protocol", "SASL_SSL");
            properties.put("sasl.mechanism", "PLAIN");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            properties.put("sasl.jaas.config", value);
            properties.put("retries", 3); // retries on transient errors and load balancing disconnection
            properties.put("max.request.size", 1024 * 1024); // limit request size to 1MB
            return properties;
        }
    
        public static void main(String args[]) {
            try {
                Properties properties = getKafkaProperties();
                KafkaProducer producer = new KafkaProducer<>(properties);
    
                for(int i=0;i<10;i++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(streamOrKafkaTopicName, "messageKey" + i, "messageValue" + i);
                    producer.send(record, (md, ex) -> {
                        if (ex != null) {
                            System.err.println("exception occurred in producer for review :" + record.value()
                                    + ", exception is " + ex);
                            ex.printStackTrace();
                        } else {
                            System.err.println("Sent msg to " + md.partition() + " with offset " + md.offset() + " at " + md.timestamp());
                        }
                    });
                }
                // producer.send() is async, to make sure all messages are sent we use producer.flush()
                producer.flush();
                producer.close();
            } catch (Exception e) {
                System.err.println("Error: exception " + e);
            }
        }
    }
    
  3. wdディレクトリから、次のコマンドを実行します:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Producer
  4. コンソールを使用して、ストリームに送信された最新のメッセージを表示し、生成が成功したことを確認します。

メッセージの消費

  1. 最初に、メッセージを消費するストリームにメッセージが含まれていることを確認します。コンソールを使用してテスト・メッセージを生成するか、このクイックスタートで作成したストリームおよびメッセージを使用できます。
  2. パス/src/main/java/kafka/sdk/oss/example/以下のwdディレクトリから、Visual Studio Codeなどのお気に入りのエディタを開きます。前提条件を満たした後、Maven Javaプロジェクトのpom.xmlの一部として、JavaのKafka SDK依存性がすでに存在する必要があります。
  3. 次のコードを使用して、ディレクトリwdConsumer.javaという名前の新しいファイルを作成します。コード・コメントの指示に従って、コード内の変数の値を置き換えます(bootstrapServersからconsumerGroupNameまで)。これらの変数は、前提条件で収集したKafka接続設定用のものです。

    package kafka.sdk.oss.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class Consumer {
        static String bootstrapServers = "<bootstrap_servers_endpoint>", // usually of the form cell-1.streaming.<region>.oci.oraclecloud.com:9092 ;
        static String tenancyName = "<OCI_tenancy_name>";
        static String username = "<your_OCI_username>";
        static String streamPoolId = "<stream_pool_OCID>";
        static String authToken = "<your_OCI_user_auth_token>"; // from step 8 of Prerequisites section
        static String streamOrKafkaTopicName = "<topic_stream_name>"; // from step 2 of Prerequisites section
        static String consumerGroupName = "<consumer_group_name>"; 
    
        private static Properties getKafkaProperties(){
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", consumerGroupName);
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("auto.offset.reset", "earliest");
            final String value = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                    + tenancyName + "/"
                    + username + "/"
                    + streamPoolId + "\" "
                    + "password=\""
                    + authToken + "\";";
            props.put("sasl.jaas.config", value);
            return props;
        }
    
        public static void main(String[] args) {
            final KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(getKafkaProperties());;
            consumer.subscribe(Collections.singletonList(streamOrKafkaTopicName));
            ConsumerRecords<Integer, String> records = consumer.poll(10000);
    
            System.out.println("size of records polled is "+ records.count());
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
    
            consumer.commitSync();
            consumer.close();
        }
    }
    
    
  4. wdディレクトリから、次のコマンドを実行します:

    mvn clean install exec:java -Dexec.mainClass=kafka.sdk.oss.example.Consumer
    
  5. 次のようなメッセージが表示されます:

    [INFO related maven compiling and building the Java code]
    size of records polled is 3
    Received message: (messageKey0, message value) at offset 1284
    Received message: (messageKey0, message value) at offset 1285
    Received message: (null, message produced using oci console) at offset 1286
    ノート

    コンソールを使用してテスト・メッセージを生成した場合、各メッセージのキーはNullです