Provenant de lieux lointains dans l'univers (ce qui est pléonastique, puisque quoi que nous regardions dans l'univers, c'est forcément loin) et traversant l'espace à une vitesse inimaginable, des rayonnements viennent frapper les capteurs d'un satellite artificiel, délogeant sans gêne des électrons suivant leur course probabiliste tranquille autour d'atomes. Ce qui ne va pas sans électriser l'environnement. Par cette excitation locale et des mécanismes plus ou moins similaires, nos rayonnements initiaux sont transformés en quelque chose qu'on peut interpréter sous forme de nombres et transmis par radiocommunication vers une planète bleue (quoique pas trop), tellement petite qu'on la distingue à peine depuis Europe, le satellite de glace de Jupiter. Là sur cette planète, une partie certainement infime de nos émissions radios rebondissent sur un dôme inversé, atteignant un espace réduit, pour à nouveau être converties en signaux électriques et grâce à des procédés complexes terminer leur course écrasés contre un écran d'ordinateur sans pouvoir se plaindre. Tout ça pour vérifier si, par additions ou par multiplications successives lancées séquentiellement ou parallèlement, un rayonnement lointain permet de justifier une découverte astronomique... ou pas.

Cette histoire se voit attribuer de nombreux héros permettant de faire avancer la connaissance humaine. Si nous nous permettons quelques personnifications, il y a certes tous les dispositifs matériels et électroniques qui ont un rôle prépondérant. Mais il faut aussi compter sur les outils mathématiques. Et dans ce cadre extraordinaire, les seuls outils dont j'ai fait référence ici sont pourtant parmi ceux les plus utilisés au quotidien par nous simples développeurs sur des applis de gestions, du tracking d'utilisateurs ou des jeux sans fin qui consistent à aligner des friandises pour pouvoir avancer.

Invasion monoïde

Faire une succession d'additions est une opération courante :

def masseSalariale(remunerationBrutes: List[Double]): Double =
  remunerationBrutes.foldLeft(0.0) {
    case (sousTotal, remunerationBrute) => sousTotal + remunerationBrute }

Faire une succession de multiplications aussi :

def prixFinal(prixInitial: Double, tauxInflation: List[Double]): Double =
  tauxInflation.foldLeft(prixInitial) {
    case (prixActuel, inflation) => prixActuel * (1.0 + inflation) }

Faire autre chose aussi :

val data = Map(
  "Paris" -> 19.0,
  "Marseille" -> 12.0,
  "Paris" -> 8.0,
  "Paris" -> 15.0,
  "Marseille" -> 20.0,
  "Lyon" -> 10.0
)

def priceByCities(data: Map[String, Double]): Map[String, Double] =
  data.foldLeft(Map[String, Double]()) { case (acc, (city, price)) =>
    if (acc.contains(city)) acc.updated(city, acc(city) + price)
    else acc.updated(city, price)
  }

Dans ces exemples, nous avons utilisé à chaque fois, l'opération de foldLeft qui permet d'agréger des valeurs en fonction d'une valeur initiale et d'une opération binaire.

Nous allons justement nous concentrer sur, à la fois :

  • l'ensemble des valeurs acceptées,
  • l'opération binaire,
  • la valeur initiale.

Ces trois éléments forment un tout indissociable. Ce tout est ce que les mathématiciens appellent un monoïde.

Sur le plan physique, les monoïds sont de la même taille que les humains, mais leur peau est sombre et composée d'écailles. Il n'ont qu'un seul œil sur la tête, au niveau de la bouche. Au XXVIIe siècle, ils savent toujours parler. Mais depuis le 57e Segment du Temps, ils n'ont pas de bouche visible et s'en tiennent à un langage de signes pour communiquer... Dr Who... Désolé.

Dans Dr Who, les monoïds sont des êtres qui peuvent être sympas avec les humains. Mais ça n'a pas toujours été le cas.

Sinon, un monoïde est une structure algébrique correspondant à un ensemble (les valeurs) avec un élément neutre (la valeur initiale) et une loi de composition interne associative (l'opération binaire). C'est souvent noté (E, ✻, e), avec E l'ensemble de valeurs, ✻ la loi de composition interne et e l'élément neutre.

Alors, on dit loi de composition, car l'idée de notre opération est de combiner deux éléments de notre ensemble de valeurs (ie. de les composer, d'ailleurs on devrait pouvoir parler d'agréger des valeurs). On dit interne car en combinant ces deux valeurs, notre opération retourne une nouvelle valeur qui fait partie de notre ensemble initial de valeurs (ie. on ne sort pas de cet ensemble).

Pour associative, hé bien... Si nous utilisons plusieurs fois notre opération dans une expression, il est alors possible d'évaluer l'expression quelque soit l'endroit où on commence à l'évaluer. C'est-à-dire qu'avec l'expression a + b + c, je peux très bien commencer par x = a + b et ensuite faire x + c ou commencer par x = b + c et faire a + x après. Au final, on note ça (a + b) + c = a + (b + c). L'ordre d'évaluation des sous-expressions n'a pas d'importance pour un monoïde.

C'est une propriété intéressante, car ça permet de découper une expression en sous-expression et d'évaluer ces sous-expressions en parallèle, puis de récupérer et combiner les résultats de ces évaluations pour obtenir le résultat final... Et comme ça, on vient de réinventer MapReduce !

Voyons ce que ça donne...

Tes monoïdes font du MapReduce en ski !

Voici une liste de stations de ski avec notamment leur localisation et une évaluation sur 5.

Nous avons ci-dessous un extrait d'une liste des stations de ski en France. Bon... Ce n'est pas du big data, hein ! Mais on va faire comme si 😬.

station_name,station_rating,city,postal_code,county,departement,region,latitude,longitudeLus la Jarjatte,5.0,1,Lus-la-Croix-Haute,26620,Die,Drôme,Auvergne-Rhône-Alpes,44.679971,5.7661523
Panticosa,5.0,,,,,,,
Formigal,4.5,,,,,,,
Cerler,4.5,,,,,,,
Baqueira / Beret,4.5,,,,,,,
Val d'Isère,4.4,Val-d'Isère,73150,Albertville,Savoie,Auvergne-Rhône-Alpes,45.4498666,6.9804421
La Grave,4.4,La Grave,05320,Briançon,Hautes-Alpes,Provence-Alpes-Côte d'Azur,45.0455379,6.3068184
La Sambuy,4.4,,,,,,,
Le granier,4.4,Aime-la-Plagne,73210,Albertville,Savoie,Auvergne-Rhône-Alpes,45.60089445,6.62550908591405
Mont-Saxonnex,4.4,Mont-Saxonnex,74130,Bonneville,Haute-Savoie,Auvergne-Rhône-Alpes,46.0508713,6.4792764
Les Karellis,4.3,Montricher-Albanne,73870,Saint-Jean-de-Maurienne,Savoie,Auvergne-Rhône-Alpes,45.215851,6.399584218507
Peisey-Vallandry,4.3,,,,,,,
Le Mourtis,4.3,,,,,,,
Lans en Vercors,4.3,Lans-en-Vercors,38250,Grenoble,Isère,Auvergne-Rhône-Alpes,45.1277768,5.5891386
La Giettaz en Aravis,4.3,La Giettaz,73590,Albertville,Savoie,Auvergne-Rhône-Alpes,45.8827863,6.5334974
Sainte-Anne,4.3,Sainte-Anne,97180,Pointe-à-Pitre,Pointe-à-Pitre,Guadeloupe,16.225685,-61.3859128
Pic du Midi de Bigorre,4.3,Bagnères-de-Bigorre,,Bagnères-de-Bigorre,Hautes-Pyrénées,Occitanie,42.9367764,0.1416107

Cette liste n'est pas complète en terme d'information. Malgré ça, nous allons donner la moyenne des évaluations... tel que le ferait MapReduce ou Spark.

Dans MapReduce, il y a une étape de préparation des données où on part d'un fichier pour le convertir en un ensemble clé/valeur. Notez que l'évaluation est en position 1 dans chaque ligne (en commençant par 0) et que le département est en position 6.

import scala.collection.MapView
import scala.util.Try

def getRatingsByDepartement(lines: Iterable[String]): MapView[String, Iterable[Double]] = {
  // drop CSV header
  val data: Iterable[String] = lines.drop(1)

  val rows: Iterable[Array[String]] =
    data.map { line =>
      // cleaning line and separate fields
      val row: Array[String] = line.trim.split(",")

      // cleansing: if fields are missing, we pad row with empty strings
      row.padTo(7, "")
    }
  
  val deptRatings: Iterable[(String, Double)] =
    // we remove lines with no departement
    rows.filterNot(_(6).isEmpty)
      .map(fields =>
        (fields(6), Try { fields(1).toDouble }.getOrElse(0.0))
      )

  deptRatings
    .groupBy { case (departement, rating) => departement }
    .view.mapValues(row => row.map { case (departement, rating) => rating })
}

La fonction getRatingsByDepartement exécute en fait un shuffle (ie. une redistribution des données) en réalisant un partitionnement utilisant le département comme clé (ce qui n'est pas la meilleure des clés, dans la mesure où la répartition des données dans les différentes partitions sera ici déséquilibrée, puisque par exemple dans les Vosges il n'y a pas beaucoup de stations contrairement à la Haute-Savoie... Mais, bon. Ce n'est comme si on pouvait faire du big data avec l'énumération des stations de ski en France). Le résultat de getRatingsByDepartement est de type MapView[String, Iterable[Double]] — MapView est une "vue" sur une collection de type Map. Ici, à chaque clé correspond une partition des évaluations. Dans le cadre de MapReduce, chaque partition serait déposée dans des nœuds différents du cluster.

Il va maintenant falloir calculer la moyenne. En supposant, qu'on ait à faire à une liste immense, il est plus intéressant de parcourir cette liste en une seule passe qu'en deux. Car pour calculer une moyenne, il faut d'un côté une somme de valeurs et de l'autre leur quantité, avant de diviser ces deux résultats. Ce qui normalement implique deux passes sur notre dataset. Pour le faire en une seule passe, nous allons calculer la somme et la quantité en même temps, en stockant les résultats intermédiaires dans un couple de valeurs (somme, quantité).

Alors, il existe différentes approches pour implémenter ce calcul de moyenne. Pour l'exercice ici, nous allons étudier une solution mettant en avant la notion de monoïde, en se basant sur une typeclasse (un peu à la manière de la bibliothèque Scala Cats).

En Scala, pour déclarer une typeclasse Monoid, il faut déclarer un trait générique, où le paramètre A représente le type qui sera qualifié de monoïde. Ce trait contient deux méthodes empty qui retourne l'élément neutre et combine qui permet de combiner deux éléments de A.

trait Monoid[A] {
  def empty: A
  def combine(a: A, b: A): A
}

object Monoid {
  @inline def apply[A](implicit ev: Monoid[A]): Monoid[A] = ev
}

La méthode apply de l'object Monoid ci-dessus apporte une facilité d'utilisation du trait, qui permet par exemple d'écrire Monoid[Int].empty à la place de implicitly[Monoid[Int]].empty.

Déclarons quelques instances de notre typeclasse Monoid, qui nous servirons pour résoudre notre problème.

// Monoid (Int, +, 0)
implicit val intMonoid: Monoid[Int] = new Monoid[Int] {
  override def empty: Int = 0
  override def combine(a: Int, b: Int): Int = a + b
}

// Monoid (Double, +, 0.0)
implicit val doubleMonoid: Monoid[Double] = new Monoid[Double] {
  override def empty: Double = 0.0
  override def combine(a: Double, b: Double): Double = a + b
}

// turn any tuple (A, B) into Monoid, providing A and B both are Monoid
implicit def tupleMonoid[A: Monoid, B: Monoid]: Monoid[(A, B)] =
  new Monoid[(A, B)] {
    override def empty: (A, B) = (Monoid[A].empty, Monoid[B].empty)

    override def combine(left: (A, B), right: (A, B)): (A, B) =
      (Monoid[A].combine(left._1, right._1),
       Monoid[B].combine(left._2, right._2))
  }

Rajoutons à certaines collections une opération combineAll qui, dans la mesure où les valeurs contenues font parties d'un monoïde, combine toutes les valeurs contenues pour obtenir un unique résultat.

implicit class iterableWithCombineAll[A: Monoid](l: Iterable[A]) {
  def combineAll: A = l.fold(Monoid[A].empty)(Monoid[A].combine)
}

Utilisons notre fonction getRatingsByDepartement avec un fichier pour obtenir un partitionnement des données.

import scala.io.Source

val file = Source.fromFile("/data/geo/stations.csv")
val partitions = getRatingsByDepartement(file.getLines().to(Iterable))

Et maintenant, voici notre code MapReduce :

// phase 1 (Map): get ratings only and associate the value 1 to then
val partitionedRatingWithOne =
  partitions.mapValues(ratings => ratings.map(rating => (rating, 1)))

// phase 2 (Combine): locally sum ratings and 1s for each partition
val partitionedSumRatingsAndCount =
  partitionedRatingWithOne.mapValues(data => data.combineAll)

// phase 3 (Reduce): combine for all partitions the sum of ratings and counts
val (rating, count) =
    partitionedSumRatingsAndCount.values.combineAll

println(rating / count)

Ce qui donne environ 3.45 sur 5 sur notre dataset.

Et c'est ainsi que MapReduce fonctionne, si on inclut la phase Combine : 1/ Map : on transforme chaque élément dans une forme qui facilite les phases suivantes, 2/ Combine : on effectue un Reduce local pour réduire la quantité de données à transférer entre les nœuds du cluster à la phase suivante (c'est une optimisation), 3/ Reduce : on rassemble les résultats intermédiaires pour calculer et retourner le résultat final.

L'optimisation par la phase Combine est rendu possible par le fait que (Int, +, 0) est un monoïde, que (Double, +, 0.0) est un monoïde et qu'à partir de ((Double, Int), (+, +), (0.0, 0)) on a aussi un monoïde. Et comme l'opération combine sur un monoïde est associative, nous savons alors qu'il est possible de diviser notre calcul de la phase Reduce en plein de calculs intermédiaires de la manière qui arrange le plus.

Bien entendu, le code ci-dessus est très simple, alors que ce qui se passe dans une véritable implémentation de MapReduce est bien plus complexe. Mais pour arriver à un véritable MapReduce, ici il faudrait changer l'implémentation du type Iterable et s'intéresser aux méthodes drop, map, filter, groupBy, mapValues et fold. Il faudrait aussi changer la façon de lire un fichier surtout s'il est réparti sur un cluster. Tout ceci est disponible avec les RDD de Spark. Et avec les RDD de Spark, on aurait d'ailleurs une version bien plus simple.

// read and partition a file
val data: RDD[(String, Double)] =
  getRatingsByDepartement(sparkContext.textFile("/data/geo/stations.csv"))

// phase 1 (Map): get ratings only and associate the value 1 to then
val ratingWithOne: RDD[(Double, Int)] =
  data.map { case (departement, rating) => (rating, 1) }

// phase 2 (Reduce): combine for all partitions the sum of ratings and 1s
val (rating, count) =
  ratingWithOne.combineAll

println(rating / count)

// ----
// add combineAll to RDD
implicit class rddWithCombineAll[A: Monoid](l: RDD[A]) {
  def combineAll: A = l.fold(Monoid[A].empty)(Monoid[A].combine)
}

// ----
// data preparation
def getRatingsByDepartement(lines: RDD[String]): RDD[(String, Double)] = {
  val header: String = lines.first()
  val data: RDD[String] = lines.filter(row => row != header)
  val rows: RDD[Array[String]] =
    data.map { line =>
      // cleaning line and separate fields
      val row: Array[String] = line.trim.split(",")
      // cleansing: if fields are missing, we pad row with empty strings
      row.padTo(7, "")
    }
    // we remove lines with no departement
      .filter(row => !row(6).isEmpty)

  rows.map(fields => (
    fields(6), // departement
     Try { fields(1).toDouble }.getOrElse(0.0)) // rating
  )
}

Conclusion

Notre histoire de rayonnements cosmiques transformés en nombres écrasés sur un écran fait apparaître une forme de vie mathématique à part entière : les monoïdes. Ils sont partout et leur prolifération est incontrôlable. Car nous les retrouvons partout : dans les calculs scientifiques, dans les clusters, dans la gestion des règles métiers et même dans les additions de nos enfants. Et le plus saisissant est que nous les utilisons parfois sans nous en rendre compte.

Bien ou mal ? C'est à vous de décider. Mais une chose est sûr, c'est que les monoïdes apportent une aide indéniable.

Le code et les données : https://github.com/univalence/blog-code/tree/master/monoid

Photographie par drmakete lab sur Unsplash.