Avez-vous déjà vu... un DataFrame Spark ayant deux colonnes ayant le même nom ? Et avez-vous essayé de lancer une requête SQL dessus ?

Il est possible que ça soit le cas si vous utilisez Spark, même de manière sporadique. Lorsque vous faites une jointure entre deux dataframes, vous allez rencontrer rapidement ce cas de figure :

case class Person(id: String, name: String, age: Int)
case class Status(id: String, status: String)

val statusDF: DataFrame = ss.createDataFrame(Seq(Person(id = "id1", name = "toto", age = 13)))
val personDF: DataFrame = ss.createDataFrame(Seq(Status(id = "id1", status = "active")))

val dataframe: DataFrame =
  personDF.join(statusDF, personDF("id") === statusDF("id"))

dataframe.select("id", "name", "status").show()

Lors du select, Spark va vous rappeler gentiment à l'ordre avec l'exception suivante :

org.apache.spark.sql.AnalysisException: Reference 'id' is ambiguous, could be: id, id.;

Mais, vous pouvez aussi avoir la maladresse d'enregistrer un tel DataFrame sur disque (en utilisant le format Parquet, par exemple). Sans le savoir, vous allez vous retrouver avec une colonne illisible. Vous allez du coup être accueillis par une autre exception lors de la lecture :

java.lang.RuntimeException: [id] BINARY was added twice

Alors que faire dans ce cas-là ? Sommes-nous condamnés à maudire la personne qui a osé mettre deux noms de colonne identiques ? 🤬

Vous pouvez vous en sortir avec trois solutions :

Il existe une solution très facile à ce problème, en modifiant le nom de la colonne responsable (ici id) en amont de la jointure :

val statusDFRenamed: DataFrame = statusDF.withColumnRenamed("id", "idStatus") //ou bien val personDFRenamed: DataFrame = personDF.withColumnRenamed("id", "idPerson")

Si vous souhaitez travailler directement sur la dataframe en sortie de jointure, vous pouvez utiliser une fonction que nous avons mis dans Plumbus, qui réalise un coalesce entre des colonnes de même nom. Pour réussir à définir cette transformation, on passe par la structure de la requête (Catalyst), afin d'adresser individuellement les colonnes qui ont le même nom (NamedExpression).

def coalesceColWithSameName(df: DataFrame): DataFrame = {
    //pour être sûr que c'est un Project
    val frame: DataFrame = df.select("*")
    //récupération de toutes les expressions : id, name, age, id, status
    val cols: Seq[NamedExpression] = frame.queryExecution.analyzed.asInstanceOf[Project].projectList
    //colonnes en sortie
    val outputCols: Array[String] = frame.columns.distinct

    import org.apache.spark.sql.functions.coalesce
   
    val colMap: Map[String, Column] = {
      val nameExpressionsByName: Map[String, Seq[NamedExpression]] = cols.groupBy(_.name)

      nameExpressionsByName.map({
        //seulement une expression pour ce nom
        case (name, Seq(expr)) => (name, new Column(expr).as(name))
        //plusieurs expressions pour ce nom => fusion avec coalesce
        case (name, nameExpressions) => (name, coalesce(nameExpressions.map(x => new Column(x)): _*).as(name))
      })
    }
    frame.select(outputCols.map(colMap): _*)
  }

Vous pouvez utiliser une autre solution que nous avons aussi ajouté à Plumbus, avec un comportement différent. Ici, les colonnes en doublon vont être renommées séparément avec un suffixe généré automatiquement (_1, _2, _3, ..., _n). Même technique, nous utilisons la structure de la requête pour adresser individuellement les colonnes.

def renameColumnsWithSameName(df: DataFrame): DataFrame = {
    val frame: DataFrame = df.select("*")

    val namedExpressions: Seq[NamedExpression] = frame.queryExecution.analyzed.asInstanceOf[Project].projectList

    val duplicateNames: Set[String] = namedExpressions.map(_.name).groupBy(identity).filter(_._2.size > 1).keys.toSet

    if (duplicateNames.isEmpty) df else {

      type NameCount = Map[String, Int]
      type Res = (NameCount, Seq[Column])

      val zero: Res = (Map.empty, Nil)

      val res: (NameCount, Seq[Column]) = namedExpressions.foldLeft(zero)({
        case ((counts, cols), exp) =>
          if (duplicateNames(exp.name)) {
            val i = counts.getOrElse(exp.name, 0)
            val col = new Column(exp).as(exp.name + "_" + i)

            (counts + (exp.name -> (i + 1)), cols :+ col)
          } else {
            (counts, cols :+ new Column(exp))
          }
      })

      df.select(res._2: _*)
    }

  }

Voici un exemple :

import ss.implicits._

val person: Person = Person(id = "1", name = "toto", age = 13)
val status: Status = Status(id = "1", status = "active")

val df1: Dataset[Person] = ss.createDataset(Seq(person))
val df2: Dataset[Status] = ss.createDataset(Seq(status))

val frame: DataFrame = df1.join(df2, df1("id") === df2("id"))

println("initial DataFrame")
frame.show()

//clean auto1
println("coalesceColWithSameName")
coalesceColWithSameName(frame).show()

//clean auto2
println("renameColWithSameName")
renameColumnsWithSameName(frame).show()

Nous obtenons en sortie :

initial DataFrame
+---+----+---+---+------+
| id|name|age| id|status|
+---+----+---+---+------+
|  1|toto| 13|  1|active|
+---+----+---+---+------+

coalesceColWithSameName
+---+----+---+------+
| id|name|age|status|
+---+----+---+------+
|  1|toto| 13|active|
+---+----+---+------+

renameColWithSameName
+----+----+---+----+------+
|id_0|name|age|id_1|status|
+----+----+---+----+------+
|   1|toto| 13|   1|active|
+----+----+---+----+------+

Conclusion

Nous avons vu rapidement comment résoudre le problème des doublons de colonnes de manière générique, en allant chercher un peu d'info dans les représentations internes de Spark SQL (ie. avec Catalyst). Ces APIs, assez méconnues, peuvent être très intéressantes si nous devons aller plus loin que la manipulation générique de schéma (avec df.schema).