Parfois on veut refactorer du code avec Spark et on finit assez rapidement par :

  • soit avoir une SparkSession en implicit :
def superFonction(df: DataFrame)(implicit sparkSession: SparkSession): DataFrame = { ??? }
  • soit se retrouver avec d'autres implicits partout :
def superFonction2(df: DataFrame)(implicit sparkSession: SparkSession,
                             config: com.typesafe.config.Config): DataFrame = {
  ???
}
  • soit des erreurs d'implicit quand on réarrange le code  :
def groupByKeyRemoveNull[A, K](dataset: Dataset[A])(f: A => Option[K]): Dataset[(K, Seq[A])] = {
      dataset
        .flatMap(x => f(x).map(y => (y, x)))
        .groupByKey(_._1)
        .mapGroups({ case (k, it) => (k, it.map(_._2).toSeq) })
    }
Unable to find encoder for type (K, A). An implicit Encoder[(K, A)] is needed to store (K, A) instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
	.flatMap(x => f(x).map(y => (y, x)))

On va voir quelques astuces pour améliorer tout ça !

1re astuce : SparkSession est un membre de Dataframe/Dataset

On a rarement besoin d'avoir la SparkSession en implicit, elle est disponible sur les dataframes ou datasets en paramètre :

def superFonction(df: DataFrame): DataFrame = {
   val ss = df.sparkSession
   ???
}

2e astuce : Et si on faisait une classe ?

Dans beaucoup de base de code en Scala/Spark, pour passer/injecter les éléments de configuration supplémentaire, on peut être tenté d'utiliser le mécanisme des implicits, si cela revient trop souvent, cela va être plus cohérent d'utiliser une classe à la place.

Au lieu de faire :

object MonJob {
    def superFonction2(df: DataFrame)(implicit config: com.typesafe.config.Config): DataFrame = ???

    def superFonction3(df:DataFrame, i:Int)(implicit config: com.typesafe.config.Config):DataFrame = ???
}

on va avoir :

class MonJob(config: com.typesafe.config.Config) {
    def superFonction2(df: DataFrame): DataFrame = ???

    def superFonction3(df:DataFrame, i:Int):DataFrame = ???
}

2e bis : c'est mieux de typer un peu ses configurations

Les configurations Spark ou HOCON sont des formes de Json peu typé. Pour faciliter la maintenance et rendre le runtime plus robuste, cela peut valoir le coup de passer en mode structuré typé :

case class MonJobConfig(config1: String, config2: Int)

object MonJobConfig {
    def fromConfig(config: com.typesafe.config.Config): Try[MonJobConfig] = ???
}

class MonJob(config: MonJobConfig) {
    def superFonction2(df: DataFrame): DataFrame = ???

    def superFonction3(df: DataFrame; i:Int): DataFrame = ???
}

Aussi cela permet de valider la configuration du programme avant de démarrer les traitements (via MonJobConfig.fromConfig(...) qui renvoit un Try) et aussi de voir clairement quelle partie du programme utilise telle configuration. (alt+F7 sur IntelliJ, find usages)

3e : allo, il manque des encodeurs

Quand vous allez vouloir générifier du code en spark, il va vous manquer des implicits. Contrairement aux erreurs d'implicit classiques qui consistent à importer ss.implicits._, ici il faut récupérer les implicits manquants.

Dans la dernière version d'intelliJ, on peut voir les implicits "Show implicit hints"

IntelliJ est alors assez clair sur ce qui manque. On va ouvrir un troisième groupe d'arguments pour les récupérer (implicit xxx).

Puis rajouter tous les implicits manquant à ce groupe :

Une fois que c'est fini, on peut enlever le mode X-Ray-Implicit d'intelliJ et passer aux tests sur nos fonctions polymorphiques ([A,K]) de plus haut niveau (f: A ⇒ Option[K]) :

Conclusion

On a présenté deux trois astuces pour remettre au carré du code Scala qui utilise Spark, cela suffit à faire du code beaucoup plus propre dans de nombreux cas. Pour les autres cas, il faut souvent passer par la manipulation de schema avec spark, on va laisser ça pour d'autres articles sur le sujet !