
Développé par Databricks, Delta Lake est un projet Open Source qui apporte les propriétés ACID aux lacs de données.
A la différence des SGBD, les Data Lakes sont souvent dépourvus de fonctionnalités permettant de garantir la fiabilité d’une transaction. Les lacs de données sont, en effet, souvent confrontés à des problèmes de cohérence de données et à des données corrompues suite à l’apparition d’une erreur lors d’une transaction. Delta Lake apporte également des fonctionnalités qui simplifieront la vie des ingénieurs telles que la validation et l’évolution de schémas, la possibilité de requêter une table à un de ses états passés (Time Travel) ou encore l’ajout des opérations de type DELETE, UPDATE et UPSERT.
Delta Lake apporte un changement de paradigme important puisque, si la plupart des lacs de données utilisent l’approche schema on read, Delta Lake passe les tables en mode schema on write utilisé depuis l’invention des SGBD. Cela dit, il est tout à fait possible de faire cohabiter ces deux types d’approche afin d’obtenir les fonctionnalités voulues selon le cas d’usage.
Concrètement, Delta Lake applique une surcouche à un stockage classique de type Parquet. Nous verrons que Delta Lake fonctionne grâce à un système de commit successifs ce qui rend possible notamment le Time Travel. Ce mécanisme qui s’apparente à un journal de transactions est extrêmement léger et permet une adoption progressive au sein d’un système existant.
Afin de rentrer dans le vif du sujet, je propose d’essayer Delta Lake en Scala dans un environnement Jupyter. Le but est de comprendre plus en détail le fonctionnement de Delta Lake ainsi que ses principales features.
Environnement de travail
Les lecteurs souhaitant reproduire le code peuvent se baser sur l’image Docker jupyter/all-spark-notebook afin d’obtenir un environnement de test fonctionnel.
Installation de Delta Lake
La procédure d’installation de Delta Lake est disponible dans la documentation du projet.
Nous utiliserons :
- Scala 2.12
- Spark 3.0
- Delta Lake 0.7
- JupyterLab 2.2.8
Initialisation de Spark
L’initialisation de la session Spark s’écrit comme cela avec le noyau Spylon que j’utilise pour ce notebook. Il faut veiller à inclure le jar Delta Lake et à définir des configurations additionnelles. « io.delta.sql.DeltaSparkSessionExtension » permet l’utilisation des fonctionnalités de Delta Lake à travers la syntaxe spark.sql et « org.apache.spark.sql.delta.catalog.DeltaCatalog » permet l’interaction avec un metastore de base de données et de tables.
%%init_spark launcher.num_executors = 1 launcher.executor_cores = 1 launcher.driver_memory = '4g' launcher.conf.set("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") launcher.conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") launcher.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
Ci-dessous, nous notons la ligne import io.delta.sql.DeltaSparkSessionExtension qui nous permettra d’interagir programmatiquement avec les tables au format Delta :
import java.io.File import scala.reflect.io.Directory import org.apache.spark.sql.Encoders import io.delta.sql.DeltaSparkSessionExtension
Nous définissons des variables pour la gestion de nos dossiers de travail. Le dossier raw contient les données brutes, le dossier parquet contient les fichiers sous-jacents à la table au format Parquet que nous allons créer et le dossier delta contient les fichiers sous-jacents à la table au format Delta que nous allons créer.
val dataFolder: String = "/home/jovyan/data/" val rawDataFolder: String = s"${dataFolder}raw/" val rawDataFile: String = s"${rawDataFolder}fake_people.csv" val parquetDataFolder: String = s"${dataFolder}parquet/" val deltaDataFolder: String = s"${dataFolder}delta/" val deltaLogFolder: String = s"${deltaDataFolder}_delta_log/"
Nous vidons les dossiers pour retrouver le même état initial à chaque run :
val directoryRaw = new Directory(new File(rawDataFolder)) val directoryParquet = new Directory(new File(parquetDataFolder)) val directoryDelta = new Directory(new File(deltaDataFolder)) val directoryDeltaLog = new Directory(new File(deltaLogFolder)) directoryParquet.deleteRecursively() directoryDelta.deleteRecursively() directoryParquet.createDirectory() directoryDelta.createDirectory()
Enfin, nous définissons une courte fonction pour afficher le contenu d’un dossier donné. Nous utiliserons un jeu de donnée fictif créé avec la bibliothèque Python Faker.
def showFilesInDir(dir: Directory): Unit = { val it = for { file <- dir.files; if !(file.toString.contains("/.")) } yield f"${file} ${file.length.toDouble / 1000000} MB" it foreach println } showFilesInDir(directoryRaw) /home/jovyan/data/raw/generateFakeData.ipynb 0.0017 MB /home/jovyan/data/raw/fake_people.csv 63.050827 MB
Chargement des données
Nous commençons par charger les données fictives à partir d’un simple fichier csv :
case class Person(id: Int, name: String, email: String, address: String, city: String, dateTime: java.sql.Timestamp, randomInt: Int) val personSchema = Encoders.product[Person].schema val data = spark.read .format("csv") .schema(personSchema) .option("header", "true") .option("multiLine", true) .load(rawDataFile) data.show(15)
Le jeu contient 500 000 enregistrements :
data.count
res4: Long = 500000
Les tables au format Parquet
Commençons par aborder la création et la manipulation de tables avec un format classique.
Nous utilisons la syntaxe sql afin de créer un metastore (parfois appelé catalogue de metadata) :
val db = "deltalake_tuto_margo" spark.sql(s"DROP DATABASE IF EXISTS ${db} CASCADE") spark.sql(s"CREATE DATABASE ${db}") spark.sql("SHOW DATABASES").show() +--------------------+ | namespace| +--------------------+ | default| |deltalake_tuto_margo| +--------------------+ spark.sql(s"USE $db") spark.sql("SHOW TABLES").show +--------+---------+-----------+ |database|tableName|isTemporary| +--------+---------+-----------+ +--------+---------+-----------+
Et nous créons la table à partir des données chargées avec Spark :
data.write .option("path", parquetDataFolder) .saveAsTable("parquet_table") spark.sql("SHOW TABLES").show spark.sql("SELECT id, name, city FROM parquet_table LIMIT 5").show(truncate=false) +--------------------+-------------+-----------+ | database| tableName|isTemporary| +--------------------+-------------+-----------+ |deltalake_tuto_margo|parquet_table| false| +--------------------+-------------+-----------+ +---+-------------------+----------------------+ |id |name |city | +---+-------------------+----------------------+ |0 |René Diallo |Pelletier | |1 |Thomas Bouchet-Mary|Sainte Alain-les-Bains| |2 |Susan Mathieu |Simon-la-Forêt | |3 |Jeanne Gallet |Sainte FrançoisVille | |4 |Camille Legendre |LacroixBourg | +---+-------------------+----------------------+
La table parquet_table est maintenant contenue dans un fichier parquet (en conditions réelles, il y aura autant de fichiers parquets que de workers mais travailler avec un seul worker simplifie cette présentation).
showFilesInDir(directoryParquet)
/home/jovyan/data/parquet/_SUCCESS 0.0 MB
/home/jovyan/data/parquet/part-00000-2d1517bb-5b4b-40b7-bf7a-470f7efc1e1d-c000.snappy.parquet 33.467321 MB
Afin de créer une opération fictive nous ajoutons à notre table les dix premières lignes de celle-ci. Les dix premières lignes sont donc en double dans la table.
spark.sql(""" INSERT INTO parquet_table SELECT * FROM parquet_table LIMIT 10; """)
Spark va simplement créer un second fichier parquet qui contient les dix lignes que nous venons d’ajouter à la table :
showFilesInDir(directoryParquet)
/home/jovyan/data/parquet/part-00000-92dfb354-aa8e-4c48-92fd-88ddee1a0226-c000.snappy.parquet 0.003009 MB
/home/jovyan/data/parquet/_SUCCESS 0.0 MB
/home/jovyan/data/parquet/part-00000-2d1517bb-5b4b-40b7-bf7a-470f7efc1e1d-c000.snappy.parquet 33.467321 MB
Ce mode de stockage ne permet pas de connaître l’historique de la table car seul son état à l’instant T est disponible. L’exemple ci-dessus est simpliste mais imaginons un dossier de plusieurs centaines de fichiers sur une table régulièrement modifiée, il sera impossible de reconstituer un historique des transformations.
spark.sql("SELECT COUNT(*) FROM parquet_table").show +--------+ |count(1)| +--------+ | 500010| +--------+
Par ailleurs, la commande suivante ne marchera pas en raison de l’immutabilité des fichiers parquet :
spark.sql("DELETE FROM parquet_table WHERE id >= 480000")
Les tables au format Delta
D’un point de vue purement pratique, Delta Lake s’utilise comme un format de fichier. Pour écrire une table, il suffit donc de remplacer .format(« parquet ») par .format(« delta ») :
data.write .format("delta") .mode("overwrite") .save(deltaDataFolder)
Comme pour une table classique, Delta Lake permet l’utilisation de ces fonctionnalités via la syntaxe spark.sql :
spark.sql(s""" CREATE TABLE deltalake_table USING DELTA LOCATION '${deltaDataFolder}' """) spark.sql("SELECT * FROM deltalake_table LIMIT 10;").show
Le format Delta est composé de fichiers au format Parquet ainsi que d’un dossier _delta_log qui contient les éléments qui permettent de retracer la vie de la table :
showFilesInDir(directoryDelta) /home/jovyan/data/delta/part-00000-5f7b541b-3498-42be-8db7-197e596a94e9-c000.snappy.parquet 33.467321 MB showFilesInDir(directoryDeltaLog) /home/jovyan/data/delta/_delta_log/00000000000000000000.json 0.001227 MB
Ce journal est constitué de fichiers json numérotés. Si l’on inspecte le premier fichier, nous pouvons voir qu’il contient les détails de la transaction créant la table. Nous pouvons ainsi voir qu’il s’agit d’une opération Overwrite et qu’un fichier contenant 500 000 lignes a été créé. Nous pouvons aussi voir que le détail du schéma est enregistré dans la section metadata :
! head /home/jovyan/data/delta/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1601816303684,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isBlindAppend":false,"operationMetrics":{"numFiles":"1","numOutputBytes":"33467321","numOutputRows":"500000"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"9389096b-6342-4a79-b522-f2ad492940d1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"email\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"city\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"dateTime\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"randomInt\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1601816288584}}
{"add":{"path":"part-00000-5f7b541b-3498-42be-8db7-197e596a94e9-c000.snappy.parquet","partitionValues":{},"size":33467321,"modificationTime":1601816302227,"dataChange":true}}
Les tables Delta peuvent également être manipulées via une API native. Nous allons utiliser cette API afin d’analyser l’historique des opérations sur notre table :
import io.delta.tables._ val deltaTable = DeltaTable.forPath(deltaDataFolder)
En l’état actuel, la table n’a connu que l’opération initiale :
deltaTable.history.show()
Afin de créer une nouvelle opération, nous allons insérer les cinquante premiers enregistrements de la table à la suite de celle-ci. Ensuite, nous regardons de nouveau l’historique des opérations :
data.filter($"id"<50).write.mode("append") .format("delta").save(deltaDataFolder) deltaTable.history .select( "version", "timestamp", "operation", "operationParameters", "readVersion", "isBlindAppend", "operationMetrics" ) .show(truncate = false)
Une seconde opération a donc été créée. La table d’historique est assez claire et nous voyons immédiatement que la seconde opération est une opération de type WRITE utilisant le mode Append qui a créé un fichier de cinquante lignes. Nous voyons également que cette dernière opération est liée à la première (colonne readVersion). Le schéma n’est donc pas repris dans ce second fichier comme nous pouvons le voir en l’inspectant :
showFilesInDir(directoryDeltaLog) /home/jovyan/data/delta/_delta_log/00000000000000000000.json 0.001227 MB /home/jovyan/data/delta/_delta_log/00000000000000000001.json 4.11E-4 MB ! head /home/jovyan/data/delta/_delta_log/00000000000000000001.json {"commitInfo":{"timestamp":1601816311698,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"7021","numOutputRows":"50"}}} {"add":{"path":"part-00000-ba7dd090-5a1f-4379-933e-ace506bf07f9-c000.snappy.parquet","partitionValues":{},"size":7021,"modificationTime":1601816311686,"dataChange":true}} showFilesInDir(directoryDelta) /home/jovyan/data/delta/part-00000-ba7dd090-5a1f-4379-933e-ace506bf07f9-c000.snappy.parquet 0.007021 MB /home/jovyan/data/delta/part-00000-5f7b541b-3498-42be-8db7-197e596a94e9-c000.snappy.parquet 33.467321 MB deltaTable.toDF.count res23: Long = 500050
Nous vérifions que les enregistrements insérés lors de la seconde opération vivent dans leur propre fichier Parquet :
deltaTable.toDF.filter($"id".isin(8,9,10)).withColumn("file", input_file_name()).select("id", "name", "file").show(false)
Nous allons maintenant essayer de passer des opérations DELETE et UPDATE, ce qui n’était pas possible avec le format Parquet classique. Nous supprimons les lignes dont l’ID est supérieur ou égal à 480 000 et nous ajoutons 2000 à la colonne contenant des entiers aléatoires tirés entre 1000 et 2000 :
deltaTable.delete("id >= 480000") deltaTable.update(Map("randomInt" -> expr("randomInt + 2000")))
Le tableau des historiques est logiquement enrichi. Chaque table résulte donc de son état initial successivement enrichi par des opérations archivées dans le journal de transactions :
deltaTable.history .select( "version", "timestamp", "operation", "operationParameters", "readVersion", "isBlindAppend", "operationMetrics" ) .show(truncate = false)
Les nouvelles opérations ont logiquement généré de nouveaux fichiers Parquet :
showFilesInDir(directoryDelta) /home/jovyan/data/delta/part-00000-808f71b9-da2b-4c22-92d1-58c7fbf239f9-c000.snappy.parquet 7.94E-4 MB /home/jovyan/data/delta/part-00002-d3d122df-d960-4277-a91e-16c40bfbaf29-c000.snappy.parquet 32.051001 MB /home/jovyan/data/delta/part-00000-ba7dd090-5a1f-4379-933e-ace506bf07f9-c000.snappy.parquet 0.007021 MB /home/jovyan/data/delta/part-00002-2376857d-ac2d-4623-92f5-8c796739a608-c000.snappy.parquet 32.051001 MB /home/jovyan/data/delta/part-00004-b0e947f3-1260-440c-b2b1-7af3490c341c-c000.snappy.parquet 0.007021 MB /home/jovyan/data/delta/part-00000-abf0afc9-2523-46cb-abfb-213cd52465b4-c000.snappy.parquet 7.94E-4 MB /home/jovyan/data/delta/part-00000-5f7b541b-3498-42be-8db7-197e596a94e9-c000.snappy.parquet 33.467321 MB showFilesInDir(directoryDeltaLog) /home/jovyan/data/delta/_delta_log/00000000000000000000.json 0.001227 MB /home/jovyan/data/delta/_delta_log/00000000000000000002.json 7.63E-4 MB /home/jovyan/data/delta/_delta_log/00000000000000000001.json 4.11E-4 MB /home/jovyan/data/delta/_delta_log/00000000000000000003.json 0.001178 MB
Après ces quelques transformations, nous allons essayer la fonctionnalité Time Travel qui permet d’obtenir l’état d’une table en spécifiant une version ou un TimeStamp en utilisant « timestampAsOf ». Il est donc possible de remonter dans la vie de la table pour, par exemple, analyser une opération ou pour corriger une erreur. Dans notre cas, nous pouvons facilement voir l’évolution du nombre d’enregistrements dans la table :
val dfInitial = spark.read.format("delta").option("versionAsOf", 0).load(deltaDataFolder) println(s"Initial row count: ${dfInitial.count}") val dfAfterAppend = spark.read.format("delta").option("versionAsOf", 1).load(deltaDataFolder) println(s"Row count after append: ${dfAfterAppend.count}") val dfAfterDelete = spark.read.format("delta").option("versionAsOf", 2).load(deltaDataFolder) println(s"Row count after delete: ${dfAfterDelete.count}") val dfAfterUpdate = spark.read.format("delta").option("versionAsOf", 3).load(deltaDataFolder) Initial row count: 500000 Row count after append: 500050 Row count after delete: 480050
Ou bien, créer une vue des changements entre deux états de la même table :
dfAfterDelete.toDF .select("RandomInt", "id") .withColumnRenamed("RandomInt", "RandomIntOriginal") .join(dfAfterUpdate.toDF.select("RandomInt", "id"), "id") .filter($"id".isin(100, 101, 102)) .withColumn("diff", $"RandomInt" - $"RandomIntOriginal") .show +---+-----------------+---------+----+ | id|RandomIntOriginal|RandomInt|diff| +---+-----------------+---------+----+ |100| 1854| 3854|2000| |101| 1665| 3665|2000| |102| 1215| 3215|2000| +---+-----------------+---------+----+
Pour terminer l’illustration du fonctionnement du journal de transactions, nous noterons que Delta Lake génère un checkpoint au format Parquet toutes les dix transactions. Ce fichier contient l’état complet de la table à un certain point dans le temps. Spark peut donc s’appuyer sur le checkpoint le plus récent ainsi que sur les quelques fichiers json plus récents. Cela évite d’avoir à reparser potentiellement des milliers de petits fichiers json :
deltaTable.update(Map("randomInt" -> expr("randomInt + 3"))) deltaTable.update(Map("randomInt" -> expr("randomInt + 8"))) deltaTable.update(Map("randomInt" -> expr("randomInt + 22"))) deltaTable.update(Map("randomInt" -> expr("randomInt + 55"))) deltaTable.update(Map("randomInt" -> expr("randomInt + 32"))) deltaTable.update(Map("randomInt" -> expr("randomInt + 48"))) deltaTable.update(Map("randomInt" -> expr("randomInt + 2"))) showFilesInDir(directoryDeltaLog) /home/jovyan/data/delta/_delta_log/00000000000000000010.json 0.001178 MB /home/jovyan/data/delta/_delta_log/00000000000000000007.json 0.001178 MB /home/jovyan/data/delta/_delta_log/00000000000000000009.json 0.001178 MB /home/jovyan/data/delta/_delta_log/00000000000000000005.json 0.001178 MB /home/jovyan/data/delta/_delta_log/00000000000000000010.checkpoint.parquet 0.017121 MB /home/jovyan/data/delta/_delta_log/00000000000000000000.json 0.001227 MB /home/jovyan/data/delta/_delta_log/00000000000000000002.json 7.63E-4 MB /home/jovyan/data/delta/_delta_log/00000000000000000006.json 0.001178 MB /home/jovyan/data/delta/_delta_log/00000000000000000004.json 0.001178 MB /home/jovyan/data/delta/_delta_log/00000000000000000008.json 0.001178 MB /home/jovyan/data/delta/_delta_log/_last_checkpoint 2.5E-5 MB /home/jovyan/data/delta/_delta_log/00000000000000000001.json 4.11E-4 MB /home/jovyan/data/delta/_delta_log/00000000000000000003.json 0.001178 MB
La dernière feature majeure de Delta Lake est son utilitaire de gestion du stockage. Vous avez pu noter que certains fichiers Parquet sont gardés alors qu’ils ne sont plus utilisés pour obtenir la vue d’une table à son état actuel. Ces fichiers sont indispensables pour la fonction Time Travel mais peuvent être effacés grâce à la méthode vacuum de la classe DeltaTable :
def vacuum(retentionHours: Double)
L’utilisateur peut donc définir un nombre d’heures au-delà duquel il juge que la fonctionnalité de Time Travel n’est pas nécessaire. Il abandonne donc la possibilité de revenir à un état antérieur de sa table et libère de l’espace de stockage.
Sources
- Table batch reads and writes
- Databricks Delta Lake — Database on top of a Data Lake
- Databricks Tech Talks
- Schema-on-Read vs Schema-on-Write
- The Internals of Delta Lake
- The Internals of Spark
Image : Delta Lake hike, Grand Teton Park, Free Roaming Photography