Spark : Déduplication dans le schéma

Avez-vous déjà vu... un DataFrame Spark ayant deux colonnes ayant le même nom ? Et avez-vous essayé de lancer une requête SQL dessus ?

Il est possible que ça soit le cas si vous utilisez Spark, même de manière sporadique. Lorsque vous faites une jointure entre deux dataframes, vous allez rencontrer rapidement ce cas de figure :

case class Person(id: String, name: String, age: Int)
case class Status(id: String, status: String)

val statusDF: DataFrame = ss.createDataFrame(Seq(Person(id = "id1", name = "toto", age = 13)))
val personDF: DataFrame = ss.createDataFrame(Seq(Status(id = "id1", status = "active")))

val dataframe: DataFrame =
  personDF.join(statusDF, personDF("id") === statusDF("id"))

dataframe.select("id", "name", "status").show()

Lors du select, Spark va vous rappeler gentiment à l'ordre avec l'exception suivante :

org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id, id.;

Mais, vous pouvez aussi avoir la maladresse d'enregistrer un tel DataFrame sur disque (en utilisant le format Parquet, par exemple). Sans le savoir, vous allez vous retrouver avec une colonne illisible. Vous allez du coup être accueillis par une autre exception lors de la lecture :

java.lang.RuntimeException: [id] BINARY was added twice

Alors que faire dans ce cas-là ? Sommes-nous condamnés à maudire la personne qui a osé mettre deux noms de colonne identiques ? 🤬

Vous pouvez vous en sortir avec trois solutions :

  • Il existe une solution très facile à ce problème, en modifiant le nom de la colonne responsable (ici id) en amont de la jointure :
val statusDFRenamed: DataFrame = statusDF.withColumnRenamed("id", "idStatus")
//ou bien
val personDFRenamed: DataFrame = personDF.withColumnRenamed("id", "idPerson")
  • Si vous souhaitez travailler directement sur la dataframe en sortie de jointure, vous pouvez utiliser une fonction que nous avons mis dans Plumbus, qui réalise un coalesce entre des colonnes de même nom. Pour réussir à définir cette transformation, on passe par la structure de la requête (Catalyst), afin d'adresser individuellement les colonnes qui ont le même nom (NamedExpression).
def coalesceColWithSameName(df: DataFrame): DataFrame = {
    //pour être sûr que c'est un Project
    val frame: DataFrame = df.select("*")
    //récupération de toutes les expressions : id, name, age, id, status
    val cols: Seq[NamedExpression] = frame.queryExecution.analyzed.asInstanceOf[Project].projectList
    //colonnes en sortie, dans le même ordre, sans les doublons
    val outputCols: Array[String] = frame.columns.distinct

    import org.apache.spark.sql.functions.coalesce
    
    val colMap: Map[String, Column] = {
      val nameExpressionsByName: Map[String, Seq[NamedExpression]] = cols.groupBy(_.name)

      nameExpressionsByName.map({
        //seulement une expression pour ce nom
        case (name, Seq(expr)) => (name, new Column(expr).as(name))
        //plusieurs expressions pour ce nom => fusion avec coalesce
        case (name, nameExpressions) => (name, coalesce(nameExpressions.map(x => new Column(x)): _*).as(name))
      })
    }

    frame.select(outputCols.map(colMap): _*)
  }
  • Vous pouvez utiliser une autre solution que nous avons aussi ajouté à Plumbus, avec un comportement différent. Ici, les colonnes en doublon vont être renommées séparément avec un suffixe généré automatiquement (_1, _2, _3, ..., _n). Même technique, nous utilisons la structure de la requête pour adresser individuellement les colonnes.
def renameColumnsWithSameName(df: DataFrame): DataFrame = {
    val frame: DataFrame = df.select("*")
    val namedExpressions: Seq[NamedExpression] = frame.queryExecution.analyzed.asInstanceOf[Project].projectList
    val duplicateNames: Set[String] = namedExpressions.map(_.name).groupBy(identity).filter(_._2.size > 1).keys.toSet

    if (duplicateNames.isEmpty) {
      df
    } else {
      type NameCount = Map[String, Int]
      type Res = (NameCount, Seq[Column])

      val zero: Res = (Map.empty, Nil)

      val res = namedExpressions.foldLeft(zero)({
        case ((counts, cols), exp) =>
          if (duplicateNames(exp.name)) {
            val i = counts.getOrElse(exp.name, 0)
            val col = new Column(exp).as(exp.name + "_" + i)

            (counts + (exp.name -> (i + 1)), cols :+ col)
          } else {
            (counts, cols :+ new Column(exp))
          }
      })

      df.select(res._2: _*)
    }

  }

Voici un exemple :

import ss.implicits._

val person: Person = Person(id = "1", name = "toto", age = 13)
val status: Status = Status(id = "1", status = "active")

val df1: Dataset[Person] = ss.createDataset(Seq(person))
val df2: Dataset[Status] = ss.createDataset(Seq(status))

val frame: DataFrame = df1.join(df2, df1("id") === df2("id"))

println("initial DataFrame")
frame.show()

//clean auto1
println("coalesceColWithSameName")
coalesceColWithSameName(frame).show()

//clean auto2
println("renameColWithSameName")
renameColumnsWithSameName(frame).show()

Nous obtenons en sortie :

initial DataFrame
+---+----+---+---+------+
| id|name|age| id|status|
+---+----+---+---+------+
|  1|toto| 13|  1|active|
+---+----+---+---+------+

coalesceColWithSameName
+---+----+---+------+
| id|name|age|status|
+---+----+---+------+
|  1|toto| 13|active|
+---+----+---+------+

renameColWithSameName
+----+----+---+----+------+
|id_0|name|age|id_1|status|
+----+----+---+----+------+
|   1|toto| 13|   1|active|
+----+----+---+----+------+

Conclusion

Nous avons vu comment gérer ces dataframes assez embêtant et nous avons vu trois manières de répondre à ce problème, chacune ayant un niveau de difficulté variable. Évidemment le but de cet article était d'en apprendre plus sur Catalyst et sur les différentes opérations possibles faisable avec celui-ci. Il peut être très bénéfique d'utiliser Catalyst dans le cadre d'opérations manipulant le schéma d'un dataframe en Spark.

Nous avons vu rapidement comment résoudre le problème des doublons de colonnes de manière générique, en allant chercher un peu d'info dans les représentations internes de Spark SQL (ie. avec Catalyst). Ces APIs, assez méconnues, peuvent être très intéressantes si nous devons aller plus loin que la manipulation générique de schéma (avec df.schema).


Notes

  • lorsque l'on manipule les dataframes :org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id#3, id#7.;
  • si on sauvegarde en parquet et que l'on relit ça après : Caused by: java.lang.RuntimeException: [id] BINARY was added twice

Solutions :

  • à la main
  • avec def coalesceColwithSameName(df: DataFrame):DataFrame
  • avec def renameColumnsWithSameName(df:DataFrame):DataFrame

Code
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.scalatest.FunSuite

case class Person(id:String,name:String,age:Int)

class TestDuplicateCols extends FunSuite with TestUtils {

  val ss: SparkSession = TestSparkSession.create()

  def coalesceColwithSameName(df: DataFrame):DataFrame = {
    //pour être sûr que c'est un Projet
    val frame = df.select("*")
    //récupération de toutes les expressions : id#3, name#4, age#5,id#7, name#8, age#9
    val cols: Seq[NamedExpression] = frame.queryExecution.analyzed.asInstanceOf[Project].projectList
    //colonnes en sortie
    val outpuCols = frame.columns.distinct

    import org.apache.spark.sql.functions.coalesce
    //
    val colMap: Map[String, Column] = {
      val nameExpressionsByName: Map[String, Seq[NamedExpression]] = cols.groupBy(_.name)

      nameExpressionsByName.map({
        //seulement une expression pour ce nom
        case (name, Seq(expr))       => (name, new Column(expr).as(name))
        //plusieurs expressions pour ce nom => fusion avec coalesce
        case (name, nameExpressions) => (name, coalesce(nameExpressions.map(x => new Column(x)): _*).as(name))
      })
    }
    frame.select(outpuCols.map(colMap):_*)
  }

  def renameColumnsWithSameName(df:DataFrame):DataFrame = {
    val frame = df.select("*")

    val namedExpressions = frame.queryExecution.analyzed.asInstanceOf[Project].projectList

    val duplicateNames: Set[String] = namedExpressions.map(_.name).groupBy(identity).filter(_._2.size > 1).keys.toSet

    if(duplicateNames.isEmpty) df else {

      type NameCount = Map[String,Int]
      type Res = (NameCount,Seq[Column])

      val zero:Res = (Map.empty,Nil)

      val res = namedExpressions.foldLeft(zero)({
        case ((counts,cols),exp) =>
        if(duplicateNames(exp.name)) {
          val i = counts.getOrElse(exp.name,0)
          val col = new Column(exp).as(exp.name + "_" + i)

          (counts + (exp.name  -> (i + 1)),cols :+ col)
        } else {
          (counts, cols :+ new Column(exp))
        }
      })

      df.select(res._2:_*)
    }

  }


  test("toto") {

    import ss.implicits._

    val person = Person(id = "1", name = "toto", age = 13)

    val df1 = ss.createDataset(Seq(person))

    val df2 = df1

    val frame = df1.join(df2, df1("id") === df2("id"))



    //frame.apply("xyz")

    frame.show()

    val plan = frame.select("*").queryExecution.analyzed

    //bug
    //frame.select("id","age","name")

    //work
    frame.select(df1("id"),df1("age"),df1("name")).show()


    //clean auto1
    coalesceColwithSameName(frame).show()

    //clean auto2
    renameColumnsWithSameName(frame).show()
  }

}
Output
+---+----+---+---+----+---+
| id|name|age| id|name|age|
+---+----+---+---+----+---+
|  1|toto| 13|  1|toto| 13|
+---+----+---+---+----+---+

+---+---+----+
| id|age|name|
+---+---+----+
|  1| 13|toto|
+---+---+----+

+---+----+---+
| id|name|age|
+---+----+---+
|  1|toto| 13|
+---+----+---+

+----+------+-----+----+------+-----+
|id_0|name_0|age_0|id_1|name_1|age_1|
+----+------+-----+----+------+-----+
|   1|  toto|   13|   1|  toto|   13|
+----+------+-----+----+------+-----+
Code formaté
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.scalatest.FunSuite

case class Person(id: String, name: String, age: Int)

class TestDuplicateCols extends FunSuite {

  val ss: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()

  def coalesceColwithSameName(df: DataFrame): DataFrame = {
    //pour être sûr que c'est un Project
    val frame = df.select("*")
    //récupération de toutes les expressions : id#3, name#4, age#5,id#7, name#8, age#9
    val cols: Seq[NamedExpression] = frame.queryExecution.analyzed.asInstanceOf[Project].projectList
    //colonnes en sortie
    val outpuCols = frame.columns.distinct

    import org.apache.spark.sql.functions.coalesce
    
    val colMap: Map[String, Column] = {
      val nameExpressionsByName: Map[String, Seq[NamedExpression]] = cols.groupBy(_.name)

      nameExpressionsByName.map({
        //seulement une expression pour ce nom
        case (name, Seq(expr)) => (name, new Column(expr).as(name))
        //plusieurs expressions pour ce nom => fusion avec coalesce
        case (name, nameExpressions) => (name, coalesce(nameExpressions.map(x => new Column(x)): _*).as(name))
      })
    }
    frame.select(outpuCols.map(colMap): _*)
  }

  def renameColumnsWithSameName(df: DataFrame): DataFrame = {
    val frame = df.select("*")

    val namedExpressions = frame.queryExecution.analyzed.asInstanceOf[Project].projectList

    val duplicateNames: Set[String] = namedExpressions.map(_.name).groupBy(identity).filter(_._2.size > 1).keys.toSet

    if (duplicateNames.isEmpty) df else {

      type NameCount = Map[String, Int]
      type Res = (NameCount, Seq[Column])

      val zero: Res = (Map.empty, Nil)

      val res = namedExpressions.foldLeft(zero)({
        case ((counts, cols), exp) =>
          if (duplicateNames(exp.name)) {
            val i = counts.getOrElse(exp.name, 0)
            val col = new Column(exp).as(exp.name + "_" + i)

            (counts + (exp.name -> (i + 1)), cols :+ col)
          } else {
            (counts, cols :+ new Column(exp))
          }
      })

      df.select(res._2: _*)
    }

  }


  test("toto") {

    import ss.implicits._

    val person = Person(id = "1", name = "toto", age = 13)

    val df1 = ss.createDataset(Seq(person))

    val df2 = df1

    val frame = df1.join(df2, df1("id") === df2("id"))

    frame.show()

    val plan = frame.select("*").queryExecution.analyzed

    //bug
    //frame.select("id","age","name")

    //work
    frame.select(df1("id"), df1("age"), df1("name")).show()


    //clean auto1
    coalesceColwithSameName(frame).show()

    //clean auto2
    renameColumnsWithSameName(frame).show()
  }

}