JSON ⇒ CSV en Spark - Première approche

Dans quelle mesure est-il possible de convertir une structure imbriquée (nested structure) pour obtenir une structure non-imbriquée (unnested structure) ? Cette question revient généralement lorsque nous essayons par exemple de convertir des données JSON en CSV. C'est un sujet qui peut se présenter dans le cadre de Spark.

Par exemple, nous pouvons avoir ce JSON représentant un employé :

{
  id: "ac23",
  person: {
    firstname: "John",
    lastname: "Doe",
  },
  job: "data engineer"
}

Son équivalent en CSV pourrait être :

id,   person_firstname, person_lastname, job
ac23, John,             Doe,             data engineer

Nous utilisons ici une simple concaténation des noms des champs permettant d'accès à un sous-niveau.

Si nous utilisons des Dataframe Spark, nous allons devoir en étudier la structure (le schéma) pour établir la conversion nécessaire. La particularité d'une structure imbriquée est que son schéma apparaît sous la forme d'un arbre. Pour retrouver le nom des champs, sachant que les valeurs se trouvent au niveau des feuilles, nous devrons donc effectuer un parcours en profondeur pour constitué le nom des champs de la structure non-imbriquée.

Par exemple, la structure ci-dessous :

* a
| * b
| | * d
| | | * e
| | | * f
| | * g
* h

Peut donner les champs suivant :

a_b_d_e, a_b_d_f, a_b_g, h

Si nous utilisons _ pour joindre le nom des champs.

Voici le code permettant de parcourir la structure imbriquée :

def walk(structure: StructType,
         prefix: Vector[String] = Vector.empty): Seq[Vector[String]] =
  for {
    field <- structure.fields
    fieldPath <- field.dataType match {
      case subStruct: StructType => walk(subStruct, prefix :+ field.name)
      case _: ArrayType          => Nil
      case _                     => List(prefix :+ field.name)
    }
  } yield fieldPath

À noter, que dans cette version, nous nous intéressons qu'aux StructType (équivalent Dataframe des objets JSON) et nous supprimons les ArrayType.

La conversion de structure imbriquée en structure non-imbriquée est faite donc en appelant notre fonction walk sur le schéma du Dataframe récupéré. Les différents noms de champ sont joints par un . pour le parcours dans le JSON et par un _ pour obtenir leur équivalent en nom de colonne CSV.

import org.apache.spark.sql.functions.expr

for {
  fieldPath <- walk(schema)
} yield {
  val fieldref: Column = expr(fieldPath.names.mkString("."))
  val name: String     = fieldPath.names.mkString("_")

  fieldref.as(name)
}

Nous avons vu ici une approche permettant de convertir une structure imbriquée (de type JSON) en structure non-imbriqué (de type CSV). Une simple analyse structurelle permet de réaliser cette conversion. Néanmoins, nous ne traitons pas ici de certains aspect comme la gestion des array et nous laissons Catalyst inférer le DataType. Ces points seront traités dans un autre article.

Cet article se base sur le code donné dans ce repo :

schema-utils/src/main/scala/io/univalence/schemautils/JsonoidToTable.scala at master · univalence/schema-utils
Experiences avec Spark sur les schémas. Contribute to univalence/schema-utils development by creating an account on GitHub.