Kafkaクライアントの作成
Kafkaクライアントは、Kafkaクラスタと対話できるアプリケーションです。Kafkaトピックおよびコンシューマ・アプリケーションにデータを送信してKafkaトピックからメッセージを読み取るプロデューサ・アプリケーションを作成します。
開始する前に
- Apache Kafkaを使用したOCIストリーミングでKafkaクラスタを作成したことを確認します。
- OCIテナンシにコンピュート・インスタンスを作成します。
- コンピュート・インスタンスに接続できることを確認します。
Kafkaのインストール
作成するコンピュート・インスタンスごとに、Apache Kafkaクライアント・ライブラリおよびツールをインストールします。
クライアントの構成
作成する各コンピュート・インスタンスで、クライアント・プロパティ・ファイルを構成します。
Kafka CLIの使用
クライアント・アプリケーションをインストールして構成した後、組込みのApache Kafka CLIツールを使用して、Apache KafkaとのストリーミングでKafkaクラスタを管理および操作します。
Apache Kafkaには、いくつかの組込みクライアントが含まれています。たとえば、Javaコンシューマ・クライアントとJavaプロデューサ・クライアントです。
また、Kafkaコミュニティは、使用可能なクライアントをさらに多数提供します。
便利なクイックリファレンスを次に示します。
- Apache Kafka クイックスタート
- トピックの追加と削除
- トピックを更新しています
クライアントの構成の管理
Kafkaクライアントには、クライアントがメッセージの送受信方法を指定し、エラーを処理し、Kafkaクラスタへの接続を管理するための特定の構成が必要です。
Apache Kafkaは、/bin
ディレクトリにコマンドライン・インタフェース(CLI)ツールを提供します。たとえば、インストールされているKafkaクライアント・ライブラリで使用可能なkafka-configs.sh
ツールを使用して、クライアント構成を管理できます。
次に、一般的なCLIコマンドの例を示します。コマンドのブートストラップURLのクラスタの詳細の取得。クライアントの構成時に作成したclient.properties
ファイルへのパスを指定します。
トピック構成を表示するには、構成を表示するトピックの名前を指定してkafka-configs.sh
ツールを実行します。
bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--describe
すべてのトピックのパーティションおよびレプリケーション情報を表示するには、kafka-configs.sh
ツールを実行します。
bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--describe | grep Factor | sort
保存やセグメントなどのトピック構成を変更するには、構成を変更するトピックの名前を指定してkafka-configs.sh
ツールを実行します。
bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--alter
--add-config retention.ms=1, segment.ms=60000
保存やセグメントなどのトピック構成を削除するには、構成を削除するトピックの名前を指定してkafka-configs.sh
ツールを実行します。
bin/kafka-configs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--entity-type topics
--entity-name $TOPIC_NAME
--alter
--delete-config retention.ms,segment.ms
各パーティションのディスク使用量を表示するには、kafka-log-dirs.sh
ツールを実行し、出力ログ・ファイルへのパスを指定します。
bin/kafka-log-dirs.sh
--bootstrap-server $URL
--command-config /path/to/client.properties
--describe | tail -1 > /tmp/logdirs.output.txt
次に、次のコマンドを実行してログ・ファイルをフィルタ処理し、その内容を表示します。特定のブローカをターゲットにするようにbrokers[0]
を調整します。
cat /tmp/logdirs.output.txt
| jq -r '.brokers[0] | .logDirs[0].partitions[] | .partition + " " + (.size|tostring)'
| sort
| awk 'BEGIN {sum=0} {sum+=$2} END {print sum}'
ブローカに対してトレース・ログを有効にするには、次のプロパティを使用してファイルlog4j.properties
を作成します。
log4j.rootLogger=TRACE, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
次に、次のコマンドを実行します。
export KAFKA_OPTS="-Dlog4j.configuration=file:/path/to/log4j.properties"
クライアント・メトリックの監視
クライアント・アプリケーションとKafkaクラスタを監視する必要があります。
OCIモニタリング・サービスを使用して、Kafkaブローカによって生成されるメトリックをモニターします。
クライアント側のメトリックについては、独自のカスタム・ダッシュボードを作成して、クライアント・アプリケーションを監視する必要があります。少なくとも、次のクライアント・メトリックを監視します。
record-send-rate
request-latency-avg
error-rate
records-consumed-rate
lag
fetch-latency-avg
Retries
disconnects