Reequilibrios - En función de la disposición de su grupo de consumidores y del número de particiones, el proceso de reequilibrio que se desencadena cuando un consumidor abandona el grupo puede ser penoso y llevar mucho tiempo.
Falta de granularidad - si el consumidor está leyendo varios temas, se cierra el consumo de todo. No hay punto intermedio.
Procesamiento sensible - para volver a habilitar el procesamiento, todos los consumidores tienen que arrancar de nuevo y reequilibrar las particiones. Con más de 100 instancias consumidoras, esto puede llevar mucho tiempo.
Antes de entrar en cómo incorporar Feature Flags, deberíamos echar un vistazo a la funcionalidad que ya existe en las librerías cliente de Kafka para pausar y reanudar dinámicamente el procesamiento. Ya estamos más cerca del objetivo final de lo que te imaginas.
Pausa de consumidores Kafka
El cliente Kafka Consumer tiene la funcionalidad de recuperar particiones asignadas, pausar particiones específicas, recuperar particiones actualmente pausadas y reanudar particiones específicas. Estos métodos son todo lo que necesitas para pausar y reanudar el consumo dinámicamente.
Veamos con más detalle estas operaciones.
KafkaConsumer.assignment(): devuelve las particiones de temas actualmente asignadas en forma de un conjunto de TopicPartition. El TopicPartition contiene dos propiedades principales, el nombre del tema como String y la partición como int.
KafkaConsumer.pause(Set of TopicPartition): suspende la búsqueda en las particiones de temas proporcionadas. El sondeo del consumidor continuará, que es clave para mantener el consumidor vivo y evitar reequilibrios. Sin embargo, los sondeos futuros no devolverán ningún registro de las particiones en pausa hasta que se hayan reanudado.
KafkaConsumer.paused(): devuelve las particiones de temas que están actualmente asignadas y en estado de pausa como resultado del uso del método pause.
KafkaConsumer.resume(Set of TopicPartition): reanuda la búsqueda desde las particiones de temas proporcionadas.
Opción 1: Pausar el mundo
La forma más sencilla de pausar el procesamiento es pausarlo todo. El siguiente fragmento ilustra esto.
// obtener todas las particiones asignadas para el consumidor Set<TopicPartition> assignedPartitions = consumer.assignment();
// pausa todas las particiones asignadas Set<TopicPartition> pausedPartitions = consumer.pause(assignedPartitions);
// reanudar todas las particiones pausadas consumer.resume(consumer.paused());
// también se podría reanudar consumer.assignment() ya que todo está en pausa consumer.resume(consumer.assignment());
Opción 2: Pausa selectiva
Es útil tener el gran botón rojo para apagar todo el procesamiento mostrado anteriormente, pero sería más útil si pudiéramos controlar las cosas a un nivel más granular.
El siguiente fragmento muestra cómo sería pausar las particiones para un único tema.
// supongamos que el consumidor está suscrito a demo.topic.name && another.topic.name String pausedTopic = "demo.topic.name";
//
// PAUSA //
// filtra las particiones asignadas al tema en pausa
Set<TopicPartition> partitionsToPause = consumer.assignment() .stream()
.filter(tp -> pausedTopic.equals(tp.topic())) .collect(toSet());
Establecer<TopicPartition> pausedPartitions = consumer.pause(partitionsToPause);
// // REANUDAR //
// filtra las particiones en pausa para el tema que se reanuda Set<TopicPartition> partitionsToResume = consumer.paused()
.stream() .filter(tp -> pausedTopic.equals(tp.topic())) .collect(toSet());
consumer.resume(partitionsToResume);
Desencadenar las banderas
Después de ver el último fragmento, probablemente puedas adivinar cómo entran en escena los Feature Flags para suministrar dinámicamente la variable "pausedTopic" del fragmento.
Con una simple convención de nomenclatura de Feature Flags como topic_{topic-name-here} la aplicación puede extraer todas las banderas y filtrar sólo aquellas que le interesen.
El pseudocódigo siguiente muestra cómo podría ser.
// dos temas que interesan a este consumidor List assignedTopics = ["demo.topic.one", "demo.topic.two"].
// todas las banderas de características List flags = [
"topic_demo.topic.one", "ui-feature-flag", "topic_important.topic"
]
// los indicadores que realmente interesan a este consumidor List assignedTopicFlags = toggles
.stream() .filter(t -> t.startsWith('topic_') && assignedTopics.contains(getTopicName(t)) .collect(toList());
// assignedTopicFlags = ["topic_demo.topic.one"]
Ahora que ya podemos imaginarnos cómo funciona esto, veamos cómo integrar Unleash, una solución de Feature Flag de código abierto. Puedes sustituir Unleash por cualquier producto de banderas de características.
Unleash
Hay muchas opciones de Feature Flag en el mercado, pero Unleash es una plataforma de gestión de características de código abierto que ha funcionado bien para mí. Ha ganado suficiente tracción que Gitlab ahora lo ofrece como un servicio integrado y a partir de Gitlab 13.5, está disponible en todos los niveles. Una lista completa de las capacidades se puede encontrar aquí.
Conectar aplicaciones a Unleash es sencillo. Seleccione el SDK de Unleash que se ajuste a su aplicación y configure la URL de Unleash y el ID de instancia. Si utilizas Gitlab, hay instrucciones claras para integrar los indicadores de funciones con tu aplicación.
A continuación se muestra un ejemplo de configuración del cliente Unleash en una aplicación Java Spring Boot.
/** * Configurar el cliente que se conecta a Unleash si unleash.enabled es "true" **/ @Bean @ConditionalOnProperty(value = "unleash.enabled", havingValue = "true") Unleash defaultUnleash() {
UnleashConfig config = UnleashConfig.builder() .unleashAPI("<http://unleash-api.com>") .instanceId("asdf-123") .appName("unleash-demo") .environment("development") .fetchTogglesInterval(10)
.subscriber(new Log4JSubscriber()) .build();
return new Unleash(config); }
/** * Si unleash.enabled es false, el bean anterior no se creará. * En este escenario, el cliente FakeUnleash será inyectado en el * contexto de la aplicación para su uso. Esto es útil para el desarrollo local. **/ @Bean @ConditionalOnMissingBean(Unleash.class) Unleash fakeUnleash() {
return new FakeUnleash(config); }
Obtención de banderas de características
Hemos visto la funcionalidad de pausar/reanudar y cómo se puede utilizar una simple convención de nomenclatura de banderas de características para apuntar a los temas que deben ser pausados/reanudados. Ahora vamos a unir las dos cosas y ver las estrategias para obtener y aplicar los toggles.
Opción 1: Bucle de sondeo
La opción más sencilla es integrar las comprobaciones de los indicadores de función en el bucle de sondeo. Unleash refrescará los estados de las banderas en un subproceso en segundo plano, de modo que cada comprobación afectará a la caché al mirar los últimos valores y ver qué es lo que hay que pausar, si es que hay algo. El beneficio de este enfoque es que todo sucede en el hilo de sondeo, lo cual es importante ya que el consumidor no es seguro para los hilos.
// consumidor simplificado try {
while (true) { // recupera los últimos valores de las banderas y aplícalos según corresponda applyFeatureFlags(consumer);
ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) {
// lógica
}
}
} finally {
consumer.close(); }
Opción 2: Suscriptor UnleashEvent
Unleash tiene un diseño interno de eventos que hace que sea fácil suscribirse a los eventos que se disparan después de refrescar los toggles. Esta será la representación más actualizada de los estados de las banderas porque el evento se dispara inmediatamente después de que se actualice la caché.
Como se mencionó en la opción 1, los consumidores no son seguros para los hilos, por lo que tienes que manejar las operaciones de los consumidores adecuadamente y ejecutar las operaciones de pausa de los consumidores en el hilo de sondeo.
El principal beneficio de esto es que no desordena el bucle de sondeo con responsabilidades. Los sondeos continúan y si las cosas están en pausa, no se obtendrán registros. Si las cosas están activas, obtendrán registros.
/** * Suscriptor personalizado que será notificado en todos los UnleashEvents. * El suscriptor extendido Log4JSubscriber registrará los eventos que no se gestionen en esta extensión. **/ public class KafkaUnleashSubscriber extends Log4JSubscriber {
@Override public void on(@NotNull UnleashEvent event) {
// muchos eventos pasan por aquí, sólo nos importan las respuestas if (event instance of FeatureToggleResponse) {
// el evento tiene TODOS los toggles, filtra los que te interesan
// aplica los toggles al consumidor
}
}
}
No olvides el RebalanceListener
La última pieza que hay que tener en cuenta es el ConsumerRebalanceListener. Con múltiples consumidores ejecutándose en un grupo, cada uno será responsable de pausar sus particiones asignadas. Si un consumidor muere, sus particiones asignadas se reequilibrarán automáticamente a los otros consumidores del grupo. Sin embargo, el consumidor que recibe las particiones recién asignadas no tiene conocimiento de su estado previo (pausado/activo) por lo que estarán activas después de la asignación.
El RebalanceListener es tu gancho en este ciclo de vida de rebalanceo para pausar las particiones antes de que comience el consumo y evitar consumir accidentalmente un tema que debería estar pausado.
Con todos los componentes que ya se han construido, debería ser bastante sencillo crear un listener que lo una todo y mantenga las particiones en pausa si es necesario.