Si comme moi vous avez raté le train pour la release de Kafka 2.4, voici un petit résumé de quelques fonctionnalités ajoutées et bugfix :)

Update des Consommateurs / Producteurs / Brokers

Une meilleure optimisation niveau réseau

How-To :

  1. Configurer broker.rack et replica.selector.class (tous deux configurations du broker)
  2. Configurer client.rack (configuration du consommateur)
  3. ???
  4. Profit

En mettant ces trois configurations, lorsqu'un consommateur essaye de récupérer des données de la partition leader, la partition regarde s'il y a un rack préféré et si une class selector est renseignée, puis redirige le consommateur vers le replica préféré (de préférence un qui soit proche du consommateur pour avoir le moins de trafic réseau possible, mais cela dépend de la configuration fournie dans replica.selector.class).

Diminution du temps de rebalancing des partitions des consommateurs

Toki wo tomare !

Le principe de stop-the-world est connu notamment avec le garbage collector : le programme s'arrête complètement afin de ne pas accidentellement rendre des objets inutilisables.

Le même principe s'applique avec le load balancing dans Kafka.

Quelles que soient les raisons amenant à un load balancing des clients Kafka (scaling up/down des clients, perte temporaire d’un broker, rolling upgrade...), les processus s’arrêtent le temps d’équilibrer la charge des partitions. Ce qui pose des soucis au niveau de la performance.

Une solution implémentée par la KIP-429 est donc le protocole incrémental de rebalancing. Celui-ci se repose sur deux idées :

  • Le rebalancing doit se faire de manière incrémentale, c’est-à-dire qu’il n’y a pas besoin d’avoir un rééquilibrage complet des charges à la fin d’un load balancing. Celui-ci se fait petit à petit, ce qui permet de ne pas "déranger" tous les processus à la fois en les stoppant.
  • Le rebalancing doit se faire de manière coopérative puisque chaque client de chaque groupe doit pouvoir être en mesure de relâcher des ressources, pour que les autres clients arrivants puissent prendre ces dernières.

Optimisation lorsqu'il n'y a pas de clé et de partitioning spécifiés

Lors de la production des messages dans Kafka, si la clé est "null" et si les messages ne sont pas partitionnés de manière spécifiques via un partitionner, les messages sont normalement distribués en utilisant le “round-robin” : chaque message est envoyé individuellement dans les différentes partitions en suivant un cycle, le but étant que chaque partition reçoit équitablement le même nombre de message. Ceci crée un problème de latence car les batch de messages à envoyer vers Kafka ne contiennent dans ce cas-là qu’un seul message. Conséquence : il y a plus de batch à envoyer. Pour rappel, plus il y a de batch à envoyer, plus la latence est grande, puisque envoyer un batch requiert une requête et une mise en queue.

Le but du KIP-480 est donc de diminuer cette latence en revisitant le "round-robin" appliqué par défaut lorsqu’on se trouve dans cette situation. La stratégie du "sticky partitioner" vise à ce qu’un batch de messages soit envoyé dans une partition dite "sticky". Puis pour le prochain batch de messages, une autre partition est désignée comme "sticky". Dans ce cas, les batchs sont créés avec les options du producteur : linger.ms et batch.size. Ceci a pour effet d’augmenter la taille des batchs et donc d’avoir moins de batchs à envoyer : on diminue la latence. Netflix avait une idée similaire au sticky partitionner qui se basait seulement sur le temps (linger.ms).

Note rappel :

  • linger.ms : maximum de temps à attendre avant d’envoyer un batch de message
  • batch.size : maximum de taille que le batch peut avoir avant d’être envoyé dans une partition d’un topic de Kafka

Toujours plus pour se débarasser de Zookeeper

  • KIP-455 : Create an Administrative API for Replica Reassignment
  • Motivation : Remplacer l’API de Zookeeper s’occupant de réassigner les partitions répliquées.

L’API de Zookeeper possède plusieurs défauts : des codes d’erreurs non exploitables, un manque de sécurité,... L’API ne réassigne pas non plus les partitions de manière incrémentale, et il n’est pas possible de stopper une opération de réassignation des partitions en cours.

Le but de ce KIP-455 est donc de créer une API Kafka AdminClient permettant ceci, avec plus de sécurité et des codes d’erreurs plus exploitables. Pour cela, deux nouvelles APIs AdminClient ont été créées : alterPartitionAssignments et listPartitionReassignments. La première permet de réassigner les partitions, et la seconde permet de lister les partitions en cours de réassignement. En plus de cela, des RPC ont été ajoutés de manière à stopper le réassignement (AlterPartitionReassignmentsRequest) ou encore obtenir la réponse de cette requête (AlterPartitionReassignmentsResponse), etc.

Avec cette KIP-455, le script kafka-reassing-partitions.sh utilisé pour réassigner les partitions utilise maintenant l’API de l’AdminClient, et l’option zookeeper devient déprécié. Le script kafka-topics.sh permettant de créer/modifier des topics se voit ajouter deux options : addingReplicas et removingReplicas.

Bug fix : KafkaConsummer ne se débarasse plus des données pré-extraites lorsque les partitions sont mises en pauses

Les consommateurs Kafka ont la possibilité d’être stoppés et redémarrés avec les méthodes .pause() et .resume(). Lorsqu’un client consomme des messages avec la méthode .poll(), il le fait de manière asynchrone et met les messages dans un buffer local au client consommateur.

Ce qui veut dire que la situation suivante peut arriver : un client stoppe la consommation d’une partition d’un topic et ensuite appelle le .poll() de cette partition. Dans ce cas, le .poll() met des données dans le buffer local et, lorsque le client reprend sa consommation, celui-ci refait une requête des données vers la partition en ignorant les données mise dans le buffer local. Ceci a des conséquences sur la performance des clients lorsque ceux-ci effectuent beaucoup de pause/resume. Ce problème est maintenant résolu grâce à KAFKA-7548.

Update de Kafka Connect

Mirror Maker 2.0

MirrorMaker permet de créer un “miroir” d’un cluster Kafka entier. Sur papier, cela semble être un bon outil pour répliquer un cluster, néanmoins il possède plusieurs défauts. Pour en citer quelques un : les topics miroirs sont créés avec une configuration par défaut, les ACLs (Access Control Lists) ne sont pas répliqués, les messages sont re-partitionnés avec la “DefaultPartitionner”, etc.

MirrorMaker 2.0 se base sur Kafka Connect afin de répliquer le mieux possible un cluster Kafka.

Ajout de headers dans Kafka Connect

  • KIP-440 : Extend Connect Converter to support headers
  • Les headers dans Kafka permettent d’enrichir les messages Kafka de metadonnées additionnelles. Par exemple transmettre le schéma ID si le message est en avro. Cette KIP-440 vise à également enrichir le Kafka Connect Converter afin que celui-ci puisse être utilisé en tandem avec les clients producteurs/consommateurs/streams utilisant les headers.

Update de Kafka Streams

Support des jointures sans clés avec les KTable

  • KIP-213 : Support non-key joining in KTable

Possibilité de tester rapidement grâce au TopologyTestDriver

  • KIP-470 : TopologyTestDriver test input and output usability improvements

Misc

  • Support scala 2.13
  • Upgrade zookeeper ⇒ 3.5.X

Photo by Roman Kraft on Unsplash