コンシューマ・グループの使用
コンシューマは、グループの一部としてメッセージを消費するように構成できます。複数のパーティションがある本番環境では、ストリーミング・メッセージを消費する方法として、コンシューマ・グループを使用することをお薦めします。
各ストリーム・パーティションは、コンシューマ・グループのメンバーに割り当てられます。コンシューマ・グループの個々のメンバーは、インスタンスと呼ばれます。パーティションよりもインスタンスの数が多い場合を除き、コンシューマ・グループ内の各インスタンスは、1つ以上のパーティションからメッセージを受信します。ストリームのパーティション数を超えているインスタンスは、メッセージを受信しません。
コンシューマ・グループは、複数のコンシューマがストリームの消費を共有するために必要な調整を処理します。コンシューマ・グループは自動的に次を行います:
- 1つ以上のパーティションをインスタンスに割り当てます
- グループが受信したメッセージをトラッキングし、コミットを管理します
- 各インスタンスのかわりに適切なパーティションおよびオフセットをリクエストします
- インスタンスの参加または退出に応じてグループをバランシングします
単一のストリームから最大50個のコンシューマ・グループが読み取ることができます。各コンシューマ・グループは、ストリーム内のすべてのメッセージを少なくとも1回受信します。
コンシューマ・グループは一時的です。それらは、ストリームの保持期間に使用されないと消失します。
コンシューマ・グループの作成
コンシューマ・グループは、最初のCreateGroupCursorリクエストで作成されます。グループ・カーソルは、グループ名/インスタンス名のペアを定義します。グループ・カーソルを作成する場合、ストリームのID、グループ名、インスタンス名、およびサポートされている次のカーソル・タイプのいずれかを指定する必要があります:
TRIM_HORIZON
- グループは、ストリーム内で使用可能な最も古いメッセージから消費を開始します。AT_TIME
- グループは、指定された時間から消費を開始します。戻されるメッセージのタイムスタンプは、指定された時間以降になります。LATEST
- グループは、カーソルの作成後に公開されたメッセージの消費を開始します。
グループ・カーソル・タイプは、既存のグループの名前を含むCreateGroupCursorコールでは無視されます。指定されたカーソル・タイプのかわりに、そのグループのコミット済オフセットが使用されます。
ストリーミングでは、オフセットを管理するときにインスタンス名を使用してグループのメンバーを識別します。コンシューマ・グループのインスタンスごとに一意のインスタンス名を使用します。
ストリーミング・サービスでオフセットのコミットを処理する場合は、グループ・カーソルのcommitOnGet
値をtrue
に設定したままにしておく必要があります。アプリケーションでコミットを処理する必要がないため、この方法を使用してアプリケーションの複雑さを軽減することをお薦めします。
グループとしての消費
インスタンスがコンシューマ・グループに参加した後、GetMessagesを使用してストリームからメッセージを読み取ることができます。GetMessagesへの各コールでは、次のGetMessagesコールでopc-next-cursor
ヘッダー値として使用するカーソルが返されます。返されるカーソルはnullではありませんが、5分後に失効します。消費し続けるかぎりは、カーソルを再作成する必要はありません。
ストリーミングがインスタンスからメッセージのリクエストを受信すると、サービスは:
- グループのリバランスが必要かどうかをチェックします
- そのインスタンスの前のリクエスト(存在する場合)からオフセットをコミットします
- リクエストのカーソルで定義されているメッセージで応答します
GetMessagesのバッチ・サイズは、ストリームに公開された平均メッセージ・サイズに基づきます。デフォルトでは、サービスはできるだけ多くのメッセージを返します。limit
パラメータを使用して最大10,000までの値を指定できますが、ストリームのスループットまたはタイムアウトを超えないように、平均メッセージ・サイズを考慮してください。
パーティションにそれ以上未読メッセージが存在しない場合、ストリーミングはメッセージの空のリストを返します。
コンシューマ・グループは30秒を超えてメッセージの消費を停止しているインスタンスを削除するため、タイムアウトを回避するためにリクエストするメッセージを少なくするか、ConsumerHeartbeatを使用してタイムアウトを延長する必要があります。
同じコンシューマ・グループ内の複数のインスタンスに1つのパーティションを割り当てることはできません。パーティションより多くのインスタンスがある場合、未割当てのインスタンスはGetMessagesリクエストを送信できますが、メッセージを受信することはできません。それ以外の場合、これらは、コンシューマ・グループがインスタンスを置き換える必要が発生するまで(グループの既存のメンバーがタイムアウト期間内に動作しない場合など)、アイドル状態のままです。
グループの位置を手動で更新する必要がある場合は、UpdateGroupを使用して、グループ内のすべてのコンシューマの場所をストリーム内の指定された場所にリセットできます。
オフセットとコミット
オフセットは、パーティション内のメッセージの場所を示します。コンシューマが再開する場合や、障害からリカバリする必要がある場合は、オフセットを使用してストリームからの読取りを再開できます。
コンシューマ・グループを使用すると、ストリーミングによってオフセットが自動的に処理されます。commitOnGet=true
のデフォルト動作は、前のリクエストからのオフセットがコミットされることを意味します。例:
コンシューマAの場合:
- Aは、GetMessagesをコールして、オフセットが1–100の任意のパーティションからメッセージを受信します。
- Aは、100個のすべてのメッセージを正常に処理します。
- AはGetMessagesをコールして、ストリーミング・サービスはオフセット100をコミットし、オフセットが101–200のメッセージを返します。
- Aは、15個のメッセージを処理し、予期せず(30秒を超えて)オフラインになります。
新しいコンシューマB:
- BはGetMessagesをコールして、ストリーミング・サービスは最新のコミット済オフセットを使用し、オフセットが101–200のメッセージを返します。
- Bは、メッセージ・ループを続行します。
この例では、メッセージの一部(15個)が1回以上処理されました。これは、それらが複数回処理された可能性があるが、データは失われていないことを意味します。
ストリーミングでは、コンシューマ・グループに"at-least-once"セマンティクスが提供されます。メッセージ・ループでオフセットがコミットされるタイミングを考慮してください。メッセージのバッチをコミットする前にコンシューマがオフラインになった場合、そのバッチは別のコンシューマに渡される可能性があります。パーティションが別のコンシューマに渡されると、そのコンシューマは最新のコミット済オフセットを使用して消費を開始します。コンシューマは、コミット済オフセットより前のメッセージを取得しません。コンシューマ・アプリケーションで重複に対応することをお薦めします。
メッセージのオフセットは密ではありません。オフセットは単調に増加する数値です。それらは減少せず、場合によっては2つ以上増加します。たとえば、同じパーティションに2つのメッセージを公開した場合、最初のメッセージのオフセットは42で、2番目のメッセージのオフセットは45になることがあります(オフセット43と44は存在しません)。
デフォルトのオフセット動作をオーバーライドし、カスタムのオフセット・コミット・メカニズムを実装する場合は、グループ・カーソルの作成時にcommitOnGet
をfalse
に設定します。ConsumerCommitを使用すると、追加のメッセージを読み取ることなくメッセージをコミットできます。ConsumerCommitでは、次のリクエストで使用するカーソルが返されます。
カスタム・コミット・ロジックの記述は複雑で、競合条件および考慮事項が大量に含まれます。内部の状態が変更されるケースが多く存在し、その状況はクライアントが処理する必要があります。
バランシングとリバランシング
ストリーミングでは、バランスの評価時に、ストリーム内のパーティション数とコンシューマ・グループ内のインスタンス数が考慮されます。グループ・バランシングは自動です。各コンシューマは、次の計算に基づいて1つ以上のパーティションに割り当てられます:
(nパーティション / nコンシューマ) ± 1
たとえば、ストリームに8個のパーティションがあり、グループに4個のコンシューマがある場合、各コンシューマは2個のパーティションに割り当てられます。ストリームに10個のパーティションがあり、グループに4個のコンシューマがある場合、2個のコンシューマが2個のパーティションに割り当てられ、2個のコンシューマが3個のパーティションに割り当てられます。
インスタンスがコンシューマ・グループに参加するか退出し、メッセージに対するリクエストが行われると、パーティション割当てが再評価されます。ストリームにグループ内の現在のインスタンス数より多い1つ以上のパーティションが含まれており、新しいインスタンスが参加した場合、パーティションは、新しいインスタンスを含むすべてのインスタンスに再割当てされます。グループ内のインスタンスが30秒を超えてメッセージの消費を停止した場合、または30秒以内にConsumerHeartbeatの送信に失敗した場合、そのインスタンスはコンシューマ・グループから削除され、可能であればそのパーティションは別のインスタンスに再割当てされます。
これらのイベントは、リバランシングと呼ばれます。グループ内のインスタンスはリバランシング・プロセスを認識しませんが、グループによって、ストリーム内のパーティションの相互排他的なセットを所有するように調整されています。
コンシューマ・グループのリバランス操作が正常に終了すると、ストリーム内のすべてのパーティションがグループ内のインスタンスによって所有されます。
このように、各インスタンスがただ1つのパーティションからのメッセージを消費するまで、インスタンスの数をパーティションの数までスケーリングできます。この構成により、ストリームの使用可能なスループットが最大化されます。その時点以降、グループに参加する新しいインスタンスは、どのパーティションにも割り当てられることなくアイドル状態のままになります。