Background Image
TECHNOLOGIE

Comment j'ai activé la livraison "Au moins une fois" de MQTT5 vers un sujet Kafka

Ravjot Brar
Senior Consultant

March 28, 2025 | 6 Lecture minute

TLDR

La livraison "au moins une fois" garantit que les messages sont livrés de manière fiable à leur système cible, même en cas de perturbations. Cet aspect est essentiel pour la tolérance aux pannes et l'intégrité des données dans les systèmes à haute disponibilité.

Les défis

  1. Persistance de la session : Les connexions entre les clients et les courtiers doivent survivre aux abandons pour éviter la perte de messages.

  2. Livraison à Kafka : Les messages doivent atteindre les sujets Kafka de manière fiable, même si les connexions sont interrompues.

Les solutions

  1. Persistance des sessions : Utiliser MQTT5 cleanStart et sessionExpiryInterval pour s'assurer que les sessions restent actives pendant les déconnexions.

  2. ACKs manuels : Activer les acquittements manuels dans Paho MQTT5 pour confirmer que les messages ont été entièrement traités avant de les marquer comme livrés.

Qu'est-ce que la livraison "au moins une fois" et pourquoi est-elle importante ?

La livraison au moins une fois est une garantie de messagerie qui fait exactement ce qu'elle dit : elle assure que les messages sont livrés au moins une fois au système cible. Cette garantie est évidemment importante pour s'assurer qu'aucune donnée n'est perdue lors du transfert de messages d'un système à l'autre.

Supposons que nous ayons deux systèmes appelés Source et Destination. Ces deux systèmes peuvent convenir à l'avance que les messages doivent être livrés exactement une fois, au moins une fois ou sans garantie. Si le destinataire souhaite recevoir les messages "au moins une fois", il enverra un ACK à la source chaque fois qu'il recevra un message. Il s'agit d'un retour d'information permettant à la source de savoir que le message est bien parvenu à la destination. Si la source ne reçoit pas d'ACK, elle retransmet le message jusqu'à ce qu'il soit reçu avec succès.

La garantie de livraison est importante pour les systèmes qui nécessitent une tolérance aux pannes, l'intégrité des données et une haute disponibilité.

Alors, n'est-ce pas aussi simple que de régler le paramètre QoS sur 1 ?

Dans MQTT, un client peut définir le niveau de qualité de service (QoS) à 2 (exactement une fois), 1 (au moins une fois) ou 0 (aucune garantie). Nous devrions donc pouvoir régler la qualité de service sur le niveau 1 et nous en tenir là, n'est-ce pas ? Ce serait le cas si vous pouviez garantir la connexion entre l'abonné et le courtier MQTT ou entre l'abonné et le cluster Kafka. Cependant, nous voulons que la livraison soit garantie à l'abonné, même lorsque la connexion est perdue et rétablie. En outre, nous exigeons que les messages soient garantis d'être livrés au moins une fois au sujet Kafka à partir du connecteur. Le diagramme simplifié ci-dessous illustre tous les composants impliqués :

Asset - Image 1 How I enabled “At Least Once” Delivery from MQTT5 to Kafka Topic

Essentiellement, nous avons deux problèmes à résoudre :

  1. Persister les sessions lorsque la connexion entre un client et le courtier est perdue.

  2. Garantir au moins la qualité de service 1 pour les messages livrés à un sujet Kafka lorsque la connexion entre Kafka et le connecteur est perdue.

Comment résoudre ce problème ?

Persister les sessions lorsque la connexion entre un client et le courtier est perdue.

Un utilisateur averti de MQTT penserait immédiatement à la fonction cleanStart de MQTT5 (dans MQTT3, cette option s'appelle cleanSession). La propriété cleanStart définit si la connexion doit être oubliée ou non lorsque le client se déconnecte du courtier. Si cette propriété vaut false, la session du client persistera malgré les déconnexions. Dans MQTT5, cette propriété doit être utilisée en tandem avec sessionExpiryInterval.

NOTE : Selon la documentation de Paho, cette propriété sera remplie à -1 par défaut, ce qui devrait indiquer que la session n'expire pas. Cependant, dans mes tests, la propriété a nécessité une valeur positive pour être prise en compte.

Assurer au moins la QoS 1 pour les messages livrés à un sujet Kafka

Le composant Paho MQTT5 définit le niveau de QoS du connecteur au niveau 2 par défaut. Cela garantit que les messages sont reçus exactement une fois par le connecteur. Cependant, si les messages sont reçus par le connecteur, mais que la connexion entre le connecteur et le courtier est perdue, le courtier suppose que les messages ont été livrés avec succès. Alors, comment s'assurer que le courtier attend que les messages soient transférés vers le sujet Kafka avant de célébrer prématurément ?

La solution est d'activer les accusés de réception manuels dans le consommateur MQTT5 de Paho.

if (getEndpoint().getConfiguration().isManualAcksEnabled()) {

client.setManualAcks(true) ;

}

Par défaut, les accusés de réception sont envoyés automatiquement lorsque la fonction messageArrived de la méthode de rappel. Après avoir défini manualAcks à true, le client (connecteur) doit envoyer les accusés de réception manuellement lorsque les messages sont traités avec succès vers Kafka, à l'aide de la méthode de rappel messageArrivedComplete à l'aide de la méthode messageArrivedComplete :

if (getEndpoint().getConfiguration().isManualAcksEnabled()) {

exchange.getExchangeExtension().addOnCompletion(new Synchronization() {

@Override

public void onComplete(Exchange exchange) {

try {

     PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos()) ;

} catch (MqttException e) {

LOG.warn("Failed to commit message with ID {} due to MqttException.", mqttMessage.getId()) ;

}

}

@Override

public void onFailure(Exchange exchange) {

LOG.error("Rollback due to error processing Exchange ID : {}", exchange.getExchangeId(), exchange.getException()) ;

}

}) ;

}

Pour voir la solution complète, veuillez consulter mon PR dans le repo Apache Camel CAMEL-21589 : Activer les acks manuels mqtt5. 

Comment prouver que notre solution fonctionne ?

Le test que nous allons effectuer consiste à configurer Kafka Connect dans une image Docker. J'utiliserai un Azure Event Hubs Namespace comme cluster Kafka et Mosquitto comme broker (et publisher) MQTT5. Le scénario que nous essayons d'imiter est celui où la connexion entre notre connecteur source et notre broker Mosquitto est temporairement perdue, alors que les messages sont toujours publiés.

Dans le test ci-dessous, nous reproduisons le problème où les messages qui sont envoyés alors que la connexion est instable ne sont pas garantis d'être livrés au sujet Kafka (ou dans ce cas, au Azure Event Hub).

  1. Publier les messages "First four" alors que toutes les connexions sont stables.

  2. Mettre à jour le connecteur pour avoir une connexion défectueuse aux Azure Event Hubs.

  3. Publier les "quatre premiers" messages.

  4. Mettre à jour le connecteur pour avoir une connexion stable aux Azure Event Hubs.

  5. Publier les messages "Third four".

Comme prévu, les messages envoyés au connecteur alors que sa connexion avec Azure Event Hubs est perdue ne sont pas délivrés avec succès.

Asset - Image 2 - How I enabled “At Least Once” Delivery from MQTT5 to Kafka Topic

Nous voulons effectuer ce même test après avoir mis en œuvre la solution que nous avons proposée afin de nous assurer que nous avons effectivement résolu le problème. Lors de la création de notre connecteur mis à jour, nous avons défini les propriétés suivantes :

  1. cleanStart: false

  2. sessionExpiryInterval: > 0

  3. manualAcksEnabled: true

Comme prévu, les messages envoyés pendant que la connexion était interrompue sont toujours délivrés, même si la connexion avec Azure et Mosquitto a été temporairement perdue.

Asset - Image 3 - How I enabled “At Least Once” Delivery from MQTT5 to Kafka Topic

Conclusion

Alors, qu'est-ce que j'ai appris de tout cela ? Essentiellement, au moins une fois, que la livraison n'est pas toujours aussi simple qu'il n'y paraît, même lorsqu'il n'y a que quelques machines impliquées. Des problèmes tels que les connexions interrompues et les accusés de réception intempestifs peuvent rendre la garantie de livraison des messages complexe. Avec la fonction cleanStart et sessionExpiryInterval les sessions sont maintenues malgré les déconnexions. De plus, il peut y avoir des problèmes avec le connecteur qui doivent être corrigés dans le repo du connecteur. Parfois, vous devrez aller directement au source (ouvert) et convaincre la communauté de le mettre dans leur backlog. Cependant, si le problème n'est pas urgent, il ne sera généralement pas corrigé dans les délais impartis. C'est pourquoi, chez Improving, nous prenons l'initiative de mettre en œuvre la solution nous-mêmes, afin que la communauté ait accès à un correctif dès que possible.

N'hésitez pas à nous contacter !

Technologie

Dernières réflexions

Explorez nos articles de blog et laissez-vous inspirer par les leaders d'opinion de nos entreprises.
Asset - Image 1 Why Successful Software Projects Require More Than Technical Skills
TECHNOLOGIE

Comment j'ai activé la livraison "Au moins une fois" de MQTT5 vers un sujet Kafka

Livraison des messages "au moins une fois" de MQTT5 à Kafka, garantissant la fiabilité pour la tolérance aux pannes et l'intégrité des données.