v4.1 — Édition 2026. Un guide complet sur PySpark 4.1, couvrant Spark Connect, les DataFrames, les types de données complexes, les transformations de données, SQL, les UDFs et le profiling.
Nous établissons le besoin fondamental d'utiliser PySpark. Découvrez pourquoi les bibliothèques Python standards comme Pandas échouent à grande échelle, et comment PySpark fournit un moteur d'exécution distribué pour traiter des jeux de données massifs en toute fluidité.
4m 06s
2
La révolution Spark Connect
Explorez l'architecture Spark Connect. Nous expliquons comment PySpark a découplé le client et le serveur, vous permettant d'exécuter des applications Spark n'importe où sans les lourdes dépendances de la JVM.
3m 05s
3
DataFrames et évaluation paresseuse
Plongez dans l'abstraction fondamentale de PySpark : le DataFrame. Nous abordons le concept d'évaluation paresseuse, la différence entre les transformations et les actions, et pourquoi Spark planifie avant d'exécuter.
3m 56s
4
Création et visualisation de DataFrames
Apprenez à instancier des DataFrames à partir d'objets Python bruts, de dictionnaires et de fichiers, et comment inspecter en toute sécurité vos données distribuées sans faire planter votre nœud driver.
3m 21s
5
Maîtriser les types de données de base
Un tour d'horizon des types numériques et chaînes de caractères fondamentaux de PySpark. Nous explorons comment définir explicitement des schémas à l'aide de StructType et StructField pour des pipelines de données robustes.
3m 41s
6
Les périls de la précision
Découvrez les différences critiques entre FloatType, DoubleType et DecimalType. Apprenez pourquoi le choix d'un mauvais type numérique peut introduire des erreurs d'arrondi désastreuses dans les données financières.
3m 53s
7
Apprivoiser les données complexes et imbriquées
Le Big Data n'est pas toujours plat. Nous explorons les types de données complexes de PySpark, notamment ArrayType, StructType et MapType, vous permettant d'analyser nativement du JSON profondément imbriqué.
3m 34s
8
Conversion de types et sélection
Apprenez à modeler activement les schémas de vos DataFrames. Nous couvrons la sélection de sous-ensembles de colonnes et la manière de convertir en toute sécurité des colonnes d'un type de données à un autre.
3m 17s
9
Carrefour des fonctions : Nettoyer les données sales
À données inexactes, résultats erronés. Apprenez les transformations essentielles des DataFrames pour supprimer les valeurs nulles, remplir les valeurs manquantes et gérer les enregistrements NaN nativement dans les systèmes distribués.
3m 34s
10
Transformer et remodeler les données
Prenez le contrôle de la forme de vos données. Nous explorons comment générer de nouvelles colonnes avec des fonctions mathématiques, effectuer des manipulations de chaînes de caractères et aplatir des tableaux imbriqués à l'aide de explode().
3m 45s
11
La mécanique du regroupement et de l'agrégation
Maîtrisez la stratégie split-apply-combine. Nous nous plongeons dans le regroupement de données par clés et l'application de puissantes fonctions d'agrégation pour résumer des jeux de données massifs.
3m 12s
12
Quand les DataFrames entrent en collision : L'art des jointures
Naviguer dans les nuances de la combinaison de jeux de données. Nous détaillons les sept différents types de jointures dans PySpark et expliquons comment fusionner des DataFrames en toute sécurité.
3m 32s
13
Vieux SQL, nouvelles astuces
Pourquoi apprendre une nouvelle API quand vous pouvez utiliser du SQL brut ? Apprenez à exécuter des requêtes SQL standards directement sur des DataFrames PySpark distribués.
3m 12s
14
Interchanger DataFrames et SQL
Mélangez et associez SQL et Python en toute fluidité. Découvrez comment créer des vues temporaires à partir de DataFrames, utiliser selectExpr et enchaîner des opérations programmatiques sur les résultats de requêtes SQL.
3m 27s
15
Étendre Spark avec les UDFs Python
Lorsque les fonctions intégrées ne suffisent pas, les User-Defined Functions entrent en jeu. Nous explorons comment écrire une logique Python personnalisée pour les DataFrames, et pourquoi les UDFs scalaires standards cachent une pénalité de performance.
3m 20s
16
Booster les UDFs avec Apache Arrow
Éliminez le goulot d'étranglement de la sérialisation JVM vers Python. Nous découvrons comment les Vectorized Pandas UDFs et les formats de mémoire Apache Arrow boostent vos transformations personnalisées.
3m 12s
17
Exploser les lignes avec les UDTFs Python
Les UDFs standards renvoient une valeur par ligne, mais que faire si vous avez besoin de plusieurs lignes ? Apprenez comment les Python User-Defined Table Functions (UDTFs) résolvent les problèmes complexes de génération un-à-plusieurs.
3m 47s
18
L'API Pandas sur Spark
Mettez à l'échelle vos scripts Pandas existants à l'infini. Découvrez comment l'API pyspark.pandas vous permet d'exécuter la syntaxe Pandas standard nativement sur un cluster Spark distribué.
3m 34s
19
Chargement et formats de stockage
Tous les formats de fichiers ne se valent pas. Nous comparons les CSV orientés lignes avec les formats orientés colonnes comme Parquet et ORC, en explorant les options de lecture/écriture et les techniques de stockage optimales.
3m 22s
20
Chasse aux bugs : Plans physiques et jointures
Jetez un coup d'œil sous le capot du moteur d'exécution de Spark. Apprenez à déboguer les requêtes en utilisant DataFrame.explain() et à éliminer les shuffles coûteux en utilisant les broadcast joins.
3m 06s
21
Profiling de la mémoire et des performances de PySpark
Nous concluons notre voyage PySpark en présentant des outils de profiling natifs. Apprenez à suivre la consommation de mémoire ligne par ligne et à exposer les tracebacks internes cachés de Python.
3m 38s
Épisodes
1
Le problème du Big Data et la promesse de PySpark
4m 06s
Nous établissons le besoin fondamental d'utiliser PySpark. Découvrez pourquoi les bibliothèques Python standards comme Pandas échouent à grande échelle, et comment PySpark fournit un moteur d'exécution distribué pour traiter des jeux de données massifs en toute fluidité.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 1 sur 21. Ton script Python standard fonctionne parfaitement en phase de test, mais dès que ton dataset atteint cinquante gigaoctets, il plante avec une erreur OutOfMemory. Tu as atteint les limites physiques d'une seule machine. La solution à ce bottleneck est au cœur de cet épisode : le problème du big data et la promesse de PySpark.
Les outils data Python standards sont conçus pour une exécution single-node. Des librairies comme pandas sont incroyablement efficaces, mais elles nécessitent que l'intégralité du dataset réside en mémoire locale. Si ton serveur a seize gigaoctets de RAM et que tu essaies de charger cinquante gigaoctets de logs applicatifs, l'OS intervient et kill le process. Faire du scaling vertical en louant un serveur plus grand et plus cher ne fait que retarder l'inévitable. La data grossit plus vite que les upgrades hardware. Tôt ou tard, la data finit par dépasser la machine.
PySpark résout cette limitation. C'est l'API Python pour Apache Spark. Apache Spark lui-même est un moteur de calcul distribué qui tourne sur la Java Virtual Machine. PySpark sert de pont, te permettant d'écrire ta logique purement en Python tout en tirant parti du moteur distribué hautement optimisé de Spark.
Ton architecture passe ainsi du scaling vertical au scaling horizontal. Au lieu de s'appuyer sur une seule machine massive, PySpark partitionne ta data et distribue tes calculs sur un cluster de plusieurs petites machines, appelées nodes. Tu écris ton code Python, et PySpark le traduit en un plan d'exécution parallèle. Si ton volume de data double le mois prochain, tu n'as pas besoin de réécrire une seule ligne de code. Tu ajoutes simplement plus de nodes au cluster.
L'écosystème PySpark est organisé en quelques modules principaux conçus pour différents workloads. Le premier est Spark SQL. C'est la fondation de la plupart des applications PySpark modernes. Il fournit une structure DataFrame pour gérer de la data tabulaire répartie sur plusieurs machines. Il te permet aussi d'exécuter des requêtes SQL standards directement sur ces datasets distribués.
Ensuite, il y a Structured Streaming. Ce module gère les pipelines de data en temps réel. Au lieu de traiter un batch massif de data pendant la nuit, Structured Streaming traite en continu des flux de records, comme des relevés de capteurs en direct ou des events de trafic web. Il utilise exactement le même modèle de programmation que Spark SQL, ce qui signifie que ta logique de batch processing et ta logique de streaming sont quasiment identiques.
Ensuite, il y a MLlib, la librairie de Machine Learning. Entraîner des modèles sur des datasets massifs sur une seule machine est un bottleneck bien connu. MLlib fournit des algorithmes de machine learning distribués pour des tâches comme la classification, la régression et le clustering. Elle répartit les opérations mathématiques lourdes sur l'ensemble du cluster, réduisant considérablement le temps de training.
Voici l'idée clé. La véritable puissance de PySpark, c'est l'abstraction. Tu ne découpes jamais manuellement tes fichiers massifs en chunks. Tu n'écris jamais de code réseau pour coordonner les serveurs. Tu définis simplement une séquence logique de transformations, et le moteur sous-jacent gère la distribution de la data, l'exécution parallèle, et même le processus de recovery si un node perd son alimentation en plein calcul.
PySpark n'est pas juste un utilitaire pour ouvrir de plus gros fichiers. C'est un changement fondamental : on passe d'un computing limité par une seule carte mère à un computing limité uniquement par la taille de ton cluster.
Si tu trouves ces épisodes utiles et que tu veux soutenir l'émission, tu peux chercher DevStoriesEU sur Patreon. C'est tout pour cet épisode. Merci pour ton écoute, et continue à builder !
2
La révolution Spark Connect
3m 05s
Explorez l'architecture Spark Connect. Nous expliquons comment PySpark a découplé le client et le serveur, vous permettant d'exécuter des applications Spark n'importe où sans les lourdes dépendances de la JVM.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 2 sur 21. Pendant des années, écrire du code PySpark en local impliquait de se trimballer une Java Virtual Machine massive et lourde, juste pour tester un simple script. Tu devais synchroniser parfaitement les versions de Python, les configurations Java et les dépendances du cluster avant même d'écrire la moindre ligne de logique. La révolution Spark Connect rend tout ça complètement obsolète.
Traditionnellement, PySpark reposait sur une architecture fortement couplée. Ton script Python et le moteur d'exécution Spark devaient coexister sur la même machine physique ou virtuelle. Lancer une session PySpark impliquait de faire tourner une Java Virtual Machine en arrière-plan via une bridge library. Cette architecture surchargeait ton environnement de développement local avec tout le poids du moteur d'exécution Spark. Ça rendait l'intégration de PySpark dans des applications web, des éditeurs de code modernes ou des edge devices vraiment peu pratique.
Spark Connect résout ce problème en introduisant une architecture client-serveur découplée. Ton environnement Python est maintenant strictement séparé du serveur Spark. Le client PySpark local devient une library légère. Il ne nécessite plus d'installation Java locale et n'exécute plus lui-même les tâches de traitement de données. Il agit purement comme une interface remote vers le vrai cluster Spark.
Voici l'idée clé. Quand tu écris des opérations DataFrame avec Spark Connect, le client léger enregistre tes method calls et les traduit en un unresolved logical plan. Tu peux voir ce plan comme un blueprint abstrait de ta query, qui décrit strictement quelles données traiter sans se soucier de comment elles sont traitées. Le client package ce blueprint en utilisant Protocol Buffers et le transmet via une connexion réseau gRPC au serveur Spark remote. Le serveur unpack le plan, gère toute l'optimisation complexe de la query, exécute le job sur le cluster, et enfin stream les résultats calculés vers ton script Python.
Mettre ça en place demande un petit changement dans la façon dont tu démarres ton application. Tu utilises toujours le builder SparkSession, mais au lieu de te reposer sur des configurations locales, tu appelles la méthode remote. Tu fournis une connection string qui détaille où se trouve le serveur Spark. Cette string utilise un connection scheme dédié qui commence par les lettres s c. Donc, si tu te connectes à un serveur de test local sur le port par défaut, tu fournis la string s c deux-points slash slash localhost deux-points un cinq zéro zéro deux. Après cette unique étape de connexion, tu écris ton code DataFrame exactement comme tu l'as toujours fait.
Comme l'exécution est entièrement remote, tu peux connecter simultanément plusieurs clients Python différents, depuis différentes applications, exactement au même serveur Spark. Le code de ton application demande simplement des transformations de données, et le gros du travail reste entièrement côté serveur.
En isolant complètement le client Python du runtime d'exécution, Spark Connect élimine les fameux conflits de dépendances qui cassaient les deployments, ce qui te permet d'upgrader tes environnements applicatifs de façon complètement indépendante du cluster Spark lui-même.
Merci d'avoir passé quelques minutes avec moi. À la prochaine, prends soin de toi.
3
DataFrames et évaluation paresseuse
3m 56s
Plongez dans l'abstraction fondamentale de PySpark : le DataFrame. Nous abordons le concept d'évaluation paresseuse, la différence entre les transformations et les actions, et pourquoi Spark planifie avant d'exécuter.
Salut, ici Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 3 sur 21. Et si ton code ne s'exécutait pas au moment où tu le tapes, mais qu'il attendait, analysait ton objectif final et calculait le chemin le plus rapide ? Tu enchaînes les filtres, les agrégations et les joins, et pourtant ta machine bronche à peine. C'est parce qu'elle ne fait rien tant que tu ne lui forces pas la main. Ce mécanisme s'appelle la lazy evaluation, et c'est le moteur principal des DataFrames PySpark.
Un DataFrame PySpark est une collection distribuée de données organisées en colonnes nommées. Si tu connais pandas, le concept est identique. La différence, c'est qu'un DataFrame PySpark répartit ses données sur plusieurs nœuds de calcul au sein d'un cluster. Historiquement, la structure fondamentale de Spark était le Resilient Distributed Dataset, plus communément appelé RDD. L'écosystème s'est largement éloigné de la manipulation brute des RDD. En fait, depuis la version 4.0 de Spark, l'utilisation directe des RDD n'est plus supportée sur Spark Connect. Les DataFrames sont désormais le standard définitif, offrant une API stricte qui permet à Spark d'optimiser automatiquement tes requêtes.
Cette optimisation repose entièrement sur la lazy evaluation. Chaque opération que tu effectues sur un DataFrame appartient à l'une de ces deux catégories strictes : une transformation ou une action. Les transformations sont des commandes qui renvoient un nouveau DataFrame. Il peut s'agir, par exemple, de sélectionner des colonnes spécifiques, de filtrer des lignes selon une condition, de regrouper des enregistrements ou de faire un join entre deux tables distinctes.
Quand tu appliques une transformation, PySpark n'exécute pas le traitement des données. Il se contente d'enregistrer l'opération. Il met à jour un plan interne appelé le logical execution plan. Tu peux écrire cinquante transformations consécutives, Spark va juste valider rapidement la syntaxe et mettre à jour son graphe. Voici le point clé. En retardant l'exécution réelle, PySpark donne à son query engine sous-jacent, le Catalyst Optimizer, une vision complète de ton data pipeline. L'optimizer inspecte toute ta chain de transformations, les réorganise pour une efficacité maximale, et supprime complètement les étapes inutiles avant même qu'un seul octet de données ne soit lu sur le disque.
Ce plan reste complètement inactif jusqu'à ce que tu invoques une action. Une action est une commande qui exige un résultat concret. Elle renvoie des données à ton driver program ou écrit les données sur le storage. Les actions courantes incluent compter le nombre total de lignes, collecter les données dans une liste Python locale, ou demander au système d'afficher les vingt premiers enregistrements sur ton écran. Au moment où tu déclenches une action, le moteur se met en marche. Il traduit ton logical plan optimisé en un physical plan, distribue les tâches aux workers du cluster, et lance le calcul.
Prends l'exemple d'un data workflow standard. D'abord, tu crées un DataFrame en pointant vers un fichier massif. Ensuite, tu fais un join avec une table distincte contenant les détails des utilisateurs. Après le join, tu filtres les résultats pour n'inclure que les utilisateurs d'une ville spécifique. Enfin, tu demandes à Spark d'afficher l'output. Grâce à la lazy evaluation, Spark ne charge pas réellement tout le fichier, ne fait pas un join distribué massif, pour ensuite filtrer les résultats à la fin. Au lieu de ça, l'optimizer regarde ta requête finale, remarque le filtre, et remonte cette opération de filtre dans la chain, bien avant que le join ne se produise. Il lit sélectivement uniquement les enregistrements pertinents, réduisant ainsi considérablement la consommation de mémoire et le trafic réseau sur le cluster.
Ton script PySpark n'est jamais une séquence de commandes immédiates. C'est un ensemble d'instructions qui dessine un plan d'architecture, et le système ne commence la construction que lorsque tu exiges enfin le résultat final.
C'est tout pour aujourd'hui. Merci pour ton écoute — va construire un truc cool.
4
Création et visualisation de DataFrames
3m 21s
Apprenez à instancier des DataFrames à partir d'objets Python bruts, de dictionnaires et de fichiers, et comment inspecter en toute sécurité vos données distribuées sans faire planter votre nœud driver.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 4 sur 21. Appeler une méthode spécifique sur un dataset massif est le meilleur moyen de faire planter instantanément toute ton application avec une erreur out-of-memory. Il est donc crucial de savoir comment transférer des données vers et depuis Spark en toute sécurité sans faire exploser ton driver node. C'est exactement ce que couvre cet épisode : la création et la visualisation de DataFrames.
Toute application PySpark a besoin de données pour fonctionner. Tu peux généralement créer des DataFrames de trois manières. Premièrement, tu peux les créer directement à partir de structures Python in-memory. Tu définis simplement une liste de dictionnaires, où chaque dictionnaire représente une ligne et les clés sont les noms des colonnes, et tu la passes à la méthode create DataFrame de ta SparkSession. Deuxièmement, si tu as déjà un DataFrame pandas in-memory, tu peux passer cet objet pandas exact à la même méthode create DataFrame. PySpark gère la conversion automatiquement. La troisième méthode, et la plus courante, c'est de lire des fichiers externes. Tu utilises l'attribut read sur ta SparkSession, suivi du format que tu veux, comme csv ou json, et tu fournis le chemin du fichier.
Une fois tes données chargées, tu dois les vérifier. Les DataFrames PySpark sont distribués, ce qui veut dire que tu ne peux pas juste faire un print de la variable pour voir les données comme tu le ferais dans un script Python standard. Pour voir la structure de tes données, tu appelles la méthode print schema. Ça affiche un arbre textuel qui montre chaque nom de colonne et son type de données correspondant. C'est le moyen le plus rapide de vérifier que ton fichier s'est chargé correctement.
Pour voir le contenu réel, tu utilises la méthode show. Par défaut, appeler show affiche les vingt premières lignes sous forme de tableau. Fais bien attention à ça. Si tes colonnes contiennent de longues strings, la méthode show les tronque. Tu peux désactiver ça en passant un argument truncate défini sur false, ou le définir sur un nombre spécifique de caractères. Si ton DataFrame a des dizaines de colonnes, la vue standard en tableau déborde de l'écran et devient illisible. Dans ce cas, tu peux passer l'argument vertical défini sur true. Ça affiche chaque ligne comme un bloc vertical de paires clé-valeur, ce qui rend les datasets larges beaucoup plus faciles à lire dans un terminal.
Maintenant, on en arrive au crash out-of-memory mentionné plus tôt. Parfois, tu as besoin de ramener les données distribuées dans des objets Python classiques. La méthode pour faire ça s'appelle collect. Voici le point essentiel. La méthode collect prend chaque ligne de chaque executor à travers tout ton cluster et la force dans la mémoire de ton seul driver node. Si ton DataFrame contient un milliard de lignes, ton driver va se retrouver out-of-memory et crasher instantanément. Tu ne devrais appeler collect que quand tu as agrégé ou filtré tes données pour obtenir une petite taille.
Quand tu travailles avec de grands datasets, extrais toujours des échantillons plus petits. Au lieu de collect, utilise la méthode take, en lui passant le nombre de lignes que tu veux. Ça renvoie une liste Python standard contenant juste ces quelques premières lignes. Si tu as besoin de vérifier la fin de ton dataset, utilise la méthode tail pour récupérer les dernières lignes. Ces deux méthodes limitent en toute sécurité la quantité de données transférées vers ton driver.
La règle pour les données distribuées est simple : pousse les calculs vers le cluster, mais limite strictement le nombre de lignes que tu ramènes vers le driver.
C'est tout pour cet épisode. Merci de ton écoute, et continue à développer !
5
Maîtriser les types de données de base
3m 41s
Un tour d'horizon des types numériques et chaînes de caractères fondamentaux de PySpark. Nous explorons comment définir explicitement des schémas à l'aide de StructType et StructField pour des pipelines de données robustes.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 5 sur 21. Te reposer sur l'inférence automatique de schema peut te faire gagner quelques lignes de code, mais ça va te coûter cher en performances en production. Le cluster doit souvent lire l'intégralité de ton dataset juste pour deviner ce qu'il y a dedans avant même de faire le moindre vrai travail. Tu règles ça en maîtrisant les data types de base et les schemas explicites.
C'est courant de confondre les types Python standards avec les data types PySpark. Quand tu déclares un integer ou une string en Python standard, cet objet vit dans la mémoire de ta machine locale. Les types PySpark fonctionnent à un niveau complètement différent. Ce sont des instructions de mapping pour le Catalyst optimizer et la Java Virtual Machine sous-jacente. Quand tu utilises les data types PySpark, tu définis une structure stricte, adaptée au cluster. Ça garantit la cohérence des données sur des centaines de worker nodes distribués et ça dicte exactement comment les données sont sérialisées sur le réseau.
PySpark fournit un type spécifique pour chaque format de données standard, et choisir le bon est crucial pour les performances. Pour les nombres, tu as ByteType pour les très petits integers, IntegerType pour les nombres standards, et LongType pour les grandes valeurs. Choisir ByteType au lieu de LongType pour un simple status code fait gagner énormément de mémoire quand ce choix est multiplié sur des milliards de lignes. Pour le texte et la logique, tu utilises StringType et BooleanType.
Bien gérer le temps est un autre domaine où le typage exact a son importance. PySpark sépare les données temporelles en DateType et TimestampType. Tu utilises DateType quand tu t'intéresses uniquement à la date calendaire, comme l'anniversaire d'un utilisateur. Tu utilises TimestampType quand tu as besoin de points précis dans le temps, en traçant à la fois la date et l'heure, la minute et la seconde exactes où un événement s'est produit.
Connaître ces types, c'est juste la base. Tu dois les appliquer directement à ton processus d'ingestion de données en utilisant un schema explicite. Tu construis ce schema avec deux objets spécifiques : StructType et StructField. Tu peux voir un StructType comme le blueprint d'une ligne entière dans ton dataframe. Un StructField, c'est le blueprint pour une seule colonne à l'intérieur de cette ligne.
Pour construire un schema explicite, tu instancies un StructType et tu lui passes une collection de StructFields. Chaque StructField demande trois arguments spécifiques. Premièrement, tu donnes le nom de la colonne sous forme de string standard. Deuxièmement, tu passes le data type PySpark spécifique que tu veux forcer, comme IntegerType ou StringType. Troisièmement, tu fournis un flag boolean qui indique si cette colonne a le droit de contenir des valeurs null.
Par exemple, tu construis un schema en commençant par un StructField appelé user identifier, assigné à un StringType, et tu mets le flag null sur false. Tu enchaînes avec un StructField appelé account age, assigné à un IntegerType, en mettant le flag null sur true. Une fois que cet objet StructType est complètement assemblé, tu le passes directement à ton dataframe reader en utilisant la méthode schema avant d'appeler la commande load pour lire tes fichiers.
C'est ça la partie importante. Quand tu fournis ce schema explicite dès le départ, PySpark saute complètement la phase de data scanning. Il applique ton blueprint directement au data stream entrant. Ça réduit considérablement le temps qu'il faut pour lire un fichier. Ça agit aussi comme une quality gate immédiate. Si un fichier malformé arrive avec du texte dans ta colonne integer, le pipeline le gère en se basant sur ta structure définie, plutôt que de décaler silencieusement le schema inféré en aval et de casser tes transformations.
Définir ton schema explicitement transforme une opération de lecture fragile et coûteuse en une étape de pipeline prévisible et hautement optimisée.
Merci d'avoir écouté, et happy coding tout le monde !
6
Les périls de la précision
3m 53s
Découvrez les différences critiques entre FloatType, DoubleType et DecimalType. Apprenez pourquoi le choix d'un mauvais type numérique peut introduire des erreurs d'arrondi désastreuses dans les données financières.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 6 sur 21. Utiliser un float standard peut sembler anodin, jusqu'à ce que ta query d'agrégation fausse silencieusement le calcul de millions de transactions financières. Un code qui tourne parfaitement peut produire des nombres légèrement, mais dangereusement, erronés. C'est exactement pour ça qu'on doit parler des dangers de la précision.
Dans PySpark, tu as trois façons principales de stocker des nombres à virgule. Tu as FloatType, DoubleType et DecimalType. Ils ne sont pas interchangeables. Une erreur courante, c'est de laisser PySpark déduire un schema à partir de tes données brutes. L'inférence assigne généralement un DoubleType à n'importe quel nombre avec une décimale. Si tu calcules des revenus financiers, te fier à ce comportement par défaut est un risque opérationnel majeur.
Pour comprendre pourquoi, on doit regarder comment FloatType et DoubleType fonctionnent sous le capot. FloatType utilise le calcul en virgule flottante IEEE 754 32 bits. DoubleType utilise la version 64 bits de ce même standard. Les deux représentent les nombres comme des fractions binaires. Pense à la fraction un tiers, qui ne peut pas être écrite parfaitement en base dix. Ça devient une string infinie de trois. Exactement la même limitation existe en binaire. Des nombres décimaux courants, comme 0,1 ou 0,2, ne peuvent pas être représentés parfaitement en base deux. L'ordinateur stocke une minuscule approximation.
Avec DoubleType, tu as 64 bits d'espace, ce qui veut dire que l'approximation est incroyablement proche du vrai nombre. Si tu fais une query sur une seule row de données, tu remarqueras rarement la différence. Mais voici le point clé. L'erreur s'accumule pendant les agrégations. Quand tu calcules les revenus financiers totaux en additionnant des milliards de rows individuelles, ces imprécisions microscopiques s'additionnent. Une fraction de centime perdue ou gagnée sur chaque transaction finit par fausser le total agrégé final de plusieurs milliers, voire millions de dollars. Ta logique d'agrégation est mathématiquement correcte, mais le data type sous-jacent corrompt le résultat.
Si ton système calcule des simulations physiques ou entraîne des modèles de machine learning, FloatType et DoubleType sont exactement ce que tu veux. Ils sacrifient l'exactitude pour un traitement hardware à haute vitesse. Mais à la seconde où tu gères de l'argent, tu as besoin d'une précision stricte et infaillible.
Ce qui nous amène à DecimalType. DecimalType n'utilise pas d'approximations à virgule flottante. Il stocke les nombres exactement comme tu les définis, en utilisant un scale fixe. Quand tu configures un DecimalType, tu définis deux paramètres distincts. Premièrement, tu spécifies la precision, qui est le nombre total maximum de chiffres que la valeur peut contenir. Deuxièmement, tu spécifies le scale, qui dicte le nombre exact de chiffres autorisés à droite de la virgule.
Si tu configures un DecimalType avec une precision de dix et un scale de deux, PySpark alloue l'espace exact nécessaire pour stocker cette valeur au centime près. Il n'y a pas de fractions binaires ni d'approximations d'arrondi.
En pratique, tu implémentes ça en prenant le contrôle strict de tes schemas. Quand tu lis des enregistrements financiers depuis un fichier source, ne laisse pas PySpark deviner les types. D'abord, tu crées un objet schema strict. Ensuite, tu définis tes champs financiers comme les revenus ou les taxes. Enfin, tu leur assignes explicitement un DecimalType avec la precision et le scale de ton choix. Une fois que ton dataframe est chargé avec ce schema, tes agrégations standard de somme ou de moyenne s'exécuteront parfaitement, de la première row jusqu'à la milliardième. Tu sacrifies une petite quantité de performances de compute par rapport à un DoubleType standard, mais tu garantis que ton reporting financier est absolument irréprochable.
La règle est simple : utilise les types à virgule flottante pour la vitesse et les approximations scientifiques, mais à la seconde où un nombre représente une devise, verrouille-le avec un DecimalType. Merci d'avoir écouté. À la prochaine !
7
Apprivoiser les données complexes et imbriquées
3m 34s
Le Big Data n'est pas toujours plat. Nous explorons les types de données complexes de PySpark, notamment ArrayType, StructType et MapType, vous permettant d'analyser nativement du JSON profondément imbriqué.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 7 sur 21. Dans le monde réel, le big data ressemble rarement à une simple feuille de calcul. Parfois, tu as besoin d'un array de dictionnaires imbriqués juste pour parser un seul event JSON. Pour gérer ça, il faut qu'on parle de la maîtrise des données complexes et imbriquées.
Les workflows relationnels préfèrent les tables plates, mais les event data modernes arrivent souvent très imbriquées. PySpark gère ça en fournissant trois types de données complexes. Ce sont ArrayType, StructType et MapType. Ils te permettent de modéliser explicitement des structures hiérarchiques nativement dans le moteur. Prends un profil client standard pour voir comment ces types fonctionnent.
Le premier concept, c'est ArrayType. Ça représente une collection d'éléments. La règle stricte, c'est que chaque élément à l'intérieur d'un ArrayType doit partager exactement le même type de données sous-jacent. Tu ne peux pas mélanger des strings et des integers dans le même array. Si ton profil client inclut une liste d'ID de commandes récentes, tu définis cette colonne comme un ArrayType contenant des integers.
Ensuite, on a StructType. Un StructType modélise un record hiérarchique imbriqué, qui fonctionne en gros comme une row intégrée dans une autre row. Il contient des champs spécifiques et nommés. Contrairement à un array, chaque champ à l'intérieur d'un StructType peut avoir un type de données complètement différent. Supposons que ton client ait une adresse. Cette adresse contient un nom de rue sous forme de string, un code postal sous forme d'integer, et un flag boolean indiquant si c'est une propriété commerciale. Tu regroupes ces champs distincts ensemble dans un seul StructType.
Voici le point clé. Tu peux imbriquer ces types complexes aussi profondément que tu veux. Si un client a plusieurs adresses, tu ne crées pas de colonnes plates et numérotées. À la place, tu crées un ArrayType où le type d'élément interne est exactement ce StructType d'adresse. Tu as maintenant un array de structs, ce qui correspond parfaitement à un array JSON standard d'objets.
La troisième structure, c'est MapType, conçue spécifiquement pour les paires key-value. Elle diffère d'un StructType dans sa façon de gérer la structure par rapport au schema. Un StructType t'oblige à hardcoder les noms de champs exacts dès le départ. Un MapType est flexible sur le contenu de ses données, mais strict sur ses types de données. Chaque key dans la map doit être d'un type spécifique, et chaque value doit être d'un autre type spécifique.
Tu pourrais utiliser un MapType pour stocker les préférences d'application du client. Les keys pourraient être des strings, comme le thème ou la langue, et les values pourraient aussi être des strings, comme dark ou English. Comme c'est un MapType, l'application upstream peut injecter de toutes nouvelles keys de préférence plus tard sans t'obliger à modifier le schema de base du DataFrame. Tu requêtes simplement les values dynamiquement via leurs keys.
Quand tu construis ce schema complexe dans ton code, tu le bâtis de l'intérieur vers l'extérieur. D'abord, tu définis les champs internes du StructType de l'adresse. Ensuite, tu passes cette struct terminée dans une définition d'ArrayType. Après, tu définis le MapType pour les préférences utilisateur. Enfin, tu englobes tous ces composants, avec des types scalaires simples comme la string du nom du client, dans un StructType maître qui définit la row globale du DataFrame.
Au lieu d'aplatir des structures imbriquées en strings JSON brouillonnes, définir explicitement ces schemas complexes permet à l'optimiseur Spark de pruner les données et de filtrer en profondeur dans les champs imbriqués sans désérialiser toute la payload en mémoire.
Merci de ton écoute, on se capte la prochaine fois.
8
Conversion de types et sélection
3m 17s
Apprenez à modeler activement les schémas de vos DataFrames. Nous couvrons la sélection de sous-ensembles de colonnes et la manière de convertir en toute sécurité des colonnes d'un type de données à un autre.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 8 sur 21. Une simple valeur string cachée dans une colonne d'entiers peut complètement paralyser un cluster de mille nœuds. Tu as besoin d'un moyen fiable d'imposer les bonnes structures de données et de choisir exactement quelles données traversent ton pipeline. C'est pour ça qu'on va regarder le Type Casting et la sélection aujourd'hui.
Pour manipuler des données dans PySpark, tu dois d'abord comprendre ce qu'est vraiment une colonne. Une instance de colonne n'est pas un array physique de données chargé en mémoire. C'est la représentation d'une expression évaluée de façon lazy. Quand tu références une colonne dans ton code, tu ne touches pas aux données sous-jacentes. Tu ajoutes simplement une étape au logical plan de Spark. Les données ne bougent que lorsqu'une action est déclenchée plus tard.
Pour récupérer et modeler ces données, tu utilises la méthode select sur ton DataFrame. Tu as deux façons principales de dire à la méthode select quelles colonnes tu veux. La plus simple, c'est de passer les noms de colonnes sous forme de strings de texte standard. Si tu passes une string à select, Spark te renvoie un nouveau DataFrame qui contient exactement cette colonne, sans aucune modification. Ça marche bien pour une extraction basique, mais ça ne laisse aucune place pour la modification.
Pour modifier les données pendant la sélection, tu dois utiliser des objets Column au lieu de strings. Tu accèdes à un objet Column en le référençant directement depuis le DataFrame. Tu peux faire ça en utilisant la dot notation, comme dataframe point age, ou en utilisant la bracket notation avec le nom de la colonne sous forme de string à l'intérieur des crochets. La bracket notation est particulièrement utile quand tes noms de colonnes contiennent des espaces ou des caractères spéciaux qui casseraient la dot notation standard.
C'est la partie qui compte. Quand tu passes un objet Column à la méthode select, tu peux lui attacher des méthodes pour transformer les données à la volée. L'une des transformations les plus critiques, c'est la conversion de type. Les données arrivent souvent dans le mauvais format. Par exemple, tu pourrais recevoir des métriques numériques formatées comme des strings de texte. Pour corriger ça, tu utilises la méthode cast. PySpark fournit aussi un alias appelé astype, qui exécute exactement la même logique.
Tu appelles la méthode cast directement sur ton objet Column à l'intérieur du statement select. La méthode cast nécessite un argument, qui est le data type cible. Tu peux définir cette cible en passant une représentation string du type, comme le mot int, ou en passant un objet data type Spark spécifique, comme IntegerType.
Voici comment ça se passe dans un vrai script. Tu appelles la méthode select sur ton DataFrame. À l'intérieur des parenthèses de cette méthode, tu références ta colonne cible en utilisant la bracket notation. Juste à côté de cette référence de colonne, tu appelles point cast et tu fournis ton nouveau type. Une fois évalué, ça te renvoie un tout nouveau DataFrame où ta colonne sélectionnée est maintenant convertie en toute sécurité vers le type spécifié. Le DataFrame d'origine reste complètement intact parce que les DataFrames sont immuables.
Ce qu'il faut retenir, c'est que le type casting dans PySpark n'est pas un processus standalone appliqué in place à un dataset existant. C'est une expression de colonne évaluée de façon lazy, intrinsèquement liée à l'action de sélectionner des données pour construire un nouveau DataFrame fortement typé.
Si tu aimes le podcast et que tu veux soutenir l'émission, tu peux chercher DevStoriesEU sur Patreon. C'est tout pour cet épisode. Merci d'avoir écouté, et continue de builder !
9
Carrefour des fonctions : Nettoyer les données sales
3m 34s
À données inexactes, résultats erronés. Apprenez les transformations essentielles des DataFrames pour supprimer les valeurs nulles, remplir les valeurs manquantes et gérer les enregistrements NaN nativement dans les systèmes distribués.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 9 sur 21. Garbage in, garbage out. Mais que fais-tu quand ton dataset rempli de déchets pèse des centaines de téraoctets et que tu ne peux pas inspecter manuellement une seule ligne ? Il te faut une méthode systématique pour le nettoyer à grande échelle. C’est exactement ce qu'on aborde aujourd’hui dans Function Junction : Cleaning Dirty Data.
La première étape du nettoyage, c'est généralement de standardiser ton schema. Tu vas souvent recevoir des fichiers bruts avec des espaces, des caractères spéciaux ou des fautes de frappe dans les en-têtes. Utilise la méthode appelée withColumnRenamed. Tu lui passes simplement l’ancien nom en string et le nouveau nom en string que tu veux. Si tu as plusieurs colonnes à corriger, tu chain cette méthode séquentiellement pour chaque colonne avant d’appliquer des transformations complexes en aval.
Avant de supprimer les mauvaises données, on doit dissiper une confusion fréquente concernant null et NaN dans PySpark. Null signifie qu’un point de donnée est totalement absent. NaN signifie Not a Number, ce qui représente un résultat mathématique indéfini, comme diviser zéro par zéro. En pur Python, ils nécessitent un traitement séparé. Cependant, PySpark les regroupe pour plus de praticité. Quand tu utilises les fonctions NA du data frame, Spark évalue les valeurs NaN comme des nulls dans le but de faire un drop ou un fill.
Pour éliminer les lignes avec des valeurs manquantes, tu utilises la méthode NA dot drop. Appeler cette fonction complètement à vide va drop n'importe quelle ligne contenant un null ou un NaN dans n'importe quelle colonne. Cette approche est très destructrice sur les datasets très larges. Une seule valeur manquante dans une colonne de metadata optionnelle va effacer une ligne de données de transaction par ailleurs parfaite. Pour éviter ça, passe une liste de noms de colonnes au paramètre subset. PySpark va alors évaluer uniquement ces colonnes critiques et spécifiques au moment de décider s'il faut drop la ligne.
Faire un drop de lignes n'est pas toujours permis par les règles métier. Souvent, tu dois remplacer les valeurs manquantes par des valeurs par défaut sûres. Tu accomplis ça en utilisant NA dot fill. Même si tu peux passer une seule valeur pour faire un fill sur toutes les colonnes, la meilleure approche est de passer un dictionnaire. Les clés du dictionnaire représentent les noms de colonnes spécifiques, et les valeurs représentent tes remplacements choisis. Ce pattern te permet de faire un fill sur une métrique numérique manquante avec un zéro, tout en remplaçant simultanément une catégorie manquante par une string de texte comme unknown. Faire ça via un dictionnaire s'exécute en une seule passe, ce qui est très efficace.
Enfin, tes données peuvent être entièrement remplies mais toujours invalides. Les outliers et les valeurs physiquement impossibles nécessitent un filtrage logique. Tu isoles les bonnes données en utilisant la méthode where pour ne garder que les lignes qui satisfont une condition spécifique. Pour les limites numériques ou de dates, la méthode between est ton meilleur outil. Tu sélectionnes ta colonne, tu appelles between, et tu fournis les limites inférieure et supérieure. Ça remplace la logique verbeuse du greater-than et less-than, ce qui rend ton code plus facile à lire. Toute ligne tombant en dehors de ces limites est filtrée en dehors du data frame résultant.
Voici le point clé. L'ordre compte énormément quand on nettoie à grande échelle. Renomme toujours les colonnes en premier pour verrouiller ton schema, fais un drop ou un fill des valeurs manquantes ensuite pour stabiliser tes types de données, et filtre les outliers en dernier, uniquement quand tu sais que les données sous-jacentes sont structurellement saines.
C'est tout pour cette fois. Merci d'avoir écouté, et continue de développer !
10
Transformer et remodeler les données
3m 45s
Prenez le contrôle de la forme de vos données. Nous explorons comment générer de nouvelles colonnes avec des fonctions mathématiques, effectuer des manipulations de chaînes de caractères et aplatir des tableaux imbriqués à l'aide de explode().
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 10 sur 21. Parfois, une seule ligne de données contient un array d'enregistrements cachés, et tu dois exploser cet array pour l'analyser correctement. Transformer et faire du reshaping sur les données, c'est comme ça que tu déballes, formates et structures cette information pour le processing en aval.
Quand tu dois modifier un dataframe dans PySpark, tu ne changes pas les données in place. Les dataframes sont immuables. À la place, tu crées de nouvelles versions en utilisant une méthode qui s'appelle withColumn. Cette méthode prend deux arguments. Le premier est une string qui représente le nom de la colonne que tu veux créer ou remplacer. Le deuxième est une expression de colonne qui définit les données réelles. Si tu donnes un nom qui existe déjà dans le dataframe, PySpark écrase la colonne d'origine. Si le nom est complètement nouveau, PySpark ajoute la nouvelle colonne sur le côté droit de ton dataset.
Pour définir ce qui va dans cette nouvelle colonne, tu utilises généralement les built-in functions de PySpark. Elles sont importées depuis le module SQL functions et fournissent des opérations hautement optimisées qui s'exécutent sur tout ton cluster. Prends l'exemple de la manipulation de strings. Les données textuelles qui viennent de sources externes sont rarement formatées à la perfection. Tu pourrais avoir une colonne qui contient des noms d'utilisateurs écrits dans un mélange imprévisible de majuscules et de minuscules. Tu peux corriger ça en passant ta colonne existante à une built-in function comme lower, qui force tout le texte en minuscules. Sinon, tu peux utiliser une fonction de capitalisation pour t'assurer que la première lettre est en majuscule et le reste en minuscules.
En pratique, tu intègres ces opérations directement dans les transformations de ton dataframe. Tu appelles withColumn, tu nommes ta colonne cible, et tu lui assignes le résultat de la fonction lower appliquée à ta colonne d'entrée. PySpark évalue cette expression pour chaque ligne. Tu peux enchaîner plusieurs appels à withColumn pour appliquer plusieurs transformations de façon séquentielle, en passant le dataframe mis à jour progressivement à l'étape suivante à chaque fois.
Maintenant, la deuxième partie de tout ça, c'est le reshaping. Nettoyer des strings modifie les valeurs, mais que se passe-t-il quand la forme fondamentale de tes données empêche l'analyse ? C'est là que ça devient intéressant. Tu pourrais recevoir un dataset où l'identifiant d'une personne est dans une colonne, et ses revenus mensuels pour toute l'année sont regroupés dans un seul array dans la colonne adjacente. Tu ne peux pas lancer des agrégations relationnelles standards sur un array nested. Tu as besoin que chaque valeur de revenu individuelle soit sur sa propre ligne pour calculer des moyennes ou trouver des minimums.
Tu résous ce problème structurel en utilisant une built-in function qui s'appelle explode. La fonction explode gère spécifiquement les arrays et les maps. Tu appelles withColumn, tu spécifies le nom de la colonne que tu veux pour l'output, et tu passes la fonction explode qui englobe ta colonne array. PySpark exécute ça en prenant la ligne unique d'origine et en l'éclatant. Si l'array de revenus contient douze valeurs distinctes, explode génère douze lignes entièrement séparées.
Dans le nouveau dataframe, la colonne cible contient maintenant une seule valeur de revenu flat par ligne au lieu d'une liste. Point crucial, PySpark duplique toutes les autres colonnes de la ligne d'origine. L'identifiant de l'utilisateur est copié à l'identique sur les douze nouvelles lignes. La relation logique entre l'utilisateur et son revenu reste parfaitement intacte, mais les données sont maintenant flat. Tu as transformé une structure nested en une table longue prête pour les opérations standards de grouping et de filtering.
La vraie puissance des transformations PySpark, c'est que des fonctions comme explode et lower ne font pas que manipuler des valeurs individuelles. Elles définissent un plan de calcul logique qui scale instantanément, que tu aies cent lignes ou cent milliards de lignes, sans jamais te demander d'écrire la moindre boucle manuelle. C'est tout pour cet épisode. À la prochaine !
11
La mécanique du regroupement et de l'agrégation
3m 12s
Maîtrisez la stratégie split-apply-combine. Nous nous plongeons dans le regroupement de données par clés et l'application de puissantes fonctions d'agrégation pour résumer des jeux de données massifs.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 11 sur 21. Quand tu te retrouves face à des milliards de records individuels, c'est impossible de les lire ligne par ligne. Pour en extraire un vrai sens, tu dois les résumer. Aujourd'hui, on va voir exactement comment ça se passe : la mécanique du grouping et de l'agrégation.
Sous le capot, PySpark traite les agrégations en utilisant une data strategy classique appelée split-apply-combine. Ce pattern est exactement ce que son nom indique. D'abord, PySpark divise le dataset massif en buckets logiques distincts, en se basant sur une clé que tu choisis. Ensuite, il applique un calcul spécifique à chaque bucket de façon indépendante à travers le cluster. Enfin, il combine ces réponses indépendantes pour donner un seul résultat résumé.
Dans ton code, tu déclenches la phase de split en appelant la méthode group by sur ton DataFrame. Tu fournis simplement le nom de la colonne que tu veux utiliser comme grouping key. Par exemple, si tu as une table massive de transactions historiques, tu pourrais faire un group by sur la colonne user name.
Voici le point clé. Appeler group by ne retourne pas un nouveau DataFrame. À la place, ça retourne une structure de transition appelée un objet GroupedData. Parce que PySpark évalue ton code de façon lazy, il a seulement construit le plan d'exécution pour organiser ces buckets. Il ne va pas réellement bouger de data tant que tu ne lui auras pas dit quelle opération mathématique effectuer sur ces buckets.
Pour fournir cette opération mathématique, tu chaînes la méthode aggregate, généralement écrite agg, directement sur tes grouped data. Ça gère les phases apply et combine. À l'intérieur de la méthode aggregate, tu dis à PySpark quoi calculer en utilisant des outils du module PySpark SQL functions. Ce module contient des dizaines d'opérations d'agrégation optimisées.
Disons que tu veuilles calculer le revenu moyen pour chacun de ces utilisateurs. Tu importerais la fonction average, qu'on appelle généralement avg. Tu passes le nom de ta colonne income dans la fonction average, et tu places ça à l'intérieur de la méthode aggregate. Quand ça s'exécute, PySpark calcule le revenu moyen pour chaque bucket d'utilisateur distinct simultanément. La phase de combine prend ensuite le relais, retournant un DataFrame standard et lisible. Ce nouveau DataFrame contient juste une ligne par utilisateur, associée à son revenu moyen fraîchement calculé.
À ce stade, tu as une table parfaitement résumée. Cependant, comme le calcul s'est fait en parallèle sur un cluster distribué, les lignes finales sont retournées dans l'ordre aléatoire dans lequel les processing nodes ont terminé leur travail. Si tu as besoin de voir les plus hauts revenus, un ordre aléatoire est inutile.
Pour corriger ça, tu chaînes la méthode order by à la fin de ton étape d'agrégation. Tu passes à la méthode order by la colonne qui contient tes nouvelles moyennes, et tu lui dis de trier par ordre décroissant. PySpark va prendre les résultats combinés, les classer, et livrer une table propre et triée.
Le pattern split-apply-combine est puissant précisément parce qu'il s'adapte parfaitement à du hardware distribué, ce qui permet de résumer des datasets massifs en quelques secondes. Mais rappelle-toi que le grouping des data n'est que la moitié de l'opération. Le grouping nécessite une agrégation pour finir le travail, sinon tu te retrouves juste avec un cluster plein de buckets vides qui attendent des instructions.
Merci d'avoir passé quelques minutes avec moi. À la prochaine, prends soin de toi.
12
Quand les DataFrames entrent en collision : L'art des jointures
3m 32s
Naviguer dans les nuances de la combinaison de jeux de données. Nous détaillons les sept différents types de jointures dans PySpark et expliquons comment fusionner des DataFrames en toute sécurité.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 12 sur 21. Fusionner deux tables massives est l'opération la plus coûteuse en calcul distribué. Applique la mauvaise logique de matching, et ça devient le moyen le plus simple de faire crasher ton cluster avec un out of memory. Savoir exactement comment combiner des datasets en toute sécurité, c'est tout le sujet de When DataFrames Collide: The Art of Joining.
Le mécanisme principal pour combiner des données dans PySpark, c'est la méthode join. Tu l'appelles sur ton DataFrame de base, en lui passant le DataFrame que tu veux attacher, la ou les colonnes spécifiques sur lesquelles faire le matching, et la méthode de join. Si tu ne fournis aucune méthode de join, PySpark fait un inner join par défaut.
Prenons un scénario concret. Tu as un premier DataFrame qui enregistre la taille des gens, et un deuxième DataFrame qui enregistre leurs revenus. Les deux datasets partagent une colonne qui s'appelle name.
Avec un inner join, PySpark regarde la colonne name dans les deux datasets et ne garde que les lignes où le name existe aux deux endroits. Si une personne apparaît dans les données de taille mais manque dans les données de revenus, son enregistrement est complètement droppé du résultat.
Pour conserver les enregistrements qui n'ont pas de match, tu changes le type de join. Un left join garde chaque ligne de ton DataFrame de départ, qui dans ce cas est celui des tailles. Si PySpark trouve un name qui matche dans les données de revenus, il ajoute ce revenu. S'il ne trouve pas de match, il garde la ligne de taille mais place une valeur null dans la colonne des revenus. Un right join fait exactement l'inverse, en gardant tous les revenus et en remplissant les tailles manquantes avec des nulls.
Quand tu as besoin d'absolument tout, tu utilises un full join. PySpark conserve chaque enregistrement des deux DataFrames. Les names qui matchent sont fusionnés en une seule ligne, et tous les names qui n'existent que dans un seul dataset sont conservés, avec des valeurs null pour remplir les données manquantes de l'autre côté.
Voici le point clé. Un cross join fonctionne différemment parce qu'il ignore complètement la condition de join. Il associe chaque ligne du DataFrame heights avec chaque ligne du DataFrame incomes, ce qui crée un produit cartésien. Si les deux tables ont juste mille lignes, un cross join sort un million de lignes. Cette croissance explosive est la raison pour laquelle les cross joins sont fortement restreints par défaut et nécessitent souvent une configuration explicite pour s'exécuter sans lever une erreur.
Les deux derniers types de join sont en fait des opérations de filtrage plutôt que de vraies fusions de données. Un left semi join cherche des matchs, et ne renvoie les lignes du DataFrame heights que si le name apparaît aussi dans le DataFrame incomes. La différence cruciale avec un inner join, c'est qu'un left semi join ne récupère aucune colonne du côté droit. Tu te retrouves avec exactement les mêmes colonnes qu'au départ, juste filtrées pour ne garder que les enregistrements qui ont un match correspondant.
Un left anti join fait exactement l'inverse. Il renvoie les lignes du DataFrame heights uniquement si le name n'existe pas dans les données incomes. Il drop complètement les colonnes du côté droit. Ça fait du left anti join le moyen le plus efficace pour identifier des données manquantes ou trouver des enregistrements qui ont échoué au traitement en downstream. Le choix du join détermine non seulement quelles données tu récupères, mais aussi quelle quantité de données doit physiquement transiter sur ton réseau pour générer le résultat.
Merci de ton écoute. À la prochaine !
13
Vieux SQL, nouvelles astuces
3m 12s
Pourquoi apprendre une nouvelle API quand vous pouvez utiliser du SQL brut ? Apprenez à exécuter des requêtes SQL standards directement sur des DataFrames PySpark distribués.
Bonjour, ici Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 13 sur 21. Tu as une équipe d'analystes qui écrit un excellent SQL, mais tes données se trouvent sur un énorme cluster distribué. Tu pourrais les forcer à apprendre une toute nouvelle syntaxe Python, ou tu pourrais les laisser libérer la puissance du langage qu'ils connaissent déjà. C'est là qu'intervient l'exécution de strings SQL brutes directement dans PySpark, pour donner de nouveaux pouvoirs à ce bon vieux SQL.
PySpark t'offre un pont direct vers le SQL standard via une seule méthode sur ta session Spark, appelée simplement sql. Tu passes une string SQL brute à cette méthode. L'output n'est pas du texte brut. C'est un DataFrame PySpark standard. Ça veut dire que tu peux exécuter une query de base de données standard, récupérer un DataFrame, et le passer immédiatement à une autre fonction Python. C'est totalement interopérable.
Avant que tu puisses requêter des données avec SQL, PySpark a besoin de savoir quelles tables existent. Tu as deux moyens principaux d'exposer tes données au moteur SQL. Premièrement, si tu as déjà un DataFrame en Python, tu peux appeler une méthode pour l'enregistrer comme temporary view. Tu lui donnes un nom sous forme de string, et soudainement, il agit comme une table dans tes requêtes SQL. Deuxièmement, tu peux créer des tables entièrement à l'intérieur de ta string SQL. Tu passes un statement create table à la méthode sql. À l'intérieur de cette string, tu définis le schéma et tu dis exactement à PySpark où se trouvent les fichiers de données sous-jacents, comme un chemin de cloud storage contenant des fichiers Parquet. PySpark enregistre ça dans son catalog interne. À partir de là, tu la requêtes par son nom, exactement comme une table de base de données traditionnelle.
Compare à quoi ressemble la même logique dans les deux approches. Disons que tu as besoin de récupérer les noms des clients, de dropper tous ceux qui ont un solde à zéro, et de merger le résultat avec une table orders. Dans l'API DataFrame, tu construis une chain de méthodes Python. Tu appelles select sur ton dataset customer pour choisir la colonne name. Ensuite, tu enchaînes une méthode filter, pour vérifier si le solde est supérieur à zéro. Enfin, tu ajoutes une méthode join qui référence le dataset orders sur une clé correspondante. C'est hautement programmatique. Dans l'approche SQL, tu écris un statement select standard qui tire la colonne name, tu ajoutes une clause where pour le solde, et tu écris un inner join pour la table orders. Ça se trouve dans ton script sous la forme d'un seul bloc de string lisible.
Voici le point clé. Il y a une idée reçue très courante selon laquelle écrire du SQL à l'intérieur de strings Python serait forcément plus lent ou moins natif que d'utiliser les méthodes structurées des DataFrames. C'est faux. Que tu enchaînes des méthodes Python ou que tu passes une string SQL brute, PySpark les traite de manière identique. Les deux inputs sont immédiatement parsés, traduits exactement dans le même logical plan, et transmis à l'optimizer Catalyst. Le moteur d'exécution ne sait pas et se fiche complètement de l'API que tu as utilisée pour exprimer ton intention. Les performances sont exactement les mêmes.
Le choix entre l'API DataFrame et le SQL brut n'est jamais une question de performances du cluster. C'est purement une question de ce qui rend ton équipe plus rapide et ta codebase plus facile à maintenir.
Merci d'avoir écouté. J'espère que tu as appris quelque chose de nouveau.
14
Interchanger DataFrames et SQL
3m 27s
Mélangez et associez SQL et Python en toute fluidité. Découvrez comment créer des vues temporaires à partir de DataFrames, utiliser selectExpr et enchaîner des opérations programmatiques sur les résultats de requêtes SQL.
Bonjour, ici Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 14 sur 21. Tu pourrais te retrouver coincé dans un débat pour savoir s'il faut écrire tes data transformations en Python ou en SQL. Imposer un choix strict entre les deux, c'est se priver d'énormément de possibilités. Le vrai avantage, c'est de pouvoir interchanger facilement des DataFrames et du SQL au sein du même pipeline.
Parfois, un ensemble complexe de jointures imbriquées est beaucoup plus facile à lire et à maintenir pour ton équipe en raw SQL. D'autres fois, tu as besoin d'itérer dynamiquement sur des noms de colonnes, ce qui est impossible en pure SQL mais trivial en Python. PySpark te permet de mixer les deux approches sans casser ton data flow.
Pour commencer à écrire du SQL sur un DataFrame Python existant, tu dois d'abord exposer ce DataFrame au moteur Spark SQL. Tu fais ça en appelant la méthode create or replace temp view directement sur ton DataFrame. Tu lui passes un seul argument string, qui devient le nom de la table. Cette opération ne déplace aucune donnée. Elle n'écrit pas sur le disque. Elle enregistre simplement un pointeur temporaire dans ta session Spark actuelle. Le moteur SQL sait maintenant comment résoudre ce nom de table pour retrouver ton DataFrame Python.
Maintenant, tu peux l'interroger. Tu appelles spark dot sql et tu passes ton select statement standard sous forme de string, en faisant référence au nom de la table que tu viens de créer.
Voici le point clé. L'output de cet appel spark dot sql n'est pas un résultat texte statique, ni un type d'objet différent. Il retourne un DataFrame PySpark standard. Ça veut dire que tu peux immédiatement chainer des méthodes de DataFrame Python normales directement à la fin de ton appel SQL. Tu peux écrire une string SQL de cinquante lignes pour gérer une window function complexe, fermer la parenthèse de spark dot sql, et ajouter immédiatement une méthode dot filter ou dot group by. Tu passes de Python à SQL et tu reviens à Python dans un seul bloc de code.
Si tu as seulement besoin de SQL pour le calcul d'une colonne spécifique, enregistrer une temporary view complète n'est pas nécessaire. À la place, tu utilises la méthode select expression. Cette méthode agit comme un pont. Elle fonctionne exactement comme une méthode select standard de DataFrame, mais elle accepte des expressions string en raw SQL au lieu d'objets column Python.
Si tu as besoin d'exécuter un statement case-when, de faire des fonctions mathématiques, ou de cast un data type en utilisant la syntaxe SQL native, tu passes ces strings SQL exactes dans select expression. Spark prend ces strings, les parse, et les exécute exactement comme il le ferait dans une query SQL complète. Ça te permet de rester entièrement dans l'API DataFrame chainable tout en t'appuyant sur la syntaxe SQL pour de la logique complexe au niveau de la row.
La frontière entre ces deux paradigmes est complètement artificielle. Que tu chaines des méthodes Python, que tu écrives des queries en raw SQL, ou que tu utilises des strings select expression, Spark compile tout dans exactement le même execution plan optimisé.
Si tu veux nous aider à continuer de faire ces épisodes, tu peux chercher DevStoriesEU sur Patreon pour soutenir l'émission. C'est tout pour celui-ci. Merci pour ton écoute, et continue de développer !
15
Étendre Spark avec les UDFs Python
3m 20s
Lorsque les fonctions intégrées ne suffisent pas, les User-Defined Functions entrent en jeu. Nous explorons comment écrire une logique Python personnalisée pour les DataFrames, et pourquoi les UDFs scalaires standards cachent une pénalité de performance.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 15 sur 21. Tu écris une fonction custom en Python, tu l'intègres à ton data pipeline, et elle marche parfaitement sur un petit sample. Mais quand tu la lances sur tout le dataset, le job devient super lent alors que l'utilisation du CPU explose. Le code en lui-même est bon, mais tu paies une taxe d'exécution cachée. Aujourd'hui, on parle de l'extension de Spark avec les UDF Python.
Une User Defined Function, ou UDF, te permet d'exécuter de la logique Python custom directement sur un DataFrame Spark. Tu l'utilises quand les fonctions Spark SQL built-in ne couvrent pas ta logique métier spécifique. Le process est super simple. Tu commences par écrire une fonction Python standard. Par exemple, tu écris une fonction qui prend une string en texte, applique une règle de formatage custom complexe, et retourne la string modifiée. Pour que Spark reconnaisse cette fonction, tu importes la fonction udf depuis le module PySpark SQL functions, et tu l'appliques comme decorator juste au-dessus de la définition de ta fonction Python. Tu passes aussi un return type au decorator, comme un type string ou un type integer. Si tu ne fournis pas de return type, Spark utilise un type string par défaut, ce qui peut causer des problèmes de données silencieux si ta fonction retourne en fait un nombre. Une fois décorée, ta fonction Python custom se comporte exactement comme une fonction Spark native. Tu peux la passer dans des opérations de DataFrame, comme un statement select, en lui donnant des noms de colonnes comme arguments.
Voici le point clé. Une UDF Python scalaire standard opère strictement une ligne à la fois. Elle prend une ou plusieurs valeurs de colonnes d'une seule ligne en input, évalue ta logique Python custom, et retourne exactement une valeur en output pour cette ligne spécifique. Si ton DataFrame contient dix millions de lignes, ta fonction Python est invoquée dix millions de fois séparément. Cette opération ligne par ligne est facile à comprendre, mais ça crée l'énorme bottleneck de performance dont on a parlé au début.
Pour comprendre pourquoi c'est si lent, tu dois regarder comment Spark exécute le code sous le capot. Spark est construit en Scala, ce qui veut dire que son moteur principal tourne dans une Java Virtual Machine, ou JVM. Ton UDF custom est écrite en Python. La JVM ne peut pas exécuter de code Python nativement. Pour appliquer ton UDF, Spark est forcé de lancer des process workers Python séparés, en parallèle de ses propres executors. Il doit ensuite déplacer physiquement les données hors de l'espace mémoire de la JVM pour les envoyer dans le process Python. Spark s'appuie sur une librairie de sérialisation Python appelée cloudpickle pour gérer ce transfert complexe.
C'est là que la taxe de performance est prélevée. Pour chaque ligne de ton dataset, Spark sérialise les inputs dans la JVM, envoie ces données binaires via un socket local au worker Python, et les désérialise en objets Python standards. Ta fonction custom s'exécute enfin sur ces objets. Ensuite, tout le cycle se fait à l'envers. Python sérialise la valeur en output avec cloudpickle, la renvoie via le socket, et la JVM la désérialise pour la remettre dans le format mémoire interne de Spark. Cette sérialisation et désérialisation constantes entre Java et Python sont incroyablement coûteuses.
Le vrai coût d'une UDF Python standard, c'est rarement la logique que tu écris ; c'est l'overhead silencieux de traduire les données dans un sens et dans l'autre entre deux environnements de runtime complètement différents, pour chaque ligne. Merci d'avoir passé quelques minutes avec moi. À la prochaine, prends soin de toi.
16
Booster les UDFs avec Apache Arrow
3m 12s
Éliminez le goulot d'étranglement de la sérialisation JVM vers Python. Nous découvrons comment les Vectorized Pandas UDFs et les formats de mémoire Apache Arrow boostent vos transformations personnalisées.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 16 sur 21. Et si tu pouvais multiplier par dix la vitesse de tes fonctions Python custom dans Spark, juste en changeant un seul décorateur ? Les UDFs Python standard sont réputées pour être super lentes, mais la solution ne te demande pas de réécrire ta logique en Scala. Aujourd'hui, on va voir comment booster les UDFs avec Apache Arrow.
Quand tu lances une UDF Python standard, tu te prends un énorme mur de performance à la frontière entre les langages. Spark tourne dans la Java Virtual Machine, mais ta logique custom s'exécute dans un process worker Python séparé. Pour faire passer les données de l'un à l'autre, Spark extrait les lignes de sa mémoire interne, les sérialise avec une librairie qui s'appelle cloudpickle, et les envoie à Python. Python traite les données ligne par ligne, sérialise le résultat, et le renvoie. Faire ça pour des millions de lignes individuelles crée un bottleneck de sérialisation insupportable.
Apache Arrow change les règles de cet échange de données. Arrow est un format de données in-memory, en colonnes et cross-language. Il standardise l'apparence des données en mémoire, pour que la JVM et Python les comprennent nativement sans traduction complexe. Au lieu de sérialiser les données ligne par ligne, Spark regroupe les données dans de gros batches en colonnes. Toutes les valeurs d'une colonne spécifique se retrouvent juste à côté les unes des autres dans une mémoire contiguë. Spark envoie ces gros blocs à Python en une seule étape super efficace.
Tu peux en profiter de deux manières. D'abord, tu peux activer l'optimisation Arrow pour les UDFs standard. Tu fais ça en passant la propriété de configuration Spark pour l'exécution Arrow à true, ou en spécifiant le paramètre useArrow égal à true quand tu enregistres ton UDF. Spark va utiliser Arrow pour transférer les données par batches, ce qui réduit considérablement l'overhead de sérialisation, même si ta fonction Python exécute toujours techniquement la logique ligne par ligne.
Mais voici le point clé. Pour obtenir le boost de vitesse maximum, tu veux que ton code Python traite ces batches Arrow en même temps. C'est là qu'interviennent les Pandas UDFs. En encapsulant ta fonction custom avec le décorateur pandas UDF, tu changes la façon dont la fonction reçoit les données. Au lieu de recevoir une seule valeur pour une ligne, ta fonction reçoit une Pandas Series qui contient un batch entier de valeurs. Ta fonction applique une opération vectorisée à tout ce batch et retourne une nouvelle Pandas Series d'exactement la même longueur.
Imagine une fonction qui s'appelle calculate tax. Tu appliques le décorateur pandas UDF et tu déclares qu'elle retourne un type double. La fonction accepte une Pandas Series contenant les prix des produits. À l'intérieur de la fonction, tu n'écris pas de for-loop. Tu écris simplement un return statement qui multiplie la Series d'entrée par un virgule deux. Comme Pandas tourne sur du code C hautement optimisé sous le capot, il multiplie le bloc entier de prix instantanément. Spark prend ensuite cette Series retournée et la fusionne de manière transparente dans le DataFrame en utilisant Arrow.
La vraie puissance d'une Pandas UDF, ce n'est pas juste qu'elle contourne le bottleneck de sérialisation cloudpickle, c'est qu'elle déplace ton calcul réel depuis des boucles Python lentes vers une exécution native et vectorisée.
Merci d'avoir écouté. Prenez soin de vous tous.
17
Exploser les lignes avec les UDTFs Python
3m 47s
Les UDFs standards renvoient une valeur par ligne, mais que faire si vous avez besoin de plusieurs lignes ? Apprenez comment les Python User-Defined Table Functions (UDTFs) résolvent les problèmes complexes de génération un-à-plusieurs.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 17 sur 21. Les User-Defined Functions standard sont strictement limitées à un mapping one-to-one. Tu passes une valeur en input, tu récupères exactement une valeur en output. Mais que faire si une seule entrée de log bien dense doit être explosée en une centaine de rows séparées ? Pour résoudre ça, tu utilises les Python User-Defined Table Functions, ou UDTFs.
Une UDTF fait exactement ce que son nom indique. Elle retourne une table entière à partir d'un seul input. Alors qu'une UDF standard calcule une seule valeur scalaire, une UDTF peut émettre plusieurs rows et plusieurs colonnes. C'est l'outil que tu choisis quand tu as besoin d'exploser une string JSON imbriquée, de parser un fichier texte délimité ligne par ligne, ou de générer une séquence de dates à partir d'un seul timestamp.
Pour créer une UDTF en PySpark, tu n'écris pas une fonction standalone basique. À la place, tu définis une classe Python. Cette classe requiert une méthode spécifique appelée eval. C'est dans la méthode eval que la vraie transformation se passe. Quand tu exécutes l'UDTF, Spark appelle cette méthode pour chaque valeur en input.
Voici le point clé. À l'intérieur de cette méthode eval, tu n'utilises pas une instruction return standard. À la place, tu utilises le mot-clé Python yield. À chaque fois que la méthode fait un yield d'une valeur, Spark traduit ça en une nouvelle row dans ta table en output. Si tu passes une seule string en input, la méthode eval peut boucler dessus et faire un yield dix fois. Spark prend ces dix yields et produit dix rows distinctes.
Prenons un exemple concret. Tu crées une classe appelée ProcessWords. Ton but est de passer une phrase complète et de récupérer une table où chaque mot a sa propre row. Tu écris la méthode eval pour accepter une string de texte. À l'intérieur de la méthode, tu découpes la phrase par les espaces. Ensuite, tu boucles sur les mots obtenus. Pour chaque mot, tu fais un yield d'un tuple contenant le mot lui-même.
Avant que Spark puisse utiliser cette classe, tu lui appliques le decorator UDTF de PySpark. Le decorator est obligatoire parce qu'il définit ton schema en output. Tu déclares explicitement les noms des colonnes et les types de données que ta fonction génère. Si tu fais un yield d'une string, tu dis au decorator que l'output est une colonne string. Si tu veux faire un yield du mot et de son nombre de caractères, tu fais un yield d'un tuple à deux éléments, et ton decorator spécifie un schema avec une colonne string et une colonne integer.
Au-delà de la méthode eval, une classe UDTF peut aussi inclure une méthode terminate optionnelle. Spark appelle la méthode terminate exactement une fois pour chaque partition de données, après que toutes les rows en input ont été traitées par la méthode eval. C'est super utile pour l'agrégation. Si ta méthode eval suit un compteur interne sur plusieurs rows en input, la méthode terminate peut faire un yield d'une dernière row contenant ce compte total avant que la partition ne se ferme.
Quand tu appelles une UDTF dans une opération DataFrame, elle se comporte comme une inline table. Si tu passes une colonne de DataFrame existante dans l'UDTF, Spark applique la table function row par row. Parce qu'une table function sort plusieurs rows pour chaque row en input, combiner cet output avec ton dataset d'origine nécessite une lateral join implicite. Spark gère ça en arrière-plan, en dupliquant les données de la row d'origine pour correspondre aux nouvelles rows explosées générées par ta classe Python.
La vraie force d'une UDTF Python, c'est de complètement détacher ton volume en input de ton volume en output, permettant à un seul point de données d'éclore en un dataset complet à plusieurs colonnes.
C'est tout pour cet épisode. Merci d'avoir écouté, et continue de développer !
18
L'API Pandas sur Spark
3m 34s
Mettez à l'échelle vos scripts Pandas existants à l'infini. Découvrez comment l'API pyspark.pandas vous permet d'exécuter la syntaxe Pandas standard nativement sur un cluster Spark distribué.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 18 sur 21. Tu as un script de données en local qui marche parfaitement, mais soudain, la taille de ton dataset quadruple et ta machine manque de mémoire. Tu maîtrises la syntaxe sur le bout des doigts, mais tout réécrire pour un framework distribué prendrait des jours. L'API pandas sur Spark comble exactement ce besoin.
L'API pandas sur Spark te permet de faire tourner des workloads pandas standards sur un cluster distribué. Elle ne se contente pas d'émuler pandas aveuglément. Elle intercepte ton code pandas et le traduit en plans d'exécution Spark optimisés sous le capot. Pour l'utiliser, tu importes le module nommé pyspark dot pandas. La convention standard est de lui assigner l'alias ps, ce qui reflète directement l'alias pd familier utilisé dans les workloads de data science en local.
Si tu as déjà un DataFrame pandas standard en mémoire en local, la transition est super simple. Tu invoques une fonction appelée from pandas sur ton module ps et tu lui passes ton DataFrame local. Ça convertit l'objet single-node en un DataFrame pandas-on-Spark distribué. À partir de là, la syntaxe que tu utilises pour interagir avec ce nouvel objet reste identique à ce que tu connais déjà.
Cette cohérence s'étend à la façon dont les données sont traitées en interne. L'API distribuée gère nativement les données manquantes exactement comme le fait pandas en local. Si ton dataset contient des valeurs NumPy Not-a-Number, l'API pandas sur Spark les gère correctement pendant les opérations mathématiques ou les transformations structurelles. Tu n'as pas besoin d'inventer une nouvelle logique de data cleaning pour tes jobs Spark.
Les opérations standards se traduisent directement. Si tu veux grouper tes données par une colonne spécifique, tu appelles la fonction de grouping standard. Si tu veux calculer la moyenne ou la somme, tu chaînes la fonction d'agrégation juste après. Tu peux même appeler des fonctions de plotting directement sur le DataFrame distribué. Spark traite les calculs lourds sur le cluster, agrège les data points nécessaires, et te renvoie la visualisation exactement comme si tu bossais sur une seule machine.
Voici le point clé. L'architecture en dessous est fondamentalement différente, et ça introduit un edge case critique concernant la génération d'index. Pandas en local repose énormément sur un index séquentiel et strictement ordonné pour chaque ligne. Spark, en revanche, partitionne la data et la distribue sur plusieurs machines indépendantes. Imposer un index séquentiel strict et globalement ordonné sur un système distribué nécessite une communication constante entre les worker nodes.
Quand tu crées un DataFrame pandas-on-Spark sans définir explicitement de colonne d'index, l'API génère automatiquement un index par défaut pour imiter parfaitement le comportement standard de pandas. Créer et maintenir cet index par défaut nécessite de synchroniser l'état sur l'ensemble du cluster. Si tu travailles sur un dataset massif, cette synchronisation introduit un gros overhead de performance. L'API va souvent émettre un warning concernant cet overhead interne lors de son exécution. Pour éviter ce bottleneck, il est fortement recommandé d'assigner tout de suite une colonne existante comme index, ou de configurer l'API pour utiliser un type d'index distributed-friendly.
L'API pandas sur Spark te donne la syntaxe exacte de pandas propulsée par le moteur d'exécution distribué de Spark, mais garder à l'esprit que les index séquentiels stricts entraînent un lourd coût de synchronisation évitera à ton cluster des ralentissements inutiles. C'est tout pour aujourd'hui. Merci d'avoir écouté — va construire un truc cool.
19
Chargement et formats de stockage
3m 22s
Tous les formats de fichiers ne se valent pas. Nous comparons les CSV orientés lignes avec les formats orientés colonnes comme Parquet et ORC, en explorant les options de lecture/écriture et les techniques de stockage optimales.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 19 sur 21. Sauvegarder un dataset massif en CSV, c'est le truc le plus facile au monde, mais c'est aussi l'une des pires choses que tu puisses faire pour les performances de ton data lake. Tu paies pour plus de storage, tu paies pour plus de compute, et chaque query en aval rame complètement. La solution se trouve dans la façon dont tu gères Load and Behold : Storage Formats, et pourquoi la manière dont tu sauvegardes ta data compte tout autant que la manière dont tu la transformes.
PySpark utilise une interface unifiée pour read et write de la data sur des dizaines de systèmes de stockage. Tu appelles l'attribut read ou write sur ta session Spark ou ton DataFrame, tu spécifies un format, tu fournis une chain d'options, et tu pointes vers un file path. C'est un pattern prévisible, mais les options que tu choisis dictent la quantité de travail que ton cluster devra faire plus tard.
Commençons par les formats human-readable, CSV et JSON. Ce sont des formats row-based. Quand tu lis un CSV, Spark parse la data ligne par ligne. Tu as souvent besoin de chain des options spécifiques pour interpréter le texte. Par exemple, tu pourrais chain une option pour dire à Spark que le fichier a un header, une autre option pour définir un delimiter custom comme un pipe ou une tabulation, et une troisième option pour définir exactement à quoi ressemble une valeur null, peut-être en passant une string spécifique pour que Spark la mappe correctement vers une valeur vide au lieu de la traiter comme du texte littéral. Le JSON est un peu mieux parce qu'il gère les structures imbriquées nativement, mais il répète les clés du schema pour chaque record, ce qui gonfle massivement la taille du fichier. Les deux formats forcent Spark à lire la row entière depuis le disque, même si ta query ne demande qu'une seule column.
C'est là qu'interviennent les formats columnar comme Parquet et ORC. Fais bien attention à cette partie. Les queries analytiques ont rarement besoin de chaque column dans une table très large. Elles ont généralement besoin de columns spécifiques sur des millions de rows pour faire des agrégations. Parquet et ORC stockent la data organisée par column, et non par row. Si tu query trois columns sur une centaine, Spark lit uniquement les chunks du fichier qui contiennent ces trois columns. Il skip complètement le reste, réduisant les inputs et outputs disque à une fraction de ce qu'un CSV demande. Parce que la data du même type est stockée ensemble, les formats columnar se compressent aussi à merveille. Un directory de fichiers JSON peut rétrécir de 70 % ou plus quand il est converti en Parquet. Ils embarquent aussi le schema exact et les types de data dans les metadata du fichier, ce qui veut dire que Spark n'a pas à deviner ou inférer les types au load.
Quand tu es prêt à write cette data, tu dois gérer le state à la destination. Par défaut, si tu essaies de write vers un path où de la data existe déjà, Spark throw une erreur pour éviter une perte de data accidentelle. Tu contrôles ça en utilisant la méthode mode avant de déclencher le save. Si tu passes la string overwrite, Spark supprime la data existante sur le path cible et la remplace par ton DataFrame actuel. Si tu passes append, Spark ajoute simplement tes nouveaux part files au directory existant. Il y a aussi un mode ignore, qui ne fait silencieusement rien si le directory est déjà peuplé.
Écrire de la data propre, typée et columnar aujourd'hui épargne à ton cluster des heures de processing gaspillées demain. Si tu veux aider à ce que ces épisodes continuent, tu peux soutenir l'émission en cherchant DevStoriesEU sur Patreon. Merci d'avoir passé quelques minutes avec moi. À la prochaine, prends soin de toi.
20
Chasse aux bugs : Plans physiques et jointures
3m 06s
Jetez un coup d'œil sous le capot du moteur d'exécution de Spark. Apprenez à déboguer les requêtes en utilisant DataFrame.explain() et à éliminer les shuffles coûteux en utilisant les broadcast joins.
Salut, c'est Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 20 sur 21. Ton job PySpark n'est pas lent parce qu'il calcule des données. Il est lent parce qu'il passe tout son temps à déplacer des données sur le réseau. Quand un simple join met ton cluster à genoux, la solution se trouve dans le Bug Busting : Physical Plans et Joins.
Quand tu écris un script PySpark, tu définis des logical operations. Tu dis à Spark ce que tu veux, pas comment le faire. Mais quand un job sous-performe, tu dois savoir exactement comment Spark a exécuté ta requête. Tu fais ça en appelant la méthode explain sur ton DataFrame. Appeler explain affiche le physical plan. C'est le plan détaillé des tâches réelles que Spark lance sur ton cluster. Tu lis ce plan de bas en haut, en traçant les données depuis les fichiers sources jusqu'à l'output final.
Si tu regardes le physical plan pour un join standard entre deux DataFrames, tu verras sûrement une étape appelée SortMergeJoin. Pour faire un SortMergeJoin, Spark doit s'assurer que les lignes avec les mêmes join keys sont physiquement situées sur le même executor. Pour y arriver, Spark fait un Exchange. Exchange, c'est le terme du physical plan pour un network shuffle. Ça veut dire que Spark arrache les données des partitions, les pousse sur le réseau et les écrit sur le disque pour que les autres executors puissent les lire. Le shuffle est l'opération la plus coûteuse en distributed computing.
Voici l'idée clé. Si tu fais un join entre une énorme fact table et une petite lookup table, faire un shuffle de la grande table est un énorme gaspillage de ressources. Au lieu de faire un shuffle des deux tables pour aligner les clés, tu peux juste envoyer toute la petite table à chaque executor. Ça se fait en utilisant la fonction broadcast du module PySpark SQL functions. Quand tu appelles ta méthode join, tu as juste à wrapper le plus petit DataFrame dans la fonction broadcast.
En wrappant la petite table, tu donnes à Spark une directive stricte. Spark va collecter le petit DataFrame sur le driver node, puis transmettre une copie complète dans la mémoire de chaque executor. Maintenant, quand le grand DataFrame est processé, les executors ont déjà toutes les lookup data dont ils ont besoin directement en RAM. Ils parcourent simplement leurs partitions existantes et matchent les lignes localement. Pas besoin de tri, et aucune donnée de la grande table ne bouge sur le réseau.
Si tu appelles explain sur ce nouveau broadcasted join, le physical plan a l'air complètement différent. Le SortMergeJoin a disparu. L'étape coûteuse d'Exchange est complètement absente. À la place, tu verras un BroadcastExchange et un BroadcastHashJoin. Le BroadcastExchange déplace juste la petite table une fois, et le join lui-même se fait entièrement sur place. Le moyen le plus simple de doubler la vitesse d'un job Spark, c'est d'arrêter de déplacer des données qui n'ont pas besoin de bouger. Lis tes physical plans, repère les network exchanges, et fais un broadcast de tes petites tables.
C'est tout pour aujourd'hui. Merci d'avoir écouté — va construire un truc cool.
21
Profiling de la mémoire et des performances de PySpark
3m 38s
Nous concluons notre voyage PySpark en présentant des outils de profiling natifs. Apprenez à suivre la consommation de mémoire ligne par ligne et à exposer les tracebacks internes cachés de Python.
Bonjour, ici Alex de DEV STORIES DOT EU. PySpark Fundamentals, épisode 21 sur 21. Déboguer du code Python distribué implique généralement de fouiller dans des milliers de lignes d'erreurs Java incompréhensibles, en essayant de deviner pourquoi ta fonction a échoué ou pourquoi elle a consommé toute la mémoire de ton cluster. Plus besoin de deviner. Aujourd'hui, on va regarder le profilage de la mémoire et des performances de PySpark, ainsi que la simplification des stack traces.
Quand tu écris une User Defined Function, ou UDF, en PySpark, ton code Python s'exécute sur une infrastructure Java Virtual Machine. Si ton code Python fait une division par zéro ou fait référence à une clé de dictionnaire manquante, cette simple exception Python est avalée. Elle est renvoyée par le daemon PySpark, à travers le réseau, et encapsulée dans d'énormes exceptions Java. Retrouver l'erreur Python réelle dans tes logs est fastidieux.
Tu peux résoudre ce problème en activant les tracebacks simplifiés. Quand tu mets la configuration Spark pour le traceback simplifié sur true, PySpark modifie la façon dont il signale les erreurs. Il élimine tous les logs d'interopérabilité Java et le bruit des processus workers. La prochaine fois qu'une UDF échouera, ta console affichera une stack trace Python standard et propre, indiquant le numéro de ligne exact de ton fichier Python où l'exception s'est produite.
Corriger les plantages ne représente que la moitié du travail. Corriger du code lent ou gourmand en mémoire est bien plus complexe. Si tu écris une UDF Pandas qui traite des millions de lignes, elle peut s'exécuter correctement, mais prendre beaucoup trop de temps ou provoquer des erreurs out-of-memory sur tes nœuds executors. Auparavant, identifier le goulot d'étranglement nécessitait d'ajouter des logs manuels ou de deviner quelle opération Pandas était inefficace. Spark 4.0 change la donne en introduisant des profilers d'UDF Python intégrés.
Voici l'information clé. Tu peux désormais profiler ton code Python distribué ligne par ligne, directement dans PySpark. Pour utiliser ça, tu configures le profiler d'UDF sur l'un des deux modes : performance ou mémoire.
Si tu règles la configuration du profiler sur le mot perf, Spark active le profiler de performance. Tu lances ensuite ton job Spark normalement. Pendant que les nœuds workers exécutent ton UDF Pandas, Spark enregistre le temps d'exécution de chaque ligne de ta fonction Python. Une fois ton job terminé, tu appelles la méthode show sur l'objet profile de Spark. Spark affichera alors un rapport détaillé dans ta console. Pour chaque ligne de ton code, tu verras précisément combien de fois elle a été appelée et le temps total passé à l'exécuter. Tu peux ainsi voir instantanément si une manipulation de string spécifique ou une opération mathématique ralentit l'ensemble de ton pipeline.
Si tu gères des limites de mémoire, tu configures plutôt le profiler d'UDF sur le mot memory. Le workflow est exactement le même, mais le résultat change. Quand tu regardes le rapport du profile, Spark te montre l'incrément exact en mégaoctets causé par chaque ligne de ton code Python. Tu peux voir précisément où de grands arrays sont alloués et où la mémoire ne parvient pas à être libérée.
Cette visibilité ligne par ligne élimine les devinettes lors de l'optimisation de transformations de données complexes. Tu peux identifier la cause exacte de tes problèmes de performance sans quitter ton environnement PySpark.
Puisqu'il s'agit du dernier épisode de notre série sur PySpark, je t'encourage à consulter la documentation officielle de Spark et à essayer ces outils de débogage toi-même. Si tu as des idées de technologies qu'on devrait aborder dans notre prochaine série, passe sur devstories.eu et dis-le-nous. Merci d'avoir passé quelques minutes avec moi. À la prochaine, prends soin de toi.
Tap to start playing
Browsers block autoplay
Share this episode
Episode
—
Copy this episode in another language:
Ce site n'utilise pas de cookies. Notre hébergeur peut enregistrer votre adresse IP à des fins d'analyse. En savoir plus.