Kafkaクライアントの作成

Kafkaクライアントは、Kafkaクラスタと対話できるアプリケーションです。Kafkaトピックおよびコンシューマ・アプリケーションにデータを送信してKafkaトピックからメッセージを読み取るプロデューサ・アプリケーションを作成します。

Kafkaのインストール

作成するコンピュート・インスタンスごとに、Apache Kafkaクライアント・ライブラリおよびツールをインストールします。

  1. 作成したコンピュート・インスタンスに接続します。
    ssh -i <private-key-file> <username>@<public-ip-address>
  2. Apache KafkaにはJavaが必要です。コンピュート・インスタンスにJavaをインストールします(まだインストールされていない場合)。Javaがインストールされているかどうかを確認するには、versionコマンドを実行します。
    java -version
    sudo yum install java-11-openjdk -y
  3. インストールするApache Kafkaバージョンを、公式のApache Kafkaサーバーからダウンロードします。
    wget https://downloads.apache.org/kafka/<version>/kafka_<version>.tgz
  4. ダウンロードしたパッケージを抽出します。
    tar -xzf kafka_<version>.tgz

クライアントの構成

作成する各コンピュート・インスタンスで、クライアント・プロパティ・ファイルを構成します。

  1. 作成したコンピュート・インスタンスに接続します。
    ssh -i <private-key-file> <username>@<public-ip-address>
  2. インストールされているApache Kafkaクライアント・ライブラリの場所にあるconfigディレクトリにディレクトリを変更します。
    cd kafka_<version>/config
  3. client.propertiesという名前のファイルを作成します。
    nano client.properties
  4. Kafkaクラスタ用に構成した認証に応じて、client.propertiesファイルにセキュリティ・プロパティ設定を作成します。
    • SASL/SCRAM認証の場合
      security.protocol=SASL_SSL 
      sasl.mechanism=SCRAM-SHA-512 
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<vault-username>" password="<vault-secret>"
    • mTLS認証の場合
      security.protocol=SSL
      ssl.certificate.location=/leaf.cert
      ssl.key.location=/leaf.key
      ssl.keystore.password=<password>
      ssl.keystore.location=/kafka-keystore.p12

Kafka CLIの使用

クライアント・アプリケーションをインストールして構成した後、組込みのApache Kafka CLIツールを使用して、Apache KafkaとのストリーミングでKafkaクラスタを管理および操作します。

Apache Kafkaには、いくつかの組込みクライアントが含まれています。たとえば、Javaコンシューマ・クライアントとJavaプロデューサ・クライアントです。

また、Kafkaコミュニティは、使用可能なクライアントをさらに多数提供します

便利なクイックリファレンスを次に示します。

クライアントの構成の管理

Kafkaクライアントには、クライアントがメッセージの送受信方法を指定し、エラーを処理し、Kafkaクラスタへの接続を管理するための特定の構成が必要です。

Apache Kafkaは、/binディレクトリにコマンドライン・インタフェース(CLI)ツールを提供します。たとえば、インストールされているKafkaクライアント・ライブラリで使用可能なkafka-configs.shツールを使用して、クライアント構成を管理できます。

次に、一般的なCLIコマンドの例を示します。コマンドのブートストラップURLのクラスタの詳細の取得クライアントの構成時に作成したclient.propertiesファイルへのパスを指定します。

トピック構成の表示

トピック構成を表示するには、構成を表示するトピックの名前を指定してkafka-configs.shツールを実行します。

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--describe
すべてのトピックのパーティションおよびレプリケーション情報の表示

すべてのトピックのパーティションおよびレプリケーション情報を表示するには、kafka-configs.shツールを実行します。

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--describe | grep Factor | sort 
トピック構成の変更

保存やセグメントなどのトピック構成を変更するには、構成を変更するトピックの名前を指定してkafka-configs.shツールを実行します。

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--alter
--add-config retention.ms=1, segment.ms=60000
トピック構成の削除

保存やセグメントなどのトピック構成を削除するには、構成を削除するトピックの名前を指定してkafka-configs.shツールを実行します。

bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--alter
--delete-config retention.ms,segment.ms
各パーティションのディスク使用量の表示

各パーティションのディスク使用量を表示するには、kafka-log-dirs.shツールを実行し、出力ログ・ファイルへのパスを指定します。

bin/kafka-log-dirs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--describe | tail -1 > /tmp/logdirs.output.txt

次に、次のコマンドを実行してログ・ファイルをフィルタ処理し、その内容を表示します。特定のブローカをターゲットにするようにbrokers[0]を調整します。

cat /tmp/logdirs.output.txt
  | jq -r '.brokers[0] | .logDirs[0].partitions[] | .partition + " " + (.size|tostring)' 
  | sort 
  | awk 'BEGIN {sum=0} {sum+=$2} END {print sum}'
トレース・ログの有効化

ブローカに対してトレース・ログを有効にするには、次のプロパティを使用してファイルlog4j.propertiesを作成します。

log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n 

次に、次のコマンドを実行します。

export KAFKA_OPTS="-Dlog4j.configuration=file:/path/to/log4j.properties"

クライアント・メトリックの監視

クライアント・アプリケーションとKafkaクラスタを監視する必要があります。

OCIモニタリング・サービスを使用して、Kafkaブローカによって生成されるメトリックをモニターします。

クライアント側のメトリックについては、独自のカスタム・ダッシュボードを作成して、クライアント・アプリケーションを監視する必要があります。少なくとも、次のクライアント・メトリックを監視します。

  • record-send-rate
  • request-latency-avg
  • error-rate
  • records-consumed-rate
  • lag
  • fetch-latency-avg
  • Retries
  • disconnects