Los retos
Persistencia de la sesión: Las conexiones entre clientes y brokers deben sobrevivir a las caídas para evitar la pérdida de mensajes.
Entrega a Kafka: Los mensajes deben llegar de forma fiable a los temas de Kafka, aunque se interrumpan las conexiones.
Las soluciones
Persistencia de sesiones: Utilice
cleanStart de MQTT5
ysessionExpiryInterval
para asegurar que las sesiones permanecen activas durante las desconexiones.ACKs manuales: Habilite los acuses de recibo manuales en Paho
MQTT5
para confirmar que los mensajes se han procesado completamente antes de marcarlos como entregados.
¿Qué es la entrega "al menos una vez" y por qué es importante?
La entrega al menos una vez es una garantía de mensajería que hace exactamente lo que dice: garantiza que los mensajes se entregan al menos una vez al sistema de destino. Obviamente, esto es importante para garantizar que no se pierden datos al transferir mensajes de un sistema a otro.
Supongamos que tenemos dos sistemas llamados origen y destino. Estos dos sistemas pueden acordar de antemano que los mensajes deben entregarse exactamente una vez, al menos una vez o sin ninguna garantía. Si Destino quiere los mensajes "al menos una vez", enviará un ACK a Origen cada vez que reciba un mensaje. Se trata de una respuesta para que la fuente sepa que el mensaje ha llegado correctamente al destino. Si la fuente no recibe un ACK, retransmitirá el mensaje hasta que lo reciba correctamente.
La garantía de entrega es importante para los sistemas que requieren tolerancia a fallos, integridad de los datos y alta disponibilidad.
Entonces, ¿no es tan fácil como poner el parámetro QoS a 1?
En MQTT, un cliente puede establecer el nivel de Calidad de Servicio (QoS) en 2 (exactamente una vez), 1 (al menos una vez) o 0 (sin garantía). Por lo tanto, deberíamos ser capaces de establecer la QoS en el nivel 1 y llamarlo un día, ¿verdad? Bueno, ese sería el caso si pudieras garantizar la conexión desde el suscriptor al broker MQTT o desde el suscriptor al cluster Kafka. Sin embargo, queremos que se garantice la entrega al suscriptor, incluso cuando se pierde y se restablece la conexión. Además, necesitamos que se garantice la entrega de los mensajes al menos una vez al tema Kafka desde el conector. El siguiente diagrama simplificado ilustra todos los componentes implicados:

Esencialmente, tenemos dos problemas que necesitamos resolver:
Persistir las sesiones cuando se pierde la conexión entre un cliente y el broker.
Garantizar al menos QoS 1 para los mensajes que se entregan a un tema de Kafka cuando se pierde la conexión entre Kafka y el conector.
¿Cómo lo solucionamos?
Persistir las sesiones cuando se pierde la conexión entre un cliente y el broker.
Un usuario apto de MQTT pensaría inmediatamente en la función cleanStart
propiedad de MQTT5 (en MQTT3, esta opción se llama cleanSession
). La dirección cleanStart
establece si la conexión debe o no ser olvidada cuando el cliente se desconecta del broker. Cuando se establece esta propiedad a false, la sesión del cliente persistirá a través de las desconexiones. En MQTT5 esta propiedad debe utilizarse junto con sessionExpiryInterval
.
NOTA: De acuerdo con la documentación de Paho, esta propiedad se rellenará con -1 por defecto, lo que debería indicar que la sesión no expira. Sin embargo, en mis pruebas, la propiedad requiere un valor positivo para que surta efecto.
Garantizar al menos QoS 1 para los mensajes que se entregan a un tema Kafka
El componente Paho MQTT5 establece el nivel de QoS del conector en el nivel 2 por defecto. Esto asegura que los mensajes son recibidos exactamente una vez por el conector. Sin embargo, si los mensajes son recibidos por el conector, pero la conexión entre el conector y el broker se pierde, entonces el broker asume que los mensajes fueron entregados con éxito. Entonces, ¿cómo nos aseguramos de que el broker espera a que los mensajes se vuelquen al tema Kafka antes de celebrarlo prematuramente?
La solución es habilitar acuses de recibo manuales en el consumidor Paho MQTT5.
if (getEndpoint().getConfiguration().isManualAcksEnabled()) {
client.setManualAcks(true);
}
Por defecto, los acuses de recibo se envían automáticamente cuando se completa con éxito el proceso mensajeLlegado
de devolución de llamada. Después de establecer manualAcks
a true, el cliente (conector) debe enviar los acuses de recibo manualmente cuando los mensajes se procesan correctamente a Kafka, utilizando el método de devolución de llamada messageArrivedComplete
:
if (getEndpoint().getConfiguration().isManualAcksEnabled()) {
exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
try {
PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
} catch (MqttException e) {
LOG.warn("Error al enviar el mensaje con ID {} debido a una MqttException.", mqttMessage.getId());
}
}
@Override
public void onFailure(Exchange exchange) {
LOG.error("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(), exchange.getException());
}
});
}
Para ver la solución completa, consulte mi PR en el repositorio de Apache Camel CAMEL-21589: Enable manual acks mqtt5.
¿Cómo probar que nuestra solución funciona?
La prueba que vamos a realizar consiste en configurar Kafka Connect en una imagen Docker. Utilizaré un Azure Event Hubs Namespace como cluster de Kafka y Mosquitto como broker (y publisher) de MQTT5. El escenario que estamos tratando de imitar es uno en el que la conexión de nuestro conector de origen a nuestro broker Mosquitto se pierde temporalmente, mientras que los mensajes siguen siendo publicados.
En la prueba siguiente, reproducimos el problema por el que no se garantiza la entrega de los mensajes enviados mientras la conexión es inestable al tema de Kafka (o, en este caso, al Azure Event Hub).
Publica los "cuatro primeros" mensajes mientras todas las conexiones son estables.
Actualiza el conector para que tenga una conexión defectuosa con Azure Event Hubs.
Publica los "segundos cuatro" mensajes.
Actualiza el conector para que tenga una conexión estable con Azure Event Hubs.
Publica los "Terceros cuatro" mensajes.
Como era de esperar, los mensajes enviados al conector mientras su conexión con Azure Event Hubs está perdida no se entregan correctamente.

Queremos ejecutar esta misma prueba después de implementar nuestra solución propuesta para asegurarnos de que realmente hemos solucionado el problema. Al crear nuestro conector actualizado, establecemos las siguientes propiedades:
cleanStart
falsesessionExpiryInterval
: > 0manualAcksEnabled
: true
Como era de esperar, los mensajes enviados mientras la conexión estaba interrumpida se siguen entregando a pesar de que la conexión tanto con Azure como con Mosquitto se perdió temporalmente.

Conclusión
¿Qué he aprendido de todo esto? Esencialmente, al menos una vez, la entrega no siempre es tan sencilla como parece, incluso cuando sólo hay unas pocas máquinas implicadas. Problemas como conexiones caídas y acuses de recibo inoportunos pueden hacer que la garantía de entrega de mensajes sea compleja. Con la función cleanStart
y sessionExpiryInterval
las sesiones se mantienen durante las desconexiones. Además, podría haber problemas con el conector que deben ser corregidos en el repo del conector. A veces tendrás que ir directamente al código fuente (abierto) y convencer a la comunidad para que lo incluya en su backlog. Sin embargo, si el problema no es urgente, normalmente no se solucionará en el plazo previsto. Por eso, en Improving tomamos la iniciativa de implementar la solución nosotros mismos, para que la comunidad tenga acceso a una solución lo antes posible.