個々のコンシューマの使用
コンシューマ・グループを使用するかわりに、個々のコンシューマを使用してストリームからのメッセージを消費するように選択した場合、ストリーミングの多くの利点(サービス管理の調整、水平スケーリング、オフセット管理など)を利用できません。アプリケーションでは、これらのシナリオに加え、多くのことをプログラム的に処理する必要があります。
これらの理由から、本番環境ではコンシューマ・グループを使用することをお薦めしますが、テストまたは概念実証アプリケーションのために個々のコンシューマを使用すると役立つ場合があります。
カーソルの使用
カーソルとは、ストリーム内のロケーションを指すポインタです。ロケーションは、パーティション内の特定のオフセットまたは時間です。
メッセージの消費を開始する前に、消費を開始するポイントを指定する必要があります。これを行うには、CreateCursor APIを使用してカーソルを作成します。
サポートされているカーソル・タイプは5つあります:
TRIM_HORIZON
- ストリーム内で使用可能な最も古いメッセージから消費を開始します。ストリーム内のすべてのメッセージを消費する場合に、カーソルをTRIM_HORIZON
で作成します。AT_OFFSET
- 指定されたオフセットで消費を開始します。オフセットは、最も古いメッセージのオフセット以上かつ最新の公開済オフセット以下である必要があります。AFTER_OFFSET
- 指定されたオフセット後に使用を開始します。このカーソルには、AT_OFFSET
カーソルと同じ制限があります。AT_TIME
- 指定された時間から消費を開始します。戻されるメッセージのタイムスタンプは、指定された時間以降になります。LATEST
- カーソルの作成後に公開されたメッセージの消費を開始します。
個々のコンシューマに対してカーソルを作成する場合、カーソルが使用するストリーム内のパーティションを指定する必要があります。ストリームにメッセージを含むパーティションが複数ある場合は、複数のカーソルを作成してそれらを読み取る必要があります。
カーソルを作成すると、GetMessagesを使用してメッセージの消費を開始できます。
メッセージを消費し続けるかぎり、カーソルを再作成する必要はありません。そのため、ループの外側にカーソルを作成してメッセージを取得する必要があります。
メッセージの取得
カーソルを作成したら、GetMessagesをコールし、そのカーソルを指定してメッセージの消費を開始します。サービスは、メッセージおよび次のGetMessagesコールで使用する必要があるopc-next-cursor
ヘッダー値で応答します。返されるカーソルはnullではありませんが、5分後に失効します。5分を超えてメッセージの消費を停止する場合は、カーソルを再作成する必要があります。
同じパーティションから読み取るコンシューマが複数ある場合、それらは同じメッセージを受信します。アプリケーションは、メッセージの処理方法を決定する必要があります。
パーティションにそれ以上未読メッセージが存在しない場合、ストリーミングはメッセージの空のリストを返します。
GetMessagesのバッチ・サイズは、ストリームに公開された平均メッセージ・サイズに基づきます。デフォルトでは、サービスはできるだけ多くのメッセージを返します。limit
パラメータを使用して最大10,000までの値を指定できますが、ストリームのスループットを超えないように、平均メッセージ・サイズを考慮してください。
遅延
コンシューマが遅延している(消費を上回る速度で生成している)かどうかを判断するには、メッセージのタイムスタンプを使用します。コンシューマが遅延している場合は、最初のコンシューマから一部のパーティションを引き継ぐために、追加のコンシューマを生成することを検討してください。1つのパーティションで遅延している場合、リカバリする方法はありません。
次のオプションを検討してください:
- より多くのパーティションを持つ新しいストリームを作成します。
- コンシューマ・グループを使用します。
- 問題の原因がホットスポットである場合は、メッセージ・キー戦略を変更します。
- メッセージ処理時間を短縮するか、リクエストを並列処理します。
特定のパーティションで消費するために残されているメッセージの数を確認する場合、LATEST
タイプのカーソルを使用して、次に公開されるメッセージのオフセットを取得し、現在消費しているオフセットとの差分を取ります。
オフセットの管理
オフセットは、パーティション内のメッセージの場所を示します。コンシューマが再開する場合や、障害からリカバリする必要がある場合は、オフセットを使用してストリームからの読取りを再開できます。
個々のコンシューマを使用する場合は、コンシューマ・アプリケーションで処理されたオフセットを管理する必要があります。コンシューマは、パーティションごとに到達または停止したオフセットを格納する責任があります。コンシューマが再開したら、処理した最後のメッセージのオフセットを読み取り、AFTER_OFFSET
タイプのカーソルを作成して、その取得したオフセットを指定します。処理した最後のメッセージのオフセットを格納するためのガイダンスは提供していません。別のストリーム、マシン上のファイル、オブジェクト・ストレージなどの任意の方法を使用できます。
メッセージのオフセットは密ではありません。オフセットは単調に増加する数値です。それらは減少せず、場合によっては2つ以上増加します。たとえば、同じパーティションに2つのメッセージを公開した場合、最初のメッセージのオフセットは42で、2番目のメッセージのオフセットは45になることがあります(オフセット43と44は存在しません)。