Kafka Streams propose depuis la version 0.10.0.0 de Kafka un paradigme intéressant pour traiter des données au fil de l'eau. Comme son nom l'indique, il se base déjà sur Kafka qui propose une plateforme de traitement en streaming parmi les plus performantes actuellement (bon, il y a aussi Pulsar). Et puis Kafka Streams propose une API avec des opérations que nous retrouvons fréquemment en programmation fonctionnelle (map, flatMap, reduce...). Enfin, Kafka Streams nous pousse à nous orienter vers une architecture microservices. Tant de buzzwords :)

Mouep ! Ça ne vous empêchera pas nécessairement d'avoir un réseau de micro-sévices.

Il est en effet important de savoir dans quel état est chaque service, de surveiller leur activité. Une façon de faire est de coller à Kafka Streams un service Web qui expose une API Rest de monitoring.

Que proposent les outils de la programmation fonctionnelle pour faire ça ?

Ici, je vous propose de tester une approche basée sur ZIO et http4s.

Un microservice Kafka Streams

Nous allons nous baser sur un des "hello world" des services de streaming : le découpage de lignes de texte en mots. Nous recevons des lignes de texte depuis un premier topic "text-stream", nous découpons ces lignes en mots et nous émettons ces mots sur un second topic "word-stream".

Le main d'un service Kafka Streams ressemble à peu près à ça.

val inputStream = "text-stream"
val outputStream = "word-stream"

// -- Definition of the topology
val topology: Topology = {
  import org.apache.kafka.streams.scala.ImplicitConversions._
  import org.apache.kafka.streams.scala.Serdes._
  import org.apache.kafka.streams.scala.StreamsBuilder

  val builder = new StreamBuilder()

  builder.stream[String, String](inputStream)
      // our algorithme
      .flatMapValues(line =>
        line.split("[\\\\s.,;:!?]+").toIterable)  // cut cut cut... 🔪
    .to(outputStream)

  builder.build()
}

// -- Kafka Streams parameters
val properties = {
  import StreamsConfig._
  
  JavaProperties(
      APPLICATION_ID_CONFIG -> "mon-service",
      BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      DEFAULT_KEY_SERDE_CLASS_CONFIG -> Serdes.String().getClass,
      DEFAULT_VALUE_SERDE_CLASS_CONFIG -> Serdes.String().getClass
    )
}

// -- Kafka Streams service
val streams = new KafkaStreams(topology, properties)

streams.start()

sys.ShutdownHookThread { stream.close() }

JavaProperties est une petite astuce pour produire des java.lang.Properties en Scala avec moins de boilerplate 😁

import java.util.Properties

object JavaProperties {
  import scala.jdk.CollectionConverters._ // scala 2.13

  def apply(properties: (String, AnyRef)*): Properties = {
    val prop = new Properties()
    prop.putAll(properties.toMap.asJava)

    prop
  }
}

Pour information, la topologie de notre service ressemble à (lien)

Pour tester ce service, il faudra d'abord créer les topics. Vous pourrez alors lancer le service. Puis vous instancierez un consumer pour récupérer les données depuis le topic "word-stream" et instancierez un producer pour émettre des lignes de texte sur le topic "text-stream". Kafka propose ses propres outils pour ça. Sinon, vous pouvez toujours jeter un œil à Conduktor.

Kafka Streams et ZIO

Peut-on avoir un code plus fonctionnellement pur ?

Ici, nous allons lier l'exécution de notre application avec l'exécution du service Kafka Streams... et plus encore !

En première étape, nous allons encapsuler Kafka Streams avec ZIO, car d'un point de vue de ZIO, Kafka Streams n'est pas purement fonctionnelle.

Voici une classe abstraite qui nous permettra d'arriver à nos fins.

import zio.{RIO, Task, UIO, ZIO}

/**
  * Wrap a mutable dependency into a ZIO context.
  *
  * @param a reference to the mutable dependency
  * @tparam A mutable dependency type
  */
abstract class WrapMutable[A](private val a: A) {

  final protected def executeTotal[B](f: A => B): UIO[B] = UIO(f(a))

  final protected def executeTotalM[R, E, B](
    f: A => ZIO[R, E, B]
  ): ZIO[R, E, B] = f(a)

  final protected def unsafeTotal[B](f: A => B): B = f(a)

  final def execute[B](f: A => B): Task[B] = Task(f(a))

  final def executeM[R, B](f: A => RIO[R, B]): RIO[R, B] = Task(f(a)).flatten

}

Cette classe permet d'encapsuler d'autres classes ou services qui ne sont pas référentiellement transparents (voir 1 et 2 pour plus d'explication sur ce terme). Les appels sont aussi encapsulés dans des instances de ZIO. Ainsi, pour une service myService avec une méthode perform, au lieu de faire myService.perform, je vais faire val zMyService = new WrapMutable(myService) {}; zMyService.execute(_.perform) pour gérer avec ZIO l'appel de la méthode.

Voici une utilisation de WrapMutable dans le cadre Kafka Streams, dans laquelle je lui crée une surcouche ZIO.

import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.{KafkaStreams, TopologyDescription}
import zio.duration.Duration
import zio.{UIO, ZEnv, ZIO}

class ZKafkaStreams(streams: KafkaStreams,
                    stateCheckingDelay: Duration)
    extends WrapMutable[KafkaStreams](streams) {

  /**
    * The main function that launch the service
    */
  def serve: ZIO[Clock, Throwable, Unit] = 
    (cleanup *> start *> waitWhileIsUp).ensuring(close).unit

  // this part execute a check loop that the Kafka Streams is still up
  def waitWhileIsUp:URIO[Clock,Unit] =
    isUp.repeat {
      import zio.Schedule._
      doWhile[Boolean](x => x) >>> fixed(Duration(500, TimeUnit.MILLISECONDS))
    }.unit

  def isUp:UIO[Boolean] = status.map(s => s.isRunning || s == KafkaStreams.State.CREATED)

  //REMAP
  def cleanUp:Task[Unit]              = execute(_.cleanUp())
  def start:Task[Unit]                = execute(_.start)
  def close:UIO[Unit]                 = executeTotal(_.close)
  def status: UIO[KafkaStreams.State] = executeTotal(_.state())

}

L'opérateur *> permet de séquencer des appels (implémenté avec un flatMap). L'exécution du service Kafka Streams est commandé par la méthode serve. Une méthode status est ajoutée à des fins de monitoring comme nous le verrons plus loin.

Il ne reste plus qu'à créer notre application ZIO.

import zio.ZIO

object LineProcessingService extends zio.App {

  val topology: Topology = { /* ... (see above) */ }

  val properties: Properties = { /* ... (see above) */ }

  lazy val zKafkaStream: ZKafkaStreams =
    new ZKafkaStreams(
      streams = new KafkaStreams(topology, properties),
    )

  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
    zKafkaStream.serve
      .as(0)
      .orDie

}

Sur les dernières lignes, j'indique que la sortie nominale de notre application retourne la valeur 0 à l'OS (.as(0)) ou la valeur 1 en cas de problème (.orDie).

OK, j'ai mon service embarqué dans ZIO. Mais je n'ai pas moyen de savoir si celui-ci fonctionne normalement si je suis en dehors de la machine hôte.

Émettre des donnés de monitoring depuis son service

Une fonctionnalité nécessaire dans un service est un endpoint HTTP indiquant si ce service fonctionne. Nous allons utiliser http4s et le lier à ZIO pour envoyer une réponse 200 (OK) ou 503 (SERVICE_UNAVAILABLE) en fonction de l'état de Kafka Streams.

Au moment d'écrire ces lignes, il existe quelques tests et tutos associant ZIO et http4s (un article de Wiem Zien ici, un article , une petit tour sur Google...). http4s est par défaut associé cats, cats-effect et fs2. Mais, il y a dans ZIO un projet qui lui permet d'exposer une interface cats-effect.

Voici comment est-ce que ça se passe. Tout d'abord, nous codons le service.

import org.http4s._
import org.http4s.dsl.Http4sDsl
import zio.interop.catz._
import zio.Task

object HealthCheckService {
  private val dsl = Http4sDsl[Task]
  import dsl._

  def webService(streams: ZKafkaStreams): HttpRoutes[Task] =
    HttpRoutes.of[Task] {
        case GET -> Root =>
          streams.status.flatMap { st =>
            if (st.isRunning) Ok()
            else ServiceUnavailable()
          }
      }
}

Dans les projets ZIO, il y a une lib d'interopérabilité avec cats. En effet, http4s est basé sur cats, cats-effect et fs2 pour instancier des services. De plus, le projet ZIO-http n'a pas démarrer. Grâce ZIO-interop-cats, l'API ZIO est adapté pour utilisé zio.Task à la place du type IO de cats-effect et qui est conceptuellement proche (l'aspect bifunctor en moins).

Et voici en parallèle, un générateur de service Web avec la variable router à redéfinir pour exposer des endpoints.

import org.http4s.HttpRoutes
import org.http4s.server.blaze.BlazeServerBuilder
import zio.interop.catz._
import zio.interop.catz.implicits._
import zio.{Task, ZEnv, ZIO}

abstract class ZKafkaStreamsWebService(host: String, port: Int) {
  import org.http4s.implicits._

  val router: HttpRoutes[Task]

  final def serve: ZIO[ZEnv, Throwable, Unit] =
    ZIO.runtime[ZEnv]
      .flatMap { implicit rts =>
        BlazeServerBuilder[Task]
          .bindHttp(port, host)
          .withHttpApp(router.orNotFound)
          .serve
          .compile
          .drain
      }

}

En associant le tout, nous avons un service Web pour notre service avec un endpoint de health check.

import com.carrefour.phenix.atp.poc_ks.lib.{ZKafkaStreams, ZKafkaStreamsWebService}
import com.carrefour.phenix.atp.poc_ks.lib.endpoints.{HealthCheckService, ServiceInfoService, TopologyService}
import org.http4s.HttpRoutes
import org.http4s.server.Router
import zio._
import zio.interop.catz._

class LineProcessingWebService(zKafkaStreams: ZKafkaStreams,
                               host: String,
                               port: Int)
    extends ZKafkaStreamsWebService(host, port) {

  override val router: HttpRoutes[Task] =
    Router(
      "/monitoring/healthCheck" -> HealthCheckService.webService(zKafkaStreams)
    )

}

Pour terminer, voici le contenu du code de notre application.

lazy val webService: LineProcessingWebService =
  new LineProcessingWebService(
    zKafkaStreams = zKafkaStream,
    host          = "localhost",
    port          = 8080)

override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
  (webService.serve.fork
    *> zKafkaStream.serve)
    .as(0)
    .orDie

Ici .fork permet de lancer le service Web en parallèle, en plaçant son exécution dans un fiber. Un fiber est une sorte de version plus légère et collaborative des threads.

On a donc un service Web qui est lancé en parallèle et le service Kafka Streams sur lequel notre application est synchronisé.

Conclusion

Ici, Kafka Streams fonctionne très bien avec ZIO et http4s. L'ensemble offre un cadre en partie fonctionnellement pure qui favorise la composition des différentes briques de notre application.

Actuellement, de mon côté, la solution est à l'état de POC. En dehors de ce qui est propre à Kafka Streams, il faudrait voir l'ajout d'autres fonctionnalités comme l'ajout d'autres endpoints de monitoring plus sophistiqués et l'intégration avec des outils de monitoring existants, la sécurisation de l'API Rest ou plus simplement voir comment peut se mettre en place des tests unitaires et des tests d'intégration. Mais les premiers résultats obtenus ici sont en ce me concerne déjà très intéressants.

Le code est disponible avec quelques ajouts sur GitHub.

Photo par Kate Townsend sur Unsplash.