Consumir mensajes de un flujo mediante grupos de consumidores.
Se pueden configurar consumidores para que consuman mensajes como parte de un grupo. En un entorno de producción con varias particiones, el uso de un grupo de consumidores es nuestro método recomendado para consumir mensajes de Streaming.
Cada partición de flujo se asigna a un miembro de un grupo de consumidores. Un individuo miembro de un grupo de consumidores se denomina una instancia. Cada instancia de un grupo de consumidores recibe mensajes de una o más particiones, a menos que haya más instancias que particiones. Las instancias que superan el recuento de particiones para el flujo no reciben mensajes.
Los grupos de consumidores gestionan la coordinación necesaria para que varios consumidores compartan el consumo de un flujo. Un grupo de consumidores automáticamente:
Asigna una o más particiones a una instancia
Realiza un seguimiento de los mensajes recibidos por el grupo y gestiona las confirmaciones
Solicita las particiones y compensaciones adecuadas en nombre de cada instancia
Equilibra el grupo a medida que se unen o abandonan instancias
Hasta 50 grupos de consumidores pueden leer de un único flujo. Cada grupo de consumidores recibe todos los mensajes del flujo al menos una vez.
Los grupos de consumidores son efímeros. Desaparecen cuando no se utilizan durante el período de retención del flujo.
Creación de un grupo de consumidores
Se crea un grupo de consumidores en la primera solicitud CreateGroupCursor (consulte Creación de un Cursor de Grupo). Los cursores de grupo definen un par nombre de grupo/nombre de instancia. Al crear el cursor de grupo, proporcione el identificador de flujo, un nombre de grupo, un nombre de instancia y uno de los siguientes tipos de cursor soportados:
TRIM_HORIZON: el grupo comienza a consumir desde el mensaje más antiguo disponible del flujo.
AT_TIME: el grupo empieza a consumir desde una hora específica. El registro de hora del mensaje devuelto es igual o posterior a la hora proporcionada.
LATEST: el grupo comienza a consumir mensajes publicados después de crear el cursor.
Los tipos de cursor de grupo se ignoran en las llamadas a CreateGroupCursor que incluyen el nombre de un grupo existente. Los desplazamientos confirmados de ese grupo se utilizan en lugar del tipo de cursor proporcionado.
Streaming utiliza el nombre de instancia para identificar miembros del grupo al gestionar desplazamientos. Utilice nombres de instancia únicos para cada instancia del grupo de consumidores.
Si desea que el servicio Streaming gestione la confirmación de desplazamientos, deje el valor commitOnGet del cursor de grupo definido en true. Recomendamos utilizar este método para reducir la complejidad de la aplicación, de modo que su aplicación no tenga que gestionar las confirmaciones.
Consumo como grupo 🔗
Una vez que las instancias se unan al grupo de consumidores, estos pueden leer los mensajes del flujo mediante GetMessages (consulte Creación de un cursor de grupo). Cada llamada GetMessages devuelve el cursor que se va a utilizar en la siguiente llamada GetMessages como valor de cabecera opc-next-cursor. El cursor devuelto nunca es nulo, pero caduca en cinco minutos. Mientras siga consumiendo, no necesita volver a crear un cursor.
Cuando Streaming recibe una solicitud de mensaje de una instancia, el servicio:
Responde con los mensajes definidos por el cursor de la solicitud
Los tamaños de lote de GetMessages se basan en el tamaño medio de mensaje publicado en ese flujo. Por defecto, el servicio devuelve tantos mensajes como sea posible. Puede utilizar el parámetro limit para especificar cualquier valor hasta 10 000, pero debe tener en cuenta el tamaño medio de mensaje para evitar exceder el rendimiento en el flujo o los timeouts.
Si no hay más mensajes no leídos en la partición, Streaming devuelve una lista vacía de mensajes.
Dado que los grupos de consumidores eliminan instancias que han parado el consumo de mensajes durante más de 30 segundos, solicite menos mensajes para evitar timeouts o amplíe el timeout mediante ConsumerHeartbeat (consulte Envío de latidos).
No se puede asignar una partición a varias instancias del mismo grupo de consumidores. Si tiene más instancias que particiones, las instancias no asignadas pueden enviar solicitudes GetMesages, pero no reciben ningún mensaje. De lo contrario, permanecen inactivas hasta que el grupo de consumidores necesite sustituir una instancia, por ejemplo, cuando un miembro existente del grupo no actúa dentro del período de timeout.
Si necesita actualizar manualmente la posición del grupo, puede utilizar UpdateGroup (consulte Actualización de un grupo de consumidores) para restablecer la ubicación de todos los consumidores del grupo a la ubicación especificada en el flujo.
Desplazamientos y confirmaciones 🔗
Los desplazamientos indican la ubicación del mensaje dentro de la partición. Si se reinicia un consumidor o necesita recuperarse de un fallo, puede utilizar el desplazamiento para reiniciar la lectura del flujo.
Al utilizar un grupo de consumidores, Streaming gestiona los desplazamientos automáticamente. El comportamiento por defecto de commitOnGet=true significa que se confirman los desplazamientos de la solicitud anterior. Por ejemplo:
Para el consumidor A:
A llama a GetMessages y recibe mensajes de una partición arbitraria con desplazamientos de 1 a 100.
A procesa los 100 mensajes correctamente.
A llama a GetMessages y el servicio Streaming confirma el desplazamiento 100 y devuelve mensajes con los desplazamientos 101-200.
A procesa 15 mensajes y, a continuación, se desconecta de forma inesperada (durante más de 30 segundos).
Un nuevo consumidor B:
B llama a GetMessages y el servicio Streaming utiliza el último desplazamiento confirmado y devuelve mensajes con los desplazamientos 101-200.
B continúa el bucle de mensajes.
En este ejemplo, una parte (15) de los mensajes se ha procesado al menos una vez, lo que significa que podrían haberse procesado más de una vez, pero no se ha perdido ningún dato.
Streaming proporciona semántica "al menos una vez" para grupos de consumidores. Considere cuándo se confirman los desplazamientos en un bucle de mensajes. Si un consumidor se desconecta antes de confirmar un lote de mensajes, ese lote se podría entregar a otro consumidor. Cuando se entrega una partición a otro consumidor, el consumidor utiliza el último desplazamiento confirmado para iniciar el consumo. El consumidor no recibe mensajes anteriores al desplazamiento confirmado. Recomendamos que las aplicaciones de consumidor se encarguen de los duplicados.
Nota
Los desplazamientos de mensajes no son densos. Los desplazamientos son números que aumentan monótonamente. No disminuyen, y a veces aumentan en más de una unidad. Por ejemplo, si publica dos mensajes en la misma partición, el primer mensaje podría tener un desplazamiento de 42 y el segundo mensaje podría tener un desplazamiento de 45 (no existen los desplazamiento 43 y 44).
Para sustituir el comportamiento de desplazamiento por defecto e implantar un mecanismo de confirmación de desplazamiento personalizado, defina commitOnGet en false al crear el cursor de grupo. Puede utilizar ConsumerCommit (consulte Manually Committing an Offset) para confirmar mensajes sin leer más mensajes. ConsumerCommit devuelve un cursor para que lo utilice en la siguiente solicitud.
Atención
La escritura de la lógica de confirmación personalizada es complicada y está llena de consideraciones y de condiciones. Existen muchos casos en los que se cambia algún estado interno y se requiere al cliente que gestione la situación.
Equilibrio y reequilibrio 🔗
Streaming tiene en cuenta el número de particiones del flujo y el número de instancias del grupo de consumidores al evaluar el equilibrio. El equilibrio de grupo es automático. Cada consumidor se asigna a una o más particiones según el siguiente cálculo:
(nParticiones/nConsumidores) ± 1
Por ejemplo, si hay ocho particiones en el flujo y cuatro consumidores en el grupo, cada consumidor se asigna a dos particiones. Si hay 10 particiones en el flujo y cuatro consumidores en el grupo, se asignan dos consumidores a dos particiones y dos consumidores a tres particiones.
A medida que las instancias se unen o abandonan un grupo de consumidores y se realizan solicitudes de mensajes, las asignaciones de particiones se vuelven a evaluar. Si el flujo tiene al menos una partición más que el número de instancias actuales del grupo y se une una nueva instancia, las particiones se reasignan a todas las instancias, incluida la nueva. Si una instancia del grupo deja de consumir mensajes durante más de 30 segundos o no envía ConsumerHeartbeat en 30 segundos, esa instancia se elimina del grupo de consumidores y su partición se reasigna, si es posible, a otra instancia.
Estos eventos se llaman reequilibrio. Las instancias del grupo no son conscientes del proceso de equilibrio, pero el grupo se ha coordinado para poseer un juego de particiones mutuamente excluyentes en el flujo.
Al final de una operación de reequilibrio correcta para un grupo de consumidores, cada partición dentro del flujo es propiedad de una instancia dentro del grupo.
De esta forma, puede escalar verticalmente el número de instancias al número de particiones hasta que cada instancia consuma mensajes de una sola partición. Esta configuración maximiza el rendimiento disponible del flujo. A partir de ese punto, cualquier instancia nueva que se una al grupo permanecerá en estado inactivo sin que se asigne a ninguna partición.