Zurück zum Katalog
Season 14 21 Episoden 1h 25m 2026

PySpark Fundamentals

v4.1 — 2026 Edition. Ein umfassender Leitfaden zu PySpark 4.1, der Spark Connect, DataFrames, komplexe Datentypen, Datentransformationen, SQL, UDFs und Profiling abdeckt.

Big Data Verteiltes Rechnen Datenwissenschaft
PySpark Fundamentals
Aktuelle Wiedergabe
Click play to start
0:00
0:00
1
Das Big-Data-Problem & das Versprechen von PySpark
Wir klären die grundlegende Notwendigkeit von PySpark. Entdecke, warum Standard-Python-Bibliotheken wie Pandas bei großen Datenmengen versagen und wie PySpark eine verteilte Ausführungs-Engine bietet, um riesige Datensätze nahtlos zu verarbeiten.
3m 54s
2
Die Spark Connect Revolution
Erkunde die Spark Connect Architektur. Wir erklären, wie PySpark Client und Server entkoppelt hat, sodass du Spark-Anwendungen überall ohne sperrige JVM-Abhängigkeiten ausführen kannst.
3m 52s
3
DataFrames und Lazy Evaluation
Tauche ein in die grundlegende Abstraktion von PySpark: den DataFrame. Wir diskutieren das Konzept der Lazy Evaluation, den Unterschied zwischen Transformationen und Aktionen und warum Spark plant, bevor es ausführt.
4m 18s
4
Erstellen und Anzeigen von DataFrames
Lerne, wie man DataFrames aus rohen Python-Objekten, Dictionaries und Dateien instanziiert und wie du deine verteilten Daten sicher überprüfen kannst, ohne deinen Driver-Node zum Absturz zu bringen.
4m 06s
5
Grundlegende Datentypen meistern
Ein Rundgang durch die grundlegenden numerischen und String-Typen von PySpark. Wir untersuchen, wie man Schemata mit StructType und StructField für robuste Datenpipelines explizit definiert.
4m 23s
6
Die Tücken der Präzision
Entdecke die entscheidenden Unterschiede zwischen FloatType, DoubleType und DecimalType. Lerne, warum die Wahl des falschen numerischen Typs zu katastrophalen Rundungsfehlern in Finanzdaten führen kann.
4m 49s
7
Komplexe und verschachtelte Daten bändigen
Big Data ist nicht immer flach. Wir erkunden die komplexen Datentypen von PySpark, einschließlich ArrayType, StructType und MapType, mit denen du tief verschachteltes JSON nativ parsen kannst.
4m 17s
8
Type Casting und Selektion
Lerne, wie du deine DataFrame-Schemata aktiv formst. Wir behandeln, wie man Teilmengen von Spalten auswählt und wie man Spalten sicher von einem Datentyp in einen anderen castet.
3m 56s
9
Function Junction: Schmutzige Daten bereinigen
Garbage in, garbage out. Lerne die wesentlichen DataFrame-Transformationen kennen, um Nullwerte zu entfernen, fehlende Werte aufzufüllen und NaN-Datensätze nativ in verteilten Systemen zu behandeln.
4m 03s
10
Daten transformieren und umformen
Übernimm die Kontrolle über die Form deiner Daten. Wir untersuchen, wie man neue Spalten mit mathematischen Funktionen generiert, String-Manipulationen durchführt und verschachtelte Arrays mit explode abflacht.
4m 13s
11
Die Mechanik von Gruppierung und Aggregation
Meistere die Split-Apply-Combine-Strategie. Wir tauchen ein in das Gruppieren von Daten nach Schlüsseln und die Anwendung leistungsstarker Aggregationsfunktionen, um riesige Datensätze zusammenzufassen.
3m 46s
12
Wenn DataFrames kollidieren: Die Kunst des Joins
Navigiere durch die Nuancen der Kombination von Datensätzen. Wir schlüsseln die sieben verschiedenen Join-Typen in PySpark auf und erklären, wie man DataFrames sicher zusammenführt.
3m 59s
13
Altes SQL, neue Tricks
Warum eine neue API lernen, wenn du rohes SQL verwenden kannst? Lerne, wie du Standard-SQL-Abfragen direkt gegen verteilte PySpark-DataFrames ausführst.
3m 37s
14
Wechselspiel zwischen DataFrames und SQL
Mische SQL und Python nahtlos. Entdecke, wie du temporäre Views aus DataFrames erstellst, selectExpr verwendest und programmatische Operationen an SQL-Abfrageergebnisse anhängst.
3m 36s
15
Spark mit Python UDFs erweitern
Wenn integrierte Funktionen nicht ausreichen, kommen User-Defined Functions ins Spiel. Wir untersuchen, wie man benutzerdefinierte Python-Logik für DataFrames schreibt und warum standardmäßige skalare UDFs Leistungseinbußen verbergen.
4m 24s
16
UDFs mit Apache Arrow beschleunigen
Beseitige den JVM-zu-Python-Serialisierungsengpass. Wir decken auf, wie Vectorized Pandas UDFs und Apache Arrow-Speicherformate deine benutzerdefinierten Transformationen beschleunigen.
3m 41s
17
Zeilen mit Python UDTFs explodieren lassen
Standard-UDFs geben einen Wert pro Zeile zurück, aber was ist, wenn du mehrere Zeilen benötigst? Lerne, wie Python User-Defined Table Functions (UDTFs) komplexe One-to-Many-Generierungsprobleme lösen.
4m 33s
18
Die Pandas API auf Spark
Skaliere deine bestehenden Pandas-Skripte ins Unendliche. Entdecke, wie die pyspark.pandas API es dir ermöglicht, Standard-Pandas-Syntax nativ auf einem verteilten Spark-Cluster auszuführen.
4m 28s
19
Laden und Staunen: Speicherformate
Nicht alle Dateiformate sind gleich. Wir stellen zeilenbasierte CSVs spaltenbasierten Formaten wie Parquet und ORC gegenüber und untersuchen Lese-/Schreiboptionen sowie optimale Speichertechniken.
4m 12s
20
Bug Busting: Physische Pläne und Joins
Wirf einen Blick unter die Haube der Ausführungs-Engine von Spark. Lerne, wie man Abfragen mit DataFrame.explain() debuggt und wie man kostspielige Shuffles durch die Verwendung von Broadcast Joins eliminiert.
3m 23s
21
Profiling von PySpark-Speicher und -Leistung
Wir schließen unsere PySpark-Reise ab, indem wir native Profiling-Tools vorstellen. Lerne, wie man den Speicherverbrauch Zeile für Zeile verfolgt und versteckte interne Python-Tracebacks aufdeckt.
4m 10s

Episoden

1

Das Big-Data-Problem & das Versprechen von PySpark

3m 54s

Wir klären die grundlegende Notwendigkeit von PySpark. Entdecke, warum Standard-Python-Bibliotheken wie Pandas bei großen Datenmengen versagen und wie PySpark eine verteilte Ausführungs-Engine bietet, um riesige Datensätze nahtlos zu verarbeiten.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 1 von 21. Dein Standard-Python-Script läuft im Testing einwandfrei, aber sobald dein Dataset fünfzig Gigabyte erreicht, stürzt es mit einem OutOfMemory-Error ab. Du hast die physischen Grenzen einer einzelnen Maschine erreicht. Die Lösung für diesen Bottleneck ist das Thema dieser Folge: das Big-Data-Problem und das Versprechen von PySpark. Standard-Python-Data-Tools sind für Single-Node-Execution gebaut. Libraries wie pandas sind unglaublich effizient, aber sie erfordern, dass das gesamte Dataset im lokalen Memory liegt. Wenn dein Server sechzehn Gigabyte RAM hat und du versuchst, fünfzig Gigabyte an Application Logs zu laden, greift das Operating System ein und killt den Prozess. Vertikales Skalieren durch das Mieten eines größeren, teureren Servers zögert das Unvermeidliche nur hinaus. Daten wachsen schneller als Hardware-Upgrades. Irgendwann sprengen die Daten die Kapazität der Maschine. PySpark löst diese Einschränkung. Es ist die Python-API für Apache Spark. Apache Spark selbst ist eine Distributed Computing Engine, die auf der Java Virtual Machine läuft. PySpark fungiert als Brücke und ermöglicht es dir, deine Logik rein in Python zu schreiben und gleichzeitig die Vorteile von Sparks hochoptimierter Distributed Engine zu nutzen. Dadurch verschiebt sich deine Architektur von vertikalem Scaling zu horizontalem Scaling. Anstatt dich auf eine einzige riesige Maschine zu verlassen, partitioniert PySpark deine Daten und verteilt deine Computations über einen Cluster aus vielen kleineren Maschinen, die als Nodes bekannt sind. Du schreibst deinen Python-Code, und PySpark übersetzt ihn in einen Parallel Execution Plan. Wenn sich dein Datenvolumen nächsten Monat verdoppelt, musst du keine einzige Zeile Code neu schreiben. Du fügst dem Cluster einfach weitere Nodes hinzu. Das PySpark-Ecosystem ist in einige Core-Module für unterschiedliche Workloads unterteilt. Das erste ist Spark SQL. Das ist die Foundation für die meisten modernen PySpark-Applications. Es bietet eine DataFrame-Struktur für das Handling von tabellarischen Daten, die über mehrere Maschinen verteilt sind. Außerdem ermöglicht es dir, Standard-SQL-Queries direkt auf diesen verteilten Datasets auszuführen. Als Nächstes kommt Structured Streaming. Dieses Modul übernimmt Real-Time Data Pipelines. Anstatt über Nacht einen riesigen Batch an Daten zu verarbeiten, verarbeitet Structured Streaming kontinuierlich Flows von Records, wie Live-Sensor-Readings oder Web-Traffic-Events. Es nutzt exakt dasselbe Programming Model wie Spark SQL, was bedeutet, dass deine Batch-Processing-Logik und deine Streaming-Logik fast identisch aussehen. Dann gibt es noch MLlib, die Machine Learning Library. Das Trainieren von Models auf riesigen Datasets auf einer einzelnen Maschine ist ein berüchtigter Bottleneck. MLlib bietet Distributed-Machine-Learning-Algorithmen für Tasks wie Classification, Regression und Clustering. Es verteilt die schweren mathematischen Operationen über den gesamten Cluster und reduziert so die Training Time drastisch. Hier ist die Key Insight: Die wahre Power von PySpark ist Abstraction. Du musst deine riesigen Files nie manuell in Chunks slicen. Du schreibst nie Networking-Code, um die Server zu koordinieren. Du definierst einfach eine logische Sequenz von Transformations, und die zugrunde liegende Engine übernimmt die Data Distribution, die Parallel Execution und sogar den Recovery Process, falls ein Node mitten in der Computation den Strom verliert. PySpark ist nicht einfach nur ein Utility zum Öffnen größerer Files. Es ist ein fundamentaler Shift von Computing, das durch ein einzelnes Motherboard limitiert ist, hin zu Computing, das nur noch durch die Größe deines Clusters limitiert wird. Wenn du diese Episoden hilfreich findest und die Show unterstützen möchtest, kannst du auf Patreon nach DevStoriesEU suchen. Das war's für diese Folge. Danke fürs Zuhören und keep building!
2

Die Spark Connect Revolution

3m 52s

Erkunde die Spark Connect Architektur. Wir erklären, wie PySpark Client und Server entkoppelt hat, sodass du Spark-Anwendungen überall ohne sperrige JVM-Abhängigkeiten ausführen kannst.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 2 von 21. Jahrelang bedeutete das lokale Schreiben von PySpark-Code, eine riesige, schwere Java Virtual Machine mitzuschleppen, nur um ein einfaches Script zu testen. Du musstest Python-Versionen, Java Configurations und Cluster Dependencies perfekt synchronisieren, bevor du auch nur eine einzige Zeile Logik schreiben konntest. Die Spark Connect Revolution macht das komplett überflüssig. Traditionell basierte PySpark auf einer eng gekoppelten Architektur. Dein Python-Script und die Spark Execution Engine mussten auf exakt derselben physischen oder virtuellen Maschine laufen. Eine PySpark-Session zu starten hieß, im Hintergrund über eine Bridge Library eine Java Virtual Machine hochzufahren. Diese Architektur belastete deine lokale Development Environment mit dem vollen Gewicht der Spark Execution Engine. Dadurch war es extrem unpraktisch, PySpark in Web Applications, moderne Code-Editoren oder Edge Devices einzubetten. Spark Connect löst das durch die Einführung einer entkoppelten Client-Server-Architektur. Deine Python-Umgebung ist jetzt strikt vom Spark-Server getrennt. Der lokale PySpark-Client wird zu einer schlanken Library. Er braucht keine lokale Java-Installation mehr und führt selbst keine Data Processing Tasks aus. Er fungiert rein als Remote Interface zum eigentlichen Spark-Cluster. Hier ist der entscheidende Punkt. Wenn du DataFrame Operations mit Spark Connect schreibst, zeichnet der schlanke Client deine Method Calls auf und übersetzt sie in einen Unresolved Logical Plan. Du kannst dir diesen Plan wie eine abstrakte Blaupause deiner Query vorstellen, die strikt beschreibt, welche Daten verarbeitet werden sollen, ohne sich darum zu kümmern, wie sie verarbeitet werden. Der Client verpackt diese Blaupause mit Protocol Buffers und überträgt sie über eine gRPC-Netzwerkverbindung an den Remote-Spark-Server. Der Server entpackt den Plan, übernimmt die gesamte komplexe Query Optimization, führt den Job im Cluster aus und streamt die berechneten Results schließlich zurück an dein Python-Script. Das Setup erfordert eine kleine Änderung daran, wie du deine Application startest. Du verwendest weiterhin den SparkSession-Builder, aber anstatt dich auf lokale Configurations zu verlassen, rufst du die Remote-Methode auf. Du übergibst einen Connection String, der genau angibt, wo der Spark-Server liegt. Dieser String verwendet ein spezielles Connection Scheme, das mit den Buchstaben s c beginnt. Wenn du dich also mit einem lokalen Testserver auf dem Default-Port verbindest, übergibst du den String s c Doppelpunkt Slash Slash localhost Doppelpunkt eins fünf null null zwei. Nach diesem einzigen Connection-Schritt schreibst du deinen DataFrame-Code genauso, wie du es immer getan hast. Weil die Execution komplett remote abläuft, kannst du mehrere verschiedene Python-Clients aus unterschiedlichen Applications gleichzeitig mit exakt demselben Spark-Server verbinden. Dein Application-Code fordert einfach nur Data Transformations an, und das Heavy Lifting bleibt komplett auf der Server-Seite. Indem der Python-Client komplett von der Execution Runtime isoliert wird, eliminiert Spark Connect die berüchtigten Dependency-Konflikte, die früher Deployments kaputt gemacht haben. So kannst du deine Application Environments völlig unabhängig vom eigentlichen Spark-Cluster upgraden. Danke, dass du ein paar Minuten mit mir verbracht hast. Bis zum nächsten Mal, mach's gut.
3

DataFrames und Lazy Evaluation

4m 18s

Tauche ein in die grundlegende Abstraktion von PySpark: den DataFrame. Wir diskutieren das Konzept der Lazy Evaluation, den Unterschied zwischen Transformationen und Aktionen und warum Spark plant, bevor es ausführt.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 3 von 21. Was wäre, wenn dein Code nicht sofort ausgeführt wird, wenn du ihn schreibst, sondern stattdessen wartet, dein Endziel analysiert und die schnellstmögliche Route plant? Du verkettest Filter, Aggregationen und Joins, und deine Maschine kommt dabei kaum ins Schwitzen. Das liegt daran, dass sie gar nichts tut, bis du sie dazu zwingst. Dieser Mechanismus nennt sich Lazy Evaluation und ist die Core-Engine hinter PySpark DataFrames. Ein PySpark DataFrame ist eine verteilte Collection von Daten, die in benannten Columns organisiert ist. Wenn du mit pandas vertraut bist, fühlt sich das Konzept identisch an. Der Unterschied ist, dass ein PySpark DataFrame seine Daten auf mehrere Compute Nodes in einem Cluster aufteilt. Historisch gesehen war die grundlegende Struktur in Spark das Resilient Distributed Dataset, allgemein bekannt als RDD. Das Ökosystem hat sich stark von der direkten RDD-Manipulation wegbewegt. Tatsächlich wird die direkte RDD-Nutzung in Spark Connect ab Spark-Version 4.0 nicht mehr unterstützt. DataFrames sind jetzt der definitive Standard und bieten eine strikte API, die es Spark erlaubt, deine Queries automatisch zu optimieren. Diese Optimierung verlässt sich komplett auf Lazy Evaluation. Jede Operation, die du auf einem DataFrame ausführst, fällt in eine von zwei strikten Kategorien: eine Transformation oder eine Action. Transformations sind Befehle, die einen neuen DataFrame zurückgeben. Beispiele dafür sind das Auswählen bestimmter Columns, das Filtern von Rows basierend auf einer Condition, das Gruppieren von Records oder das Joinen von zwei separaten Tabellen. Wenn du eine Transformation anwendest, führt PySpark die Datenverarbeitung nicht aus. Es zeichnet die Operation einfach nur auf. Es updatet einen internen Blueprint, den sogenannten Logical Execution Plan. Du kannst fünfzig Transformations hintereinander schreiben, und Spark wird einfach nur schnell die Syntax validieren und seinen Graph updaten. Hier ist die entscheidende Erkenntnis. Indem die tatsächliche Execution verzögert wird, gibt PySpark seiner zugrundeliegenden Query-Engine, dem Catalyst Optimizer, das komplette Bild deiner Data Pipeline. Der Optimizer inspiziert deine gesamte Chain an Transformations, ordnet sie für maximale Effizienz neu an und wirft unnötige Schritte komplett raus, bevor auch nur ein einziges Byte an Daten von der Disk gelesen wird. Dieser Blueprint bleibt komplett inaktiv, bis du eine Action aufrufst. Eine Action ist ein Befehl, der ein konkretes Ergebnis verlangt. Sie gibt entweder Daten an dein Driver-Programm zurück oder schreibt Daten in den Storage. Gängige Actions sind das Zählen der Gesamtzahl der Rows, das Collecten der Daten zurück in eine lokale Python-Liste oder der Befehl an das System, die Top-20-Records auf deinem Bildschirm anzuzeigen. In dem Moment, in dem du eine Action triggerst, legt die Engine los. Sie übersetzt deinen optimierten Logical Plan in einen Physical Plan, verteilt die Tasks an die Cluster-Worker und führt die Computation aus. Stell dir einen Standard-Data-Workflow vor. Zuerst erstellst du einen DataFrame, indem du auf eine riesige Datei verweist. Dann joinst du ihn mit einer separaten Tabelle mit User-Details. Nach dem Join filterst du die Results, um nur User aus einer bestimmten Stadt zu behalten. Schließlich bittest du Spark, den Output anzuzeigen. Dank der Lazy Evaluation lädt Spark nicht einfach die komplette Datei, führt einen massiven verteilten Join durch und filtert die Results erst ganz am Ende. Stattdessen schaut sich der Optimizer deinen finalen Request an, bemerkt den Filter und pusht diese Filter-Operation in der Chain nach oben, lange bevor der Join passiert. Er liest selektiv nur die relevanten Records und reduziert so die Memory Usage und den Network Traffic im gesamten Cluster drastisch. Dein PySpark-Skript ist niemals eine Sequenz von sofortigen Befehlen. Es ist ein Set von Instructions, das einen Architektur-Blueprint entwirft, und das System beginnt erst mit der Konstruktion, wenn du schließlich das Endresultat verlangst. Das war's für heute. Danke fürs Zuhören – leg los und bau etwas Cooles.
4

Erstellen und Anzeigen von DataFrames

4m 06s

Lerne, wie man DataFrames aus rohen Python-Objekten, Dictionaries und Dateien instanziiert und wie du deine verteilten Daten sicher überprüfen kannst, ohne deinen Driver-Node zum Absturz zu bringen.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 4 von 21. Eine bestimmte Methode auf einem riesigen Dataset aufzurufen, ist ein garantierter Weg, deine gesamte Anwendung sofort mit einem Out-of-Memory Error zum Absturz zu bringen. Es ist extrem wichtig zu wissen, wie du Daten sicher in und aus Spark bewegst, ohne deinen Driver Node zu überlasten. Genau darum geht es in dieser Folge: das Erstellen und Anzeigen von DataFrames. Jede PySpark-Anwendung braucht Daten, mit denen sie arbeiten kann. Generell erstellst du DataFrames auf drei Arten. Erstens kannst du sie direkt aus In-Memory Python-Strukturen erstellen. Du definierst einfach eine Liste von Dictionaries, wobei jedes Dictionary eine Zeile repräsentiert und die Keys die Spaltennamen sind, und übergibst sie an die createDataFrame Methode deiner SparkSession. Zweitens: Wenn du bereits ein pandas DataFrame In-Memory hast, kannst du genau dieses pandas Objekt an dieselbe createDataFrame Methode übergeben. PySpark übernimmt die Konvertierung automatisch. Der dritte und häufigste Weg ist das Lesen aus externen Files. Dafür nutzt du das read Attribut deiner SparkSession, gefolgt von dem Format, das du haben willst, wie csv oder json, und gibst den File Path an. Sobald deine Daten geladen sind, musst du sie überprüfen. PySpark DataFrames sind distributed. Das heißt, du kannst nicht einfach die Variable printen und die Daten sehen, wie du es in einem normalen Python-Skript machen würdest. Um die Struktur deiner Daten zu sehen, rufst du die printSchema Methode auf. Das gibt einen textbasierten Tree aus, der jeden Spaltennamen und den dazugehörigen Datentyp zeigt. Das ist der schnellste Weg, um zu checken, ob dein File richtig geladen wurde. Um den tatsächlichen Inhalt zu sehen, nutzt du die show Methode. By default zeigt der Aufruf von show die ersten zwanzig Zeilen in einem tabellarischen Format an. Pass an dieser Stelle gut auf. Wenn deine Spalten lange Strings enthalten, schneidet die show Methode sie ab. Du kannst das deaktivieren, indem du ein truncate Argument übergibst und auf false setzt, oder es auf eine bestimmte Anzahl von Zeichen festlegst. Wenn dein DataFrame Dutzende von Spalten hat, bricht die Standard-Tabellenansicht auf dem Bildschirm um und wird unlesbar. In dem Fall kannst du das vertical Argument übergeben und auf true setzen. Das printet jede Zeile als vertikalen Block von Key-Value-Paaren, was breite Datasets im Terminal viel leichter lesbar macht. Jetzt kommen wir zu dem vorhin erwähnten Out-of-Memory Crash. Manchmal musst du die distributed Daten zurück in ganz normale Python-Objekte holen. Die Methode dafür heißt collect. Hier ist die wichtigste Erkenntnis: Die collect Methode nimmt jede einzelne Zeile von jedem Executor in deinem gesamten Cluster und zwingt sie in den Memory deines einzelnen Driver Nodes. Wenn dein DataFrame eine Milliarde Zeilen enthält, läuft deinem Driver der Memory voll und er stürzt sofort ab. Du solltest collect wirklich nur dann aufrufen, wenn du deine Daten aggregiert oder auf eine kleine Größe heruntergefiltert hast. Wenn du mit großen Datasets arbeitest, extrahiere immer kleinere Samples. Nutze anstelle von collect die take Methode und übergib die Anzahl der Zeilen, die du haben willst. Das gibt eine Standard-Python-Liste zurück, die nur diese ersten paar Zeilen enthält. Wenn du das Ende deines Datasets checken musst, nutze die tail Methode, um dir die letzten paar Zeilen zu schnappen. Beide Methoden limitieren sicher die Datenmenge, die an deinen Driver übertragen wird. Die Regel für distributed Daten ist simpel: Schiebe Computations raus in den Cluster, aber limitiere streng die Anzahl der Zeilen, die du zurück zum Driver ziehst. Das war's für diese Folge. Danke fürs Zuhören und keep building!
5

Grundlegende Datentypen meistern

4m 23s

Ein Rundgang durch die grundlegenden numerischen und String-Typen von PySpark. Wir untersuchen, wie man Schemata mit StructType und StructField für robuste Datenpipelines explizit definiert.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark-Grundlagen, Folge 5 von 21. Sich auf automatische Schema Inference zu verlassen, spart dir vielleicht ein paar Zeilen Code, aber es kostet dich in der Production-Performance richtig viel. Der Cluster muss oft deinen gesamten Datensatz lesen, nur um zu erraten, was drin steckt, bevor er überhaupt echte Arbeit leistet. Das löst du, indem du grundlegende Datentypen und explizite Schemas beherrschst. Es passiert oft, dass man Standard-Python-Typen mit PySpark-Datentypen verwechselt. Wenn du in Standard-Python einen Integer oder einen String deklarierst, lebt dieses Objekt im Memory deiner lokalen Maschine. PySpark-Typen arbeiten auf einer völlig anderen Ebene. Sie sind Mapping-Anweisungen für den Catalyst Optimizer und die zugrunde liegende Java Virtual Machine. Wenn du PySpark-Datentypen verwendest, definierst du eine strikte, cluster-aware Struktur. Das garantiert Datenkonsistenz über Hunderte von verteilten Worker Nodes hinweg und diktiert genau, wie Daten über das Netzwerk serialisiert werden. PySpark bietet für jede Standard-Datenform einen spezifischen Typ, und die Auswahl des richtigen Typs ist entscheidend für die Performance. Für Zahlen hast du ByteType für sehr kleine Integer, IntegerType für Standardzahlen und LongType für große Werte. ByteType statt LongType für einen einfachen Statuscode auszuwählen, spart massiv Memory, wenn sich diese Entscheidung über Milliarden von Rows multipliziert. Für Text und Logik verwendest du StringType und BooleanType. Zeit korrekt zu handhaben, ist ein weiterer Bereich, in dem exaktes Typing wichtig ist. PySpark unterteilt Zeitdaten in DateType und TimestampType. Du verwendest DateType, wenn dich nur das Kalenderdatum interessiert, wie beim Geburtstag eines Users. Du verwendest TimestampType, wenn du exakte Zeitpunkte brauchst und sowohl das Datum als auch die genaue Stunde, Minute und Sekunde eines Events tracken musst. Diese Typen zu kennen, ist nur die Grundlage. Du musst sie über ein explizites Schema direkt auf deinen Data Ingestion Prozess anwenden. Du baust dieses Schema mit zwei spezifischen Objekten: StructType und StructField. Du kannst dir einen StructType als Blueprint für eine komplette Row in deinem Dataframe vorstellen. Ein StructField ist der Blueprint für eine einzelne Column innerhalb dieser Row. Um ein explizites Schema zu bauen, instanziierst du einen StructType und übergibst ihm eine Collection von StructFields. Jedes StructField benötigt drei spezifische Argumente. Erstens übergibst du den Column-Namen als Standard-String. Zweitens übergibst du den spezifischen PySpark-Datentyp, den du erzwingen willst, wie IntegerType oder StringType. Drittens übergibst du ein Boolean-Flag, das angibt, ob diese Column Null-Values enthalten darf. Zum Beispiel baust du ein Schema, das mit einem StructField namens user identifier beginnt, einem StringType zugewiesen wird und das Null-Flag auf false setzt. Danach folgt ein StructField namens account age, das einem IntegerType zugewiesen wird, wobei das Null-Flag auf true gesetzt ist. Sobald dieses StructType-Objekt komplett zusammengebaut ist, übergibst du es über die schema-Methode direkt an deinen Dataframe-Reader, bevor du den load-Befehl aufrufst, um deine Files zu lesen. Das ist der Teil, auf den es ankommt. Wenn du dieses explizite Schema vorab bereitstellst, überspringt PySpark die Data-Scanning-Phase komplett. Es wendet deinen Blueprint direkt auf den eingehenden Data Stream an. Das reduziert die Zeit, um ein File zu lesen, drastisch. Es fungiert auch als sofortiges Quality Gate. Wenn ein fehlerhaftes File mit Text in deiner Integer-Column ankommt, verarbeitet die Pipeline es basierend auf deiner definierten Struktur, anstatt das inferred Schema stillschweigend downstream zu verschieben und deine Transformations kaputt zu machen. Dein Schema explizit zu definieren, verwandelt eine fragile, teure Read-Operation in einen berechenbaren, hochoptimierten Pipeline-Schritt. Danke fürs Zuhören, Happy Coding zusammen!
6

Die Tücken der Präzision

4m 49s

Entdecke die entscheidenden Unterschiede zwischen FloatType, DoubleType und DecimalType. Lerne, warum die Wahl des falschen numerischen Typs zu katastrophalen Rundungsfehlern in Finanzdaten führen kann.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 6 von 21. Einen Standard-Float zu verwenden, mag harmlos erscheinen, bis deine Aggregation Query unbemerkt Millionen bei Finanztransaktionen falsch berechnet. Code, der perfekt läuft, kann Zahlen erzeugen, die leicht, aber gefährlich falsch sind. Genau deshalb müssen wir über die Tücken der Precision sprechen. In PySpark hast du drei Hauptmöglichkeiten, Zahlen mit Nachkommastellen zu speichern. Du hast FloatType, DoubleType und DecimalType. Sie sind nicht austauschbar. Ein häufiger Fehler ist es, PySpark ein Schema aus deinen Rohdaten ableiten zu lassen. Diese Inference weist normalerweise jeder Zahl mit einem Dezimalpunkt den DoubleType zu. Wenn du finanzielle Umsätze berechnest, ist es ein ernsthaftes operationelles Risiko, dich auf dieses Default-Verhalten zu verlassen. Um zu verstehen, warum das so ist, müssen wir uns ansehen, wie FloatType und DoubleType unter der Haube funktionieren. FloatType verwendet 32-Bit IEEE 754 Floating-Point-Mathematik. DoubleType nutzt die 64-Bit-Version desselben Standards. Beide stellen Zahlen als binäre Brüche dar. Denk mal daran, wie der Bruch ein Drittel nicht perfekt als Dezimalzahl im Zehnersystem geschrieben werden kann. Er wird zu einem endlosen String von Dreien. Genau dieselbe Einschränkung gibt es im Binärsystem. Gängige Dezimalzahlen wie null Komma eins oder null Komma zwei können im Zweiersystem nicht perfekt dargestellt werden. Der Computer speichert eine winzige Approximation. Mit DoubleType hast du 64 Bit Platz, was bedeutet, dass die Approximation unglaublich nah an der echten Zahl liegt. Wenn du eine einzelne Datenzeile abfragst, wirst du den Unterschied kaum bemerken. Hier ist die entscheidende Erkenntnis: Der Fehler summiert sich bei Aggregationen auf. Wenn du die gesamten finanziellen Umsätze berechnest, indem du Milliarden einzelner Zeilen aufsummierst, addieren sich diese mikroskopischen Ungenauigkeiten. Ein Bruchteil eines Cents, der bei jeder Transaktion verloren geht oder dazukommt, verfälscht am Ende das finale Aggregat um Tausende oder sogar Millionen von Dollar. Deine Aggregation-Logik ist mathematisch einwandfrei, aber der zugrundeliegende Datentyp verfälscht das Ergebnis. Wenn dein System Physiksimulationen berechnet oder Machine-Learning-Modelle trainiert, sind FloatType und DoubleType genau das, was du willst. Sie tauschen Exaktheit gegen hohe Hardware-Verarbeitungsgeschwindigkeit. Aber in dem Moment, in dem du mit Geld hantierst, brauchst du strikte, kompromisslose Genauigkeit. Das bringt uns zum DecimalType. DecimalType verwendet keine Floating-Point-Approximationen. Er speichert Zahlen exakt so, wie du sie definierst, mit einer festen Scale. Wenn du einen DecimalType konfigurierst, definierst du zwei verschiedene Parameter. Erstens gibst du die Precision an, also die maximale Gesamtzahl an Ziffern, die der Wert fassen kann. Zweitens gibst du die Scale an, die die genaue Anzahl der erlaubten Ziffern rechts vom Komma diktiert. Wenn du einen DecimalType mit einer Precision von zehn und einer Scale von zwei einrichtest, reserviert PySpark den exakt benötigten Speicherplatz, um diesen Wert bis auf den Cent genau zu speichern. Es gibt keine binären Brüche und keine Rundungsschätzungen. In der Praxis implementierst du das, indem du die strikte Kontrolle über deine Schemas übernimmst. Wenn du Finanzdatensätze aus einer Quelldatei liest, lass PySpark die Typen nicht erraten. Zuerst erstellst du ein striktes Schema-Objekt. Dann definierst du deine Finanzfelder wie Umsatz oder Steuern. Schließlich weist du ihnen explizit einen DecimalType mit deiner gewählten Precision und Scale zu. Sobald dein DataFrame mit diesem Schema lädt, werden deine Standard-Summen- oder Durchschnitts-Aggregationen von der ersten bis zur milliardsten Zeile perfekt ausgeführt. Du opferst zwar ein bisschen Compute-Performance im Vergleich zu einem Standard-DoubleType, aber du garantierst, dass dein Finanz-Reporting absolut fehlerfrei ist. Die Regel ist einfach: Nutze Floating-Point-Typen für Geschwindigkeit und wissenschaftliche Approximationen, aber in dem Moment, in dem eine Zahl eine Währung darstellt, sichere sie mit einem DecimalType ab. Danke fürs Einschalten. Bis zum nächsten Mal!
7

Komplexe und verschachtelte Daten bändigen

4m 17s

Big Data ist nicht immer flach. Wir erkunden die komplexen Datentypen von PySpark, einschließlich ArrayType, StructType und MapType, mit denen du tief verschachteltes JSON nativ parsen kannst.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 7 von 21. In der Praxis ist Real-World Big Data selten ein flaches Spreadsheet. Manchmal brauchst du ein Array aus verschachtelten Dictionaries, nur um ein einziges JSON-Event zu parsen. Um das zu bewältigen, müssen wir darüber sprechen, wie wir komplexe und verschachtelte Daten bändigen. Relationale Workflows bevorzugen flache Tabellen, aber moderne Event-Daten kommen oft stark verschachtelt an. PySpark löst das, indem es drei komplexe Datentypen bereitstellt. Das sind ArrayType, StructType und MapType. Damit kannst du hierarchische Strukturen nativ in der Engine explizit modellieren. Schauen wir uns ein Standard-Kundenprofil an, um zu sehen, wie diese Typen funktionieren. Das erste Konzept ist der ArrayType. Er repräsentiert eine Collection von Elementen. Die strikte Regel ist, dass jedes Item innerhalb eines ArrayTypes exakt denselben zugrunde liegenden Datentyp haben muss. Du kannst Strings und Integers nicht im selben Array mischen. Wenn dein Kundenprofil eine Liste der letzten Order-IDs enthält, definierst du diese Column als ArrayType, der Integers enthält. Als Nächstes kommt der StructType. Ein StructType modelliert einen verschachtelten hierarchischen Record und funktioniert im Grunde wie eine Row, die in eine andere Row eingebettet ist. Er enthält spezifische, benannte Fields. Im Gegensatz zu einem Array kann jedes Field innerhalb eines StructTypes einen völlig anderen Datentyp haben. Angenommen, dein Kunde hat eine Adresse. Diese Adresse enthält einen Straßennamen als String, eine Postleitzahl als Integer und ein Boolean-Flag, das angibt, ob es sich um ein Gewerbeobjekt handelt. Du bündelst diese unterschiedlichen Fields zusammen in einen StructType. Hier ist die wichtigste Erkenntnis: Du kannst diese komplexen Typen beliebig tief verschachteln. Wenn ein Kunde mehrere Adressen hat, erstellst du keine flachen, nummerierten Columns. Stattdessen erstellst du einen ArrayType, bei dem der interne Elementtyp genau dieser Adress-StructType ist. Du hast jetzt ein Array von Structs, das perfekt auf ein Standard-JSON-Array von Objects mappt. Die dritte Struktur ist der MapType, der speziell für Key-Value-Pairs entwickelt wurde. Er unterscheidet sich von einem StructType darin, wie er Structure im Vergleich zum Schema behandelt. Bei einem StructType musst du die genauen Field-Namen im Voraus hardcoden. Ein MapType ist flexibel bei seinen Dateninhalten, aber strikt bei seinen Datentypen. Jeder Key in der Map muss von einem bestimmten Typ sein, und jeder Value muss von einem anderen bestimmten Typ sein. Du könntest einen MapType verwenden, um die Application-Preferences eines Kunden zu speichern. Die Keys könnten Strings sein, wie zum Beispiel Theme oder Language, und die Values könnten ebenfalls Strings sein, wie Dark oder English. Weil es ein MapType ist, kann die Upstream-Application später völlig neue Preference-Keys injecten, ohne dich zu zwingen, das Core-DataFrame-Schema zu ändern. Du fragst die Values einfach dynamisch über ihre Keys ab. Wenn du dieses komplexe Schema in deinem Code konstruierst, baust du es von innen nach außen auf. Zuerst definierst du die inneren Fields des Adress-StructTypes. Dann übergibst du dieses fertige Struct an eine ArrayType-Definition. Als Nächstes definierst du den MapType für die User-Preferences. Schließlich wrappst du all diese Komponenten, zusammen mit einfachen skalaren Typen wie dem Customer-Name-String, in einen Master-StructType, der die übergeordnete DataFrame-Row definiert. Anstatt verschachtelte Strukturen in unübersichtliche JSON-Strings zu flatten, erlaubt die explizite Definition dieser komplexen Schemas dem Spark-Optimizer, Daten zu prunen und tief in verschachtelten Fields zu filtern, ohne die gesamte Payload in den Memory zu deserialisieren. Danke fürs Zuhören – bis zum nächsten Mal.
8

Type Casting und Selektion

3m 56s

Lerne, wie du deine DataFrame-Schemata aktiv formst. Wir behandeln, wie man Teilmengen von Spalten auswählt und wie man Spalten sicher von einem Datentyp in einen anderen castet.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 8 von 21. Ein einfacher String-Wert, der sich in einer Integer-Column versteckt, kann einen Cluster mit tausend Nodes komplett lahmlegen. Du brauchst einen zuverlässigen Weg, um korrekte Datenstrukturen zu erzwingen und genau auszuwählen, welche Daten durch deine Pipeline fließen. Deshalb schauen wir uns heute Type Casting und Selection an. Um Daten in PySpark zu manipulieren, musst du zuerst verstehen, was eine Column eigentlich ist. Eine Column-Instanz ist kein physisches Array von Daten, das in den Memory geladen wird. Sie ist eine lazily evaluated Repräsentation einer Expression. Wenn du in deinem Code eine Column referenzierst, fasst du die zugrundeliegenden Daten nicht an. Du fügst lediglich einen Schritt zu Sparks Logical Plan hinzu. Die Daten bewegen sich erst, wenn später eine Action getriggert wird. Um diese Daten abzurufen und zu formen, verwendest du die select-Methode auf deinem DataFrame. Du hast zwei Hauptwege, der select-Methode mitzuteilen, welche Columns du haben möchtest. Der einfachste Weg ist, die Column-Namen als Standard-Text-Strings zu übergeben. Wenn du einen String an select übergibst, gibt Spark einen neuen DataFrame zurück, der genau diese Column komplett unverändert enthält. Das funktioniert gut für eine einfache Extraktion, bietet aber keinen Spielraum für Modifikationen. Um die Daten während der Selection zu modifizieren, musst du Column-Objekte anstelle von Strings verwenden. Du greifst auf ein Column-Objekt zu, indem du es direkt vom DataFrame aus referenzierst. Du kannst das mit der Dot Notation machen, wie zum Beispiel dataframe dot age, oder mit der Bracket Notation, bei der der Column-Name als String in den Brackets steht. Die Bracket Notation ist besonders nützlich, wenn deine Column-Namen Leerzeichen oder Sonderzeichen enthalten, die die Standard Dot Notation kaputt machen würden. Das ist der entscheidende Punkt. Wenn du ein Column-Objekt in die select-Methode übergibst, kannst du Methoden daran anhängen, um die Daten on the fly zu transformieren. Eine der wichtigsten Transformationen ist die Type Conversion. Daten kommen oft im falschen Format an. Zum Beispiel könntest du numerische Metriken erhalten, die als Text-Strings formatiert sind. Um das zu korrigieren, verwendest du die cast-Methode. PySpark bietet auch einen Alias namens astype, der exakt dieselbe Logik ausführt. Du rufst die cast-Methode direkt auf deinem Column-Objekt innerhalb des select-Statements auf. Die cast-Methode benötigt ein Argument, nämlich den Ziel-Datentyp. Du kannst dieses Ziel definieren, indem du eine String-Repräsentation des Typs übergibst, wie das Wort int, oder indem du ein spezifisches Spark-Datentyp-Objekt übergibst, wie IntegerType. So läuft das in einem echten Script ab. Du rufst die select-Methode auf deinem DataFrame auf. Innerhalb der Klammern dieser Methode referenzierst du deine Ziel-Column mit der Bracket Notation. Direkt neben dieser Column-Referenz rufst du dot cast auf und übergibst deinen neuen Typ. Wenn das evaluiert wird, gibt das einen komplett neuen DataFrame zurück, in dem deine ausgewählte Column nun sicher in den angegebenen Typ konvertiert ist. Der ursprüngliche DataFrame bleibt komplett unangetastet, weil DataFrames immutable sind. Das Key Takeaway ist, dass Type Casting in PySpark kein eigenständiger Prozess ist, der in place auf ein existierendes Dataset angewendet wird. Es ist eine lazily evaluated Column Expression, die untrennbar mit dem Vorgang der Selection von Daten verbunden ist, um einen neuen, strongly typed DataFrame zu bauen. Wenn dir der Podcast gefällt und du die Show unterstützen möchtest, kannst du auf Patreon nach DevStoriesEU suchen. Das war's für diese Folge. Danke fürs Zuhören und keep building!
9

Function Junction: Schmutzige Daten bereinigen

4m 03s

Garbage in, garbage out. Lerne die wesentlichen DataFrame-Transformationen kennen, um Nullwerte zu entfernen, fehlende Werte aufzufüllen und NaN-Datensätze nativ in verteilten Systemen zu behandeln.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 9 von 21. Garbage in, garbage out. Aber was machst du, wenn dein Garbage-Dataset hunderte Terabyte groß ist und du nicht eine einzige Row manuell überprüfen kannst? Du brauchst einen systematischen Weg, um es at scale zu bereinigen. Genau das behandeln wir heute in Function Junction: Cleaning Dirty Data. Der erste Schritt bei der Bereinigung ist normalerweise, dein Schema zu standardisieren. Du bekommst oft Raw-Files mit Leerzeichen, Sonderzeichen oder Tippfehlern in den Headern. Nutze dafür die Methode withColumnRenamed. Du übergibst ihr einfach den alten String-Namen und den gewünschten neuen String-Namen. Wenn du mehrere Columns fixen musst, chainst du diese Methode sequenziell für jede Column, bevor du komplexe Transformations downstream anwendest. Bevor wir Bad Data entfernen, müssen wir eine häufige Verwirrung bezüglich Null und NaN in PySpark klären. Null bedeutet, dass ein Datenpunkt komplett fehlt. NaN steht für Not a Number, was ein undefiniertes mathematisches Ergebnis repräsentiert, wie zum Beispiel null durch null zu teilen. In purem Python erfordern diese ein separates Handling. PySpark gruppiert sie jedoch der Einfachheit halber zusammen. Wenn du die DataFrame NA-Functions nutzt, wertet Spark NaN-Werte als Nulls aus, um sie zu droppen oder zu füllen. Um Rows mit fehlenden Werten zu eliminieren, nutzt du die Methode NA dot drop. Wenn du diese Function komplett leer aufrufst, droppt sie jede Row, die ein Null oder NaN in irgendeiner einzelnen Column enthält. Dieser Approach ist bei breiten Datasets extrem destruktiv. Ein einziger fehlender Wert in einer optionalen Metadata-Column löscht eine Row mit ansonsten perfekten Transaction-Data aus. Um das zu verhindern, übergibst du eine Liste von Column-Namen an den subset-Parameter. PySpark wertet dann nur diese spezifischen, kritischen Columns aus, wenn es entscheidet, ob die Row gedroppt wird. Rows zu droppen ist durch Business Rules nicht immer erlaubt. Oft musst du fehlende Werte durch sichere Defaults ersetzen. Das erreichst du mit NA dot fill. Während du einen einzelnen Wert übergeben kannst, um alle Columns zu füllen, ist der bessere Approach, ein Dictionary zu übergeben. Die Dictionary-Keys repräsentieren die spezifischen Column-Namen, und die Values repräsentieren deine gewählten Replacements. Dieses Pattern erlaubt es dir, eine fehlende numerische Metric mit einer Null zu füllen, während du gleichzeitig eine fehlende Category durch einen Text-String wie unknown ersetzt. Das über ein Dictionary zu machen, wird in einem einzigen Pass ausgeführt, was extrem effizient ist. Schließlich könnten deine Daten komplett befüllt, aber trotzdem invalid sein. Outliers und physikalisch unmögliche Werte erfordern logisches Filtering. Du isolierst gute Daten mit der where-Methode, um nur die Rows zu behalten, die eine bestimmte Condition erfüllen. Für numerische oder Date-Boundaries ist die between-Methode dein bestes Tool. Du wählst deine Column aus, rufst between auf und übergibst die Lower und Upper Limits. Das ersetzt ausführliche Greater-Than- und Less-Than-Logik und macht deinen Code leichter lesbar. Jede Row, die außerhalb dieser Limits liegt, wird aus dem resultierenden DataFrame herausgefiltert. Hier ist die Key Insight. Die Reihenfolge ist extrem wichtig, wenn man at scale bereinigt. Benenne Columns immer zuerst um, um dein Schema festzulegen, droppe oder fülle fehlende Werte als Nächstes, um deine Data Types zu stabilisieren, und filtere Outliers ganz zum Schluss, nur wenn du weißt, dass die zugrunde liegenden Daten strukturell sauber sind. Das war's für diese Folge. Danke fürs Zuhören und keep building!
10

Daten transformieren und umformen

4m 13s

Übernimm die Kontrolle über die Form deiner Daten. Wir untersuchen, wie man neue Spalten mit mathematischen Funktionen generiert, String-Manipulationen durchführt und verschachtelte Arrays mit explode abflacht.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 10 von 21. Manchmal enthält eine einzelne Daten-Row ein Array versteckter Records – und du musst dieses Array auflösen, um es richtig analysieren zu können. Durch das Transformieren und Reshapen von Daten entpackst, formatierst und strukturierst du diese Informationen für das Downstream-Processing. Wenn du einen Dataframe in PySpark ändern musst, veränderst du die Daten nicht in-place. Dataframes sind immutable. Stattdessen erstellst du neue Versionen mit einer Methode namens withColumn. Diese Methode nimmt zwei Argumente. Das erste ist ein String, der den Namen der Column repräsentiert, die du erstellen oder ersetzen willst. Das zweite ist eine Column Expression, die die eigentlichen Daten definiert. Wenn du einen Namen übergibst, der bereits im Dataframe existiert, überschreibt PySpark die ursprüngliche Column. Wenn der Name komplett neu ist, hängt PySpark die neue Column rechts an dein Dataset an. Um zu definieren, was in diese neue Column kommt, nutzt du normalerweise die Built-in Functions von PySpark. Diese werden aus dem SQL Functions Module importiert und bieten hochoptimierte Operationen, die über deinen gesamten Cluster ausgeführt werden. Nehmen wir zum Beispiel die String-Manipulation. Textdaten aus externen Quellen sind selten perfekt formatiert. Vielleicht hast du eine Column mit Usernamen, die in einer unvorhersehbaren Mischung aus Groß- und Kleinbuchstaben geschrieben sind. Das kannst du fixen, indem du deine bestehende Column an eine Built-in Function wie lower übergibst, die den gesamten Text in Kleinbuchstaben umwandelt. Alternativ kannst du eine Capitalization Function nutzen, um sicherzustellen, dass der erste Buchstabe groß und der Rest klein geschrieben wird. In der Praxis baust du diese Operationen direkt in deine Dataframe-Transformations ein. Du rufst withColumn auf, benennst deine Target Column und weist ihr das Ergebnis der lower Function zu, die auf deine Input Column angewendet wird. PySpark evaluiert diese Expression für jede einzelne Row. Du kannst mehrere withColumn Calls aneinanderreihen, um mehrere Transformations sequenziell anzuwenden, und dabei jedes Mal den fortlaufend aktualisierten Dataframe an den nächsten Step übergeben. Der zweite Teil davon ist das Reshaping. Das Bereinigen von Strings ändert die Values, aber was passiert, wenn der grundlegende Shape deiner Daten die Analyse verhindert? Hier wird es interessant. Vielleicht bekommst du ein Dataset, in dem der Identifier einer Person in einer Column steht und ihre monatlichen Einkommen für das ganze Jahr in ein einziges Array in der benachbarten Column gepackt sind. Du kannst keine standardmäßigen relationalen Aggregations auf einem nested Array ausführen. Du brauchst jeden einzelnen Einkommenswert in einer eigenen Row, um Averages zu berechnen oder Minimums zu finden. Du löst dieses strukturelle Problem mit einer Built-in Function namens explode. Die explode Function verarbeitet speziell Arrays und Maps. Du rufst withColumn auf, gibst den Column-Namen an, den du für den Output willst, und übergibst die explode Function, die deine Array Column wrappt. PySpark führt das aus, indem es die ursprüngliche einzelne Row nimmt und sie aufreißt. Wenn das Einkommens-Array zwölf verschiedene Values enthält, generiert explode zwölf komplett separate Rows. Im neuen Dataframe enthält die Target Column nun einen einzelnen, flachen Einkommenswert pro Row anstelle einer List. Entscheidend ist, dass PySpark alle anderen Columns aus der ursprünglichen Row dupliziert. Der User Identifier wird exakt in alle zwölf neuen Rows kopiert. Die logische Beziehung zwischen dem User und seinem Einkommen bleibt perfekt intakt, aber die Daten sind jetzt flat. Du hast eine nested Struktur in eine lange Tabelle gereshaped, die bereit für Standard-Grouping- und Filtering-Operations ist. Die wahre Power von PySpark Transformations ist, dass Functions wie explode und lower nicht nur einzelne Values manipulieren; sie definieren einen logischen Computation Plan, der sofort skaliert, egal ob du hundert Rows oder hundert Milliarden Rows hast, ohne dass du jemals einen einzigen manuellen Loop schreiben musst. Das war's für diese Folge. Bis zum nächsten Mal!
11

Die Mechanik von Gruppierung und Aggregation

3m 46s

Meistere die Split-Apply-Combine-Strategie. Wir tauchen ein in das Gruppieren von Daten nach Schlüsseln und die Anwendung leistungsstarker Aggregationsfunktionen, um riesige Datensätze zusammenzufassen.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 11 von 21. Wenn du Milliarden von einzelnen Datensätzen vor dir hast, ist es unmöglich, sie Zeile für Zeile zu lesen. Um daraus echte Erkenntnisse zu ziehen, musst du sie zusammenfassen. Heute schauen wir uns genau an, wie das funktioniert: Die Mechanik von Grouping und Aggregation. Unter der Haube verarbeitet PySpark Aggregationen mit einer klassischen Datenstrategie namens Split-Apply-Combine. Dieses Pattern ist genau das, wonach es klingt. Zuerst teilt PySpark den riesigen Datensatz anhand eines von dir gewählten Keys in separate logische Buckets auf. Als Nächstes wendet es eine spezifische Berechnung auf jeden einzelnen Bucket unabhängig über den gesamten Cluster an. Schließlich kombiniert es diese unabhängigen Ergebnisse wieder zu einem einzigen, zusammengefassten Resultat. In deinem Code triggerst du die Split-Phase, indem du die group by Methode auf deinem DataFrame aufrufst. Du übergibst einfach den Namen der Spalte, die du als Grouping Key verwenden willst. Wenn du zum Beispiel eine riesige Tabelle mit historischen Transaktionen hast, könntest du nach der User Name Spalte gruppieren. Hier ist die wichtigste Erkenntnis. Der Aufruf von group by gibt keinen neuen DataFrame zurück. Stattdessen gibt er ein Übergangskonstrukt namens GroupedData Objekt zurück. Weil PySpark deinen Code lazy evaluiert, hat es bisher nur den Execution Plan für die Organisation dieser Buckets erstellt. Es wird nicht wirklich Daten bewegen, bis du ihm sagst, welche mathematische Operation auf diesen Buckets ausgeführt werden soll. Um diese mathematische Operation bereitzustellen, chainst du die aggregate Methode, meistens als agg geschrieben, direkt an deine gruppierten Daten. Das übernimmt die Apply- und Combine-Phasen. Innerhalb der aggregate Methode sagst du PySpark, was berechnet werden soll, indem du Tools aus dem PySpark SQL functions Modul verwendest. Dieses Modul enthält dutzende optimierte Aggregationsoperationen. Sagen wir mal, du willst das durchschnittliche Einkommen für jeden dieser User berechnen. Dafür würdest du die average Funktion importieren, die normalerweise avg genannt wird. Du übergibst den Namen deiner Einkommensspalte an die average Funktion und packst das in die aggregate Methode. Wenn das ausgeführt wird, berechnet PySpark gleichzeitig das durchschnittliche Einkommen für jeden einzelnen User-Bucket. Dann setzt die Combine-Phase ein und gibt einen standardmäßigen, lesbaren DataFrame zurück. Dieser neue DataFrame enthält nur eine Zeile pro User, gepaart mit seinem neu berechneten durchschnittlichen Einkommen. An diesem Punkt hast du eine perfekt zusammengefasste Tabelle. Da die Berechnung jedoch parallel auf einem verteilten Cluster passiert ist, werden die finalen Zeilen in einer völlig zufälligen Reihenfolge zurückgegeben, je nachdem, wann die Processing Nodes ihre Arbeit beendet haben. Wenn du die Spitzenverdiener sehen willst, ist eine zufällige Reihenfolge nutzlos. Um das zu beheben, chainst du die order by Methode an das Ende deines Aggregationsschritts. Du übergibst der order by Methode die Spalte mit deinen neuen Durchschnittswerten und sagst ihr, dass sie absteigend sortieren soll. PySpark nimmt die kombinierten Ergebnisse, rankt sie und liefert eine saubere, sortierte Tabelle. Das Split-Apply-Combine Pattern ist genau deshalb so mächtig, weil es sich perfekt auf verteilte Hardware mappen lässt, wodurch riesige Datensätze in Sekundenschnelle zusammengefasst werden können. Aber denk daran, dass das Gruppieren von Daten nur die halbe Miete ist. Grouping erfordert eine Aggregation, um den Job abzuschließen, sonst hast du nur einen Cluster voller leerer Buckets, die auf Anweisungen warten. Danke, dass du ein paar Minuten mit mir verbracht hast. Bis zum nächsten Mal, mach's gut.
12

Wenn DataFrames kollidieren: Die Kunst des Joins

3m 59s

Navigiere durch die Nuancen der Kombination von Datensätzen. Wir schlüsseln die sieben verschiedenen Join-Typen in PySpark auf und erklären, wie man DataFrames sicher zusammenführt.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 12 von 21. Das Mergen von zwei riesigen Tabellen ist die mit Abstand teuerste Operation im Distributed Computing. Wenn du die falsche Matching-Logik anwendest, ist das der schnellste Weg, deinen Cluster mit einem Out of Memory zum Absturz zu bringen. Genau darum geht es in When DataFrames Collide: The Art of Joining: zu wissen, wie du Datensätze sicher kombinierst. Der primäre Mechanismus zum Kombinieren von Daten in PySpark ist die Join-Methode. Du rufst sie auf deinem Base-DataFrame auf und übergibst den DataFrame, den du anhängen willst, die spezifische Spalte oder die Spalten, über die gematcht werden soll, und die Join-Methode. Wenn du gar keine Join-Methode angibst, verwendet PySpark standardmäßig einen Inner Join. Schauen wir uns ein konkretes Szenario an. Du hast einen DataFrame, der die Körpergröße von Personen erfasst, und einen zweiten DataFrame für deren Einkommen. Beide Datensätze haben eine gemeinsame Spalte namens name. Bei einem Inner Join schaut sich PySpark die Spalte name in beiden Datensätzen an und behält nur die Rows, in denen der Name an beiden Stellen existiert. Wenn eine Person in den Heights-Daten auftaucht, aber in den Incomes-Daten fehlt, wird ihr Record komplett aus dem Ergebnis gedroppt. Um Records ohne Match zu behalten, änderst du den Join-Typ. Ein Left Join behält jede Row aus deinem Start-DataFrame, was in diesem Fall die Heights-Daten sind. Findet PySpark einen passenden Namen in den Incomes-Daten, hängt es dieses Einkommen an. Findet es keinen Match, behält es die Height-Row, setzt aber einen Null-Wert in die Income-Spalte. Ein Right Join macht genau das Gegenteil: Er behält alle Incomes und füllt fehlende Heights mit Null-Werten auf. Wenn du absolut alles brauchst, verwendest du einen Full Join. PySpark behält jeden Record aus beiden DataFrames. Gematchte Namen werden in eine einzige Row gemerged, und alle Namen, die nur in einem Datensatz existieren, werden behalten, wobei Null-Werte die fehlenden Daten von der anderen Seite auffüllen. Hier ist der entscheidende Punkt. Ein Cross Join funktioniert anders, weil er die Join-Condition komplett ignoriert. Er paart jede einzelne Row im Heights-DataFrame mit jeder einzelnen Row im Incomes-DataFrame und erzeugt so ein kartesisches Produkt. Wenn beide Tabellen nur tausend Rows haben, gibt ein Cross Join eine Million Rows aus. Dieses explosive Wachstum ist der Grund, warum Cross Joins by default stark eingeschränkt sind und oft eine explizite Configuration erfordern, um ausgeführt zu werden, ohne einen Error zu werfen. Die letzten beiden Join-Typen sind eigentlich Filter-Operationen und keine echten Data-Merges. Ein Left Semi Join sucht nach Matches und gibt Rows aus dem Heights-DataFrame nur dann zurück, wenn der Name auch im Incomes-DataFrame auftaucht. Der entscheidende Unterschied zu einem Inner Join ist, dass ein Left Semi Join keine Columns von der rechten Seite rüberzieht. Du behältst exakt dieselben Columns, mit denen du angefangen hast, nur runtergefiltert auf die Records, die einen entsprechenden Match haben. Ein Left Anti Join macht genau das Gegenteil. Er gibt Rows aus dem Heights-DataFrame nur dann zurück, wenn der Name in den Incomes-Daten nicht existiert. Er droppt die Columns der rechten Seite komplett. Das macht den Left Anti Join zur effizientesten Methode, um fehlende Daten zu identifizieren oder Records zu finden, die downstream nicht verarbeitet werden konnten. Die Wahl des Joins bestimmt nicht nur, welche Daten du zurückbekommst, sondern auch, wie viele Daten physisch über dein Netzwerk bewegt werden müssen, um das Resultat zu generieren. Danke fürs Einschalten. Bis zum nächsten Mal!
13

Altes SQL, neue Tricks

3m 37s

Warum eine neue API lernen, wenn du rohes SQL verwenden kannst? Lerne, wie du Standard-SQL-Abfragen direkt gegen verteilte PySpark-DataFrames ausführst.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 13 von 21. Du hast ein Team von Analysten, die exzellentes SQL schreiben, aber deine Daten liegen auf einem riesigen verteilten Cluster. Du könntest sie zwingen, eine komplett neue Python-Syntax zu lernen, oder du lässt sie die Sprache nutzen, die sie bereits kennen. Genau hier kommt das Ausführen von Raw SQL Strings direkt in PySpark ins Spiel – Altes SQL, neue Tricks. PySpark bietet dir eine direkte Brücke zu Standard-SQL über eine einzige Methode in deiner Spark Session, die einfach sql heißt. Du übergibst dieser Methode einen Raw SQL String. Der Output ist kein Plain Text. Es ist ein Standard-PySpark-DataFrame. Das bedeutet, du kannst eine Standard-Datenbank-Query ausführen, ein DataFrame zurückbekommen und es sofort an eine andere Python-Funktion übergeben. Es ist komplett interoperabel. Bevor du Daten mit SQL abfragen kannst, muss PySpark wissen, welche Tabellen existieren. Du hast zwei Hauptwege, deine Daten für die SQL-Engine verfügbar zu machen. Erstens: Wenn du bereits ein DataFrame in Python hast, kannst du eine Methode aufrufen, um es als Temporary View zu registrieren. Du gibst ihm einen String-Namen, und plötzlich verhält es sich in deinen SQL-Queries wie eine Tabelle. Zweitens: Du kannst Tabellen komplett innerhalb deines SQL-Strings erstellen. Du übergibst ein Create Table Statement an die sql-Methode. Innerhalb dieses Strings definierst du das Schema und sagst PySpark genau, wo die zugrunde liegenden Data Files liegen, wie zum Beispiel ein Cloud Storage Path, der Parquet-Dateien enthält. PySpark registriert das in seinem internen Catalog. Von da an kannst du sie einfach über ihren Namen abfragen, genau wie eine traditionelle Datenbanktabelle. Vergleiche mal, wie dieselbe Logik in beiden Ansätzen aussieht. Angenommen, du musst Kundennamen abrufen, alle mit einer Balance von null rauswerfen und das Ergebnis mit einer Orders-Tabelle mergen. In der DataFrame API baust du eine Chain aus Python-Methoden. Du rufst select auf deinem Customer-Dataset auf, um die Name-Column auszuwählen. Dann chainst du eine filter-Methode, die prüft, ob die Balance größer als null ist. Zum Schluss hängst du eine join-Methode an, die das Orders-Dataset über einen passenden Key referenziert. Das ist sehr programmatisch. Beim SQL-Ansatz schreibst du ein Standard-Select-Statement, das die Name-Column zieht, fügst eine Where-Clause für die Balance hinzu und schreibst einen Inner Join für die Orders-Tabelle. Das Ganze steht in deinem Skript als ein einziger, lesbarer String-Block. Hier ist die wichtigste Erkenntnis. Es gibt den weit verbreiteten Irrglauben, dass das Schreiben von SQL innerhalb von Python-Strings langsamer oder weniger nativ sein muss als die Nutzung der strukturierten DataFrame-Methoden. Das ist falsch. Egal, ob du Python-Methoden chainst oder einen Raw SQL String übergibst – PySpark behandelt beides identisch. Beide Inputs werden sofort geparst, in exakt denselben Logical Plan übersetzt und an den Catalyst Optimizer übergeben. Die Execution Engine weiß nicht und es interessiert sie auch nicht, welche API du genutzt hast, um deine Absicht auszudrücken. Die Performance ist exakt dieselbe. Bei der Wahl zwischen der DataFrame API und Raw SQL geht es nie um die Cluster-Performance. Es geht rein darum, was dein Team schneller und deine Codebase leichter wartbar macht. Danke fürs Reinhören. Ich hoffe, du konntest etwas Neues mitnehmen.
14

Wechselspiel zwischen DataFrames und SQL

3m 36s

Mische SQL und Python nahtlos. Entdecke, wie du temporäre Views aus DataFrames erstellst, selectExpr verwendest und programmatische Operationen an SQL-Abfrageergebnisse anhängst.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 14 von 21. Vielleicht steckst du in einer Debatte fest, ob du deine Data Transformations in Python oder SQL schreiben sollst. Eine strikte Entscheidung zwischen den beiden zu erzwingen, lässt extrem viel Potenzial auf der Strecke. Der wahre Vorteil liegt darin, DataFrames und SQL nahtlos innerhalb genau derselben Pipeline auszutauschen. Manchmal ist ein komplexes Set aus nested Joins in Raw SQL für dein Team viel einfacher zu lesen und zu pflegen. In anderen Fällen musst du dynamisch über Column-Namen iterieren, was in purem SQL unmöglich, in Python aber trivial ist. PySpark erlaubt es dir, beide Ansätze zu vermischen, ohne deinen Data Flow zu unterbrechen. Um anzufangen, SQL gegen einen bestehenden Python-DataFrame zu schreiben, musst du diesen DataFrame zuerst der Spark SQL Engine zugänglich machen. Das erreichst du, indem du die Methode create or replace temp view direkt auf deinem DataFrame aufrufst. Du übergibst ein einzelnes String-Argument, das zum Tabellennamen wird. Diese Operation verschiebt keine Daten. Sie schreibt nichts auf die Festplatte. Sie registriert einfach nur einen temporären Pointer in deiner aktuellen Spark Session. Die SQL Engine weiß jetzt, wie sie diesen Tabellennamen zurück zu deinem Python-DataFrame auflösen kann. Jetzt kannst du ihn abfragen. Du rufst spark dot sql auf und übergibst dein Standard-Select-Statement als String, das auf den Tabellennamen referenziert, den du gerade erstellt hast. Hier ist die entscheidende Erkenntnis. Der Output von diesem spark dot sql Aufruf ist kein statisches Textergebnis, und auch kein anderer Objekttyp. Er gibt einen Standard-PySpark-DataFrame zurück. Das bedeutet, dass du normale Python-DataFrame-Methoden direkt an das Ende deines SQL-Aufrufs chainen kannst. Du kannst einen fünfzigzeiligen SQL-String schreiben, um eine komplexe Window-Function zu handhaben, die Klammer von spark dot sql schließen und sofort eine dot filter oder dot group by Methode anhängen. Du wechselst von Python zu SQL und zurück zu Python in einem einzigen Codeblock. Wenn du SQL nur für eine spezifische Column-Berechnung brauchst, ist das Registrieren einer kompletten Temporary View unnötig. Stattdessen nutzt du die select expression Methode. Diese Methode fungiert als Brücke. Sie funktioniert exakt wie eine Standard-DataFrame select Methode, aber sie akzeptiert Raw SQL String Expressions anstelle von Python Column-Objekten. Wenn du ein case-when Statement ausführen, mathematische Funktionen anwenden oder einen Data Type mit nativer SQL-Syntax casten musst, übergibst du genau diese SQL-Strings an select expression. Spark nimmt diese Strings, parst sie und führt sie exakt so aus, wie es das innerhalb einer vollständigen SQL Query tun würde. Das erlaubt es dir, komplett innerhalb der chainbaren DataFrame API zu bleiben, während du dich für komplexe Row-Level-Logik auf SQL-Syntax verlässt. Die Grenze zwischen diesen beiden Paradigmen ist komplett künstlich. Egal, ob du Python-Methoden chainst, Raw SQL Queries schreibst oder select expression Strings verwendest, Spark kompiliert alles auf exakt denselben optimierten Execution Plan herunter. Wenn du uns dabei helfen möchtest, weiterhin diese Episoden zu machen, kannst du auf Patreon nach DevStoriesEU suchen, um die Show zu supporten. Das war's für diese Folge. Danke fürs Zuhören, und keep building!
15

Spark mit Python UDFs erweitern

4m 24s

Wenn integrierte Funktionen nicht ausreichen, kommen User-Defined Functions ins Spiel. Wir untersuchen, wie man benutzerdefinierte Python-Logik für DataFrames schreibt und warum standardmäßige skalare UDFs Leistungseinbußen verbergen.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 15 von 21. Du schreibst eine Custom Function in Python, baust sie in deine Data Pipeline ein, und sie funktioniert fehlerfrei mit einem kleinen Sample. Aber wenn du sie auf dem kompletten Dataset ausführst, kommt der Job fast zum Stillstand, während die CPU-Auslastung sprunghaft ansteigt. Der Code selbst ist völlig in Ordnung, aber du zahlst eine versteckte Execution Tax. Heute sprechen wir darüber, wie du Spark mit Python UDFs erweiterst. Eine User Defined Function, oder UDF, ermöglicht es dir, Custom Python-Logik direkt auf einem Spark DataFrame auszuführen. Du nutzt das, wenn die Built-in Spark SQL Functions deine spezifische Business Logic nicht abdecken. Der Prozess ist ganz einfach. Du fängst damit an, eine Standard-Python-Funktion zu schreiben. Zum Beispiel schreibst du eine Funktion, die einen Text-String nimmt, eine komplexe Custom-Formatierungsregel anwendet und den modifizierten String zurückgibt. Damit Spark diese Funktion erkennt, importierst du die udf-Funktion aus dem PySpark SQL Functions Module und wendest sie als Decorator direkt über deiner Python-Funktionsdefinition an. Du übergibst dem Decorator auch einen Return Type, wie zum Beispiel einen String Type oder einen Integer Type. Wenn du keinen Return Type angibst, fällt Spark standardmäßig auf einen String Type zurück, was zu unbemerkten Datenproblemen führen kann, wenn deine Funktion eigentlich eine Zahl zurückgibt. Sobald sie dekoriert ist, verhält sich deine Custom Python-Funktion genau wie eine native Spark-Funktion. Du kannst sie in DataFrame-Operationen, wie ein Select Statement, übergeben und ihr Column Names als Argumente mitgeben. Hier ist der entscheidende Punkt. Eine Standard Scalar Python UDF arbeitet strikt Row für Row. Sie nimmt einen oder mehrere Column-Werte aus einer einzelnen Row als Input, wertet deine Custom Python-Logik aus und gibt genau einen Output-Wert für diese spezifische Row zurück. Wenn dein DataFrame zehn Millionen Rows enthält, wird deine Python-Funktion zehn Millionen Mal separat aufgerufen. Diese Row-by-Row-Operation ist leicht nachzuvollziehen, aber sie erzeugt den massiven Performance Bottleneck, den wir am Anfang erwähnt haben. Um zu verstehen, warum das so langsam ist, musst du dir ansehen, wie Spark den Code unter der Haube ausführt. Spark ist in Scala gebaut, was bedeutet, dass seine Core Engine in einer Java Virtual Machine, oder JVM, läuft. Deine Custom UDF ist in Python geschrieben. Die JVM kann Python-Code nicht nativ ausführen. Um deine UDF anzuwenden, ist Spark gezwungen, separate Python Worker-Prozesse neben seinen eigenen Executors hochzufahren. Dann muss es die Daten physisch aus dem JVM Memory Space heraus und in den Python-Prozess verschieben. Spark verlässt sich auf eine Python Serialization Library namens cloudpickle, um diesen komplexen Transfer zu handhaben. Genau hier wird die Performance Tax fällig. Für jede einzelne Row in deinem Dataset serialisiert Spark die Inputs in der JVM, sendet diese Binärdaten über einen lokalen Socket an den Python Worker und deserialisiert sie in Standard-Python-Objekte. Deine Custom Function läuft schließlich auf diesen Objekten. Dann läuft der ganze Zyklus in umgekehrter Richtung ab. Python serialisiert den Output-Wert mit cloudpickle, sendet ihn über den Socket zurück, und die JVM deserialisiert ihn wieder in das interne Memory-Format von Spark. Diese ständige Serialisierung und Deserialisierung zwischen Java und Python ist unglaublich teuer. Die wahren Kosten einer Standard Python UDF sind selten die Logik, die du schreibst; es ist der stille Overhead, Daten für jede einzelne Row zwischen zwei völlig unterschiedlichen Runtime Environments hin und her zu übersetzen. Danke, dass du ein paar Minuten mit mir verbracht hast. Bis zum nächsten Mal, mach's gut.
16

UDFs mit Apache Arrow beschleunigen

3m 41s

Beseitige den JVM-zu-Python-Serialisierungsengpass. Wir decken auf, wie Vectorized Pandas UDFs und Apache Arrow-Speicherformate deine benutzerdefinierten Transformationen beschleunigen.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 16 von 21. Was wäre, wenn du deine Custom Python Functions in Spark um das Zehnfache beschleunigen könntest, nur indem du einen einzigen Decorator änderst? Standard-Python-UDFs sind bekanntermaßen langsam, aber für die Lösung musst du deine Logik nicht in Scala neu schreiben. Heute geht es um das Turbocharging von UDFs mit Apache Arrow. Wenn du eine Standard-Python-UDF ausführst, stößt du an der Sprachgrenze auf eine massive Performance-Wand. Spark läuft innerhalb der Java Virtual Machine, aber deine Custom-Logik läuft in einem separaten Python Worker Process. Um Daten dazwischen auszutauschen, extrahiert Spark Rows aus seinem internen Memory, serialisiert sie mit einer Library namens cloudpickle und sendet sie an Python. Python verarbeitet die Daten Row für Row, serialisiert das Ergebnis und sendet es zurück. Das für Millionen einzelner Rows zu machen, führt zu einem unerträglichen Serialization Bottleneck. Apache Arrow ändert die Spielregeln für diesen Datenaustausch. Arrow ist ein sprachübergreifendes, columnar In-Memory-Datenformat. Es standardisiert, wie Daten im Memory aussehen, sodass sowohl die JVM als auch Python sie nativ und ohne aufwendige Übersetzung verstehen. Anstatt Daten Row für Row zu serialisieren, packt Spark die Daten in große, columnar Batches. Alle Werte für eine bestimmte Column liegen im zusammenhängenden Memory direkt nebeneinander. Spark sendet diese großen Blöcke in einem einzigen, effizienten Schritt an Python. Du kannst das auf zwei Arten nutzen. Erstens kannst du die Arrow-Optimierung für Standard-UDFs aktivieren. Das machst du, indem du die Spark Configuration Property für die Arrow Execution auf true setzt, oder indem du den Parameter useArrow equals true angibst, wenn du deine UDF registrierst. Spark nutzt dann Arrow, um die Daten in Batches zu übertragen. Das reduziert den Serialization Overhead drastisch, auch wenn deine Python-Funktion die Logik technisch gesehen immer noch Row für Row ausführt. Hier ist der entscheidende Punkt: Um den maximalen Speed Boost herauszuholen, sollte dein Python-Code diese Arrow Batches gleichzeitig verarbeiten. Hier kommen Pandas UDFs ins Spiel. Indem du deine Custom Function mit dem pandas UDF Decorator wrappst, änderst du die Art und Weise, wie die Funktion Daten empfängt. Anstatt einen einzelnen Wert für eine Row zu bekommen, empfängt deine Funktion eine Pandas Series, die einen kompletten Batch an Werten enthält. Deine Funktion wendet eine vectorized Operation auf diesen gesamten Batch an und gibt eine neue Pandas Series mit exakt derselben Länge zurück. Stell dir eine Funktion namens calculate tax vor. Du wendest den pandas UDF Decorator an und deklarierst, dass sie einen double Type zurückgibt. Die Funktion akzeptiert eine Pandas Series mit Produktpreisen. Innerhalb der Funktion schreibst du keine for-loop. Du schreibst einfach ein return Statement, das die Input Series mit 1,2 multipliziert. Weil Pandas unter der Haube auf hochoptimiertem C-Code basiert, multipliziert es den gesamten Preis-Block sofort. Spark nimmt dann diese zurückgegebene Series und mergt sie mithilfe von Arrow nahtlos zurück in den DataFrame. Die wahre Power einer Pandas UDF ist nicht nur, dass sie das cloudpickle Serialization Bottleneck umgeht, sondern dass sie deine eigentliche Computation von langsamen Python Loops in eine vectorized, native Execution verlagert. Danke fürs Zuhören. Macht's gut zusammen.
17

Zeilen mit Python UDTFs explodieren lassen

4m 33s

Standard-UDFs geben einen Wert pro Zeile zurück, aber was ist, wenn du mehrere Zeilen benötigst? Lerne, wie Python User-Defined Table Functions (UDTFs) komplexe One-to-Many-Generierungsprobleme lösen.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 17 von 21. Standard User-Defined Functions sind streng auf ein One-to-One Mapping beschränkt. Du übergibst einen Wert und bekommst genau einen Wert zurück. Aber was ist, wenn ein einzelner, kompakter Log-Eintrag in hundert separate Rows aufgeteilt werden muss? Um das zu lösen, nutzt du Python User-Defined Table Functions, oder UDTFs. Eine UDTF macht genau das, was der Name sagt. Sie gibt eine komplette Table aus einem einzigen Input zurück. Während eine Standard-UDF einen einzelnen Scalar-Wert berechnet, kann eine UDTF mehrere Rows und Columns ausgeben. Das ist das Tool, zu dem du greifst, wenn du einen nested JSON-String exploden, ein delimited Text-File Zeile für Zeile parsen oder eine Sequence von Dates aus einem einzigen Timestamp generieren musst. Um eine UDTF in PySpark zu erstellen, schreibst du keine einfache Standalone-Function. Stattdessen definierst du eine Python Class. Diese Class braucht eine spezifische Method namens eval. In der eval Method passiert die eigentliche Transformation. Wenn du die UDTF ausführst, ruft Spark diese Method für jeden Input-Wert auf. Hier ist der entscheidende Punkt. Innerhalb dieser eval Method nutzt du kein Standard return Statement. Stattdessen verwendest du das Python yield Keyword. Jedes Mal, wenn die Method einen Wert yieldet, übersetzt Spark das in eine neue Row in deiner Output Table. Wenn du einen einzelnen Input-String übergibst, loopt die eval Method vielleicht darüber und yieldet zehnmal. Spark nimmt diese zehn Yields und produziert zehn verschiedene Rows. Lass uns ein konkretes Beispiel durchgehen. Du baust eine Class namens ProcessWords. Dein Ziel ist es, einen ganzen Satz zu übergeben und eine Table zurückzubekommen, in der jedes Wort seine eigene Row hat. Du schreibst die eval Method so, dass sie einen Text-String akzeptiert. Innerhalb der Method splittest du den Satz an den Leerzeichen. Dann loopst du über die resultierenden Wörter. Für jedes Wort yieldest du ein Tuple, das das Wort selbst enthält. Bevor Spark diese Class nutzen kann, wendest du den PySpark UDTF Decorator darauf an. Der Decorator ist zwingend erforderlich, weil er dein Output Schema definiert. Du deklarierst explizit die Column Names und Data Types, die deine Function generiert. Wenn du einen String yieldest, sagst du dem Decorator, dass der Output eine String Column ist. Wenn du das Wort und seinen Character Count yielden willst, yieldest du ein Tuple mit zwei Elementen, und dein Decorator spezifiziert ein Schema mit einer String Column und einer Integer Column. Neben der eval Method kann eine UDTF Class auch eine optionale terminate Method enthalten. Spark ruft die terminate Method genau einmal für jede Data Partition auf, nachdem alle Input Rows von der eval Method verarbeitet wurden. Das ist extrem nützlich für Aggregation. Wenn deine eval Method einen internen Counter über mehrere Input Rows hinweg trackt, kann die terminate Method eine finale Row mit diesem Total Count yielden, bevor die Partition schließt. Wenn du eine UDTF in einer DataFrame Operation aufrufst, verhält sie sich wie eine Inline Table. Wenn du eine existierende DataFrame Column in die UDTF übergibst, wendet Spark die Table Function Row für Row an. Weil eine Table Function mehrere Rows für jede einzelne Input Row ausgibt, erfordert das Kombinieren dieses Outputs mit deinem originalen Dataset einen impliziten Lateral Join. Spark erledigt das im Hintergrund und dupliziert die originalen Row-Daten, damit sie zu den neu exploded Rows passen, die von deiner Python Class generiert wurden. Die wahre Power einer Python UDTF ist, dass sie dein Input Volume komplett von deinem Output Volume entkoppelt, wodurch ein einzelner Data Point zu einem vollen Multi-Column Dataset erblühen kann. Das war's für diese Folge. Danke fürs Zuhören, und keep building!
18

Die Pandas API auf Spark

4m 28s

Skaliere deine bestehenden Pandas-Skripte ins Unendliche. Entdecke, wie die pyspark.pandas API es dir ermöglicht, Standard-Pandas-Syntax nativ auf einem verteilten Spark-Cluster auszuführen.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 18 von 21. Du hast ein lokales Data-Script, das perfekt funktioniert, aber plötzlich vervierfacht sich die Größe deines Datasets und dein Rechner läuft out of memory. Du beherrschst die Syntax perfekt, aber alles für ein distributed Framework neu zu schreiben, dauert Tage. Die pandas API on Spark schließt genau diese Lücke. Mit der pandas API on Spark kannst du Standard-pandas-Workloads auf einem distributed Cluster ausführen. Sie emuliert pandas nicht einfach nur blind. Sie fängt deinen pandas-Code ab und übersetzt ihn under the hood in optimierte Spark Execution Plans. Um sie zu nutzen, importierst du das Modul namens pyspark dot pandas. Die Standardkonvention ist, ihm den Alias ps zuzuweisen, was direkt den bekannten Alias pd widerspiegelt, der in lokalen Data-Science-Workloads verwendet wird. Wenn du bereits ein lokales pandas DataFrame im Memory hast, ist der Übergang unkompliziert. Du rufst eine Funktion namens from pandas auf deinem ps-Modul auf und übergibst dein lokales DataFrame. Das konvertiert das Single-Node-Objekt in ein distributed pandas-on-Spark DataFrame. Ab diesem Punkt bleibt die Syntax, die du für die Interaktion mit diesem neuen Objekt verwendest, identisch mit dem, was du bereits kennst. Diese Konsistenz erstreckt sich auch darauf, wie die Daten intern verarbeitet werden. Die distributed API behandelt fehlende Daten nativ genau so, wie lokales pandas das tut. Wenn dein Dataset NumPy Not-a-Number-Werte enthält, managt die pandas API on Spark diese bei mathematischen Operationen oder strukturellen Transformationen völlig korrekt. Du musst keine neue Data-Cleaning-Logik für deine Spark-Jobs erfinden. Standardoperationen lassen sich direkt übertragen. Wenn du deine Daten nach einer bestimmten Column gruppieren willst, rufst du die Standard-Grouping-Funktion auf. Wenn du den Mean oder die Summe berechnen willst, chainst du die Aggregate-Funktion direkt dahinter. Du kannst sogar Plotting-Funktionen direkt auf dem distributed DataFrame aufrufen. Spark verarbeitet die schweren Computations über das Cluster hinweg, aggregiert die nötigen Datenpunkte und gibt die Visualisierung zurück, genau so, als würdest du auf einer einzelnen Maschine arbeiten. Hier ist die wichtigste Erkenntnis. Die Architektur darunter ist grundlegend anders, und das bringt einen kritischen Edge Case bei der Index-Generierung mit sich. Lokales pandas verlässt sich stark auf einen sequenziellen, streng geordneten Index für jede einzelne Row. Spark hingegen partitioniert Daten und verteilt sie auf mehrere unabhängige Maschinen. Einen strengen, global geordneten sequenziellen Index über ein distributed System hinweg zu erzwingen, erfordert ständige Kommunikation zwischen den Worker Nodes. Wenn du ein pandas-on-Spark DataFrame erstellst, ohne explizit eine Index-Column zu definieren, generiert die API automatisch einen Default-Index, um das Standard-pandas-Verhalten perfekt zu imitieren. Diesen Default-Index zu erstellen und zu maintainen, erfordert die Synchronisierung des States über das gesamte Cluster. Wenn du mit einem massiven Dataset arbeitest, führt diese Synchronisierung zu einem extremen Performance-Overhead. Die API gibt bei der Ausführung oft eine Warnung bezüglich dieses internen Overheads aus. Um diesen Bottleneck zu vermeiden, wird dringend empfohlen, sofort eine existierende Column als Index zuzuweisen oder die API so zu konfigurieren, dass sie einen distributed-friendly Index-Typ verwendet. Die pandas API on Spark gibt dir die exakte Syntax von pandas, angetrieben von der distributed Execution Engine von Spark. Aber wenn du dich daran erinnerst, dass strikte sequenzielle Indizes hohe Synchronisierungskosten mit sich bringen, bewahrst du dein Cluster vor unnötigen Slowdowns. Das war's für heute. Danke fürs Zuhören – geh und bau etwas Cooles.
19

Laden und Staunen: Speicherformate

4m 12s

Nicht alle Dateiformate sind gleich. Wir stellen zeilenbasierte CSVs spaltenbasierten Formaten wie Parquet und ORC gegenüber und untersuchen Lese-/Schreiboptionen sowie optimale Speichertechniken.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 19 von 21. Einen riesigen Datensatz als CSV zu speichern, ist das Einfachste auf der Welt, aber gleichzeitig auch einer der größten Performance-Killer für deinen Data Lake. Du zahlst für mehr Storage, du zahlst für mehr Compute, und jede Downstream-Query schleicht vor sich hin. Die Lösung liegt darin, wie du das Laden handhabst – Load and Behold: Storage Formats – und warum die Art, wie du deine Daten speicherst, genauso wichtig ist wie die Art, wie du sie transformierst. PySpark nutzt ein einheitliches Interface zum Lesen und Schreiben von Daten über Dutzende von Storage-Systemen hinweg. Du rufst das Read- oder Write-Attribut deiner Spark Session oder deines DataFrames auf, gibst ein Format an, übergibst eine Chain von Options und verweist auf einen Dateipfad. Das ist ein vorhersehbares Pattern, aber die Options, die du wählst, bestimmen, wie viel Arbeit dein Cluster später leisten muss. Fangen wir mit den human-readable Formaten an, CSV und JSON. Das sind row-based Formate. Wenn du eine CSV liest, parst Spark die Daten Zeile für Zeile. Oft musst du bestimmte Options chainen, damit der Text Sinn ergibt. Zum Beispiel könntest du eine Option chainen, um Spark zu sagen, dass die Datei einen Header hat. Eine weitere Option, um einen Custom Delimiter wie eine Pipe oder einen Tab zu setzen. Und eine dritte Option, um genau zu definieren, wie ein Null-Value aussieht – vielleicht übergibst du einen bestimmten String, damit Spark ihn korrekt auf einen leeren Wert mappt, anstatt ihn als literalen Text zu behandeln. JSON ist etwas besser, weil es nested Structures nativ verarbeitet, aber es wiederholt die Schema-Keys für jeden einzelnen Record, was die Dateigröße massiv aufbläht. Beide Formate zwingen Spark dazu, die gesamte Row von der Disk zu lesen, selbst wenn deine Query nur eine einzige Column abfragt. Hier kommen Columnar Formats wie Parquet und ORC ins Spiel. Pass jetzt gut auf. Analytical Queries brauchen selten jede Column in einer breiten Tabelle. Meistens brauchen sie bestimmte Columns über Millionen von Rows hinweg, um Aggregations auszuführen. Parquet und ORC speichern Daten nach Column organisiert, nicht nach Row. Wenn du drei von hundert Columns abfragst, liest Spark nur die Chunks der Datei, die diese drei Columns enthalten. Den Rest überspringt es komplett, was den Disk I/O auf einen Bruchteil dessen reduziert, was eine CSV erfordert. Weil Daten vom selben Typ zusammen gespeichert werden, lassen sich Columnar Formats auch wunderbar komprimieren. Ein Directory mit JSON-Dateien kann um siebzig Prozent oder mehr schrumpfen, wenn es in Parquet konvertiert wird. Außerdem betten sie das exakte Schema und die Data Types in die File-Metadata ein. Das heißt, Spark muss die Types beim Load nicht erraten oder ableiten. Wenn du bereit bist, diese Daten wieder rauszuschreiben, musst du den State an der Destination managen. By default wirft Spark einen Error, wenn du versuchst, in einen Pfad zu schreiben, in dem bereits Daten existieren, um versehentlichen Datenverlust zu verhindern. Du steuerst das über die mode-Methode, bevor du den Save triggerst. Wenn du den String overwrite übergibst, löscht Spark die vorhandenen Daten im Target Path und ersetzt sie durch deinen aktuellen DataFrame. Wenn du append übergibst, fügt Spark deine neuen Part Files einfach dem bestehenden Directory hinzu. Es gibt auch einen ignore-Mode, der stillschweigend gar nichts macht, wenn das Directory bereits befüllt ist. Heute saubere, getypte Columnar Data zu schreiben, spart deinem Cluster morgen Stunden an verschwendeter Processing Time. Wenn du dabei helfen willst, dass diese Episoden weiterhin erscheinen, kannst du die Show unterstützen, indem du auf Patreon nach DevStoriesEU suchst. Danke, dass du dir ein paar Minuten für mich Zeit genommen hast. Bis zum nächsten Mal, mach's gut.
20

Bug Busting: Physische Pläne und Joins

3m 23s

Wirf einen Blick unter die Haube der Ausführungs-Engine von Spark. Lerne, wie man Abfragen mit DataFrame.explain() debuggt und wie man kostspielige Shuffles durch die Verwendung von Broadcast Joins eliminiert.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 20 von 21. Dein PySpark Job ist nicht langsam, weil er Daten berechnet. Er ist langsam, weil er seine ganze Zeit damit verbringt, Daten über das Netzwerk zu schieben. Wenn ein einfacher Join deinen Cluster in die Knie zwingt, liegt die Lösung im Bug Busting: Physical Plans und Joins. Wenn du ein PySpark Script schreibst, definierst du logische Operationen. Du sagst Spark, was du willst, nicht wie es gemacht werden soll. Aber wenn ein Job schlecht performt, musst du genau wissen, wie Spark deine Anfrage ausgeführt hat. Das machst du, indem du die explain Methode auf deinem DataFrame aufrufst. Der Aufruf von explain gibt den Physical Plan aus. Das ist der Bauplan der tatsächlichen Tasks, die Spark auf deinem Cluster ausführt. Du liest diesen Plan von unten nach oben und verfolgst die Daten von den Source Files bis zum finalen Output. Wenn du dir den Physical Plan für einen Standard Join zwischen zwei DataFrames ansiehst, siehst du wahrscheinlich einen Step namens SortMergeJoin. Um einen SortMergeJoin auszuführen, muss Spark sicherstellen, dass Rows mit denselben Join Keys physisch auf demselben Executor liegen. Um das zu erreichen, macht Spark einen Exchange. Exchange ist der Physical Plan Begriff für einen Network Shuffle. Das bedeutet, dass Spark Daten aus Partitions reißt, sie über das Netzwerk schiebt und auf Disk schreibt, damit die anderen Executors sie lesen können. Shuffling ist die mit Abstand teuerste Operation im Distributed Computing. Hier ist die entscheidende Erkenntnis. Wenn du eine riesige Fact Table mit einer kleinen Lookup Table joinst, ist das Shuffling der großen Tabelle eine massive Ressourcenverschwendung. Anstatt beide Tabellen zu shuffeln, um die Keys anzugleichen, kannst du einfach die komplette kleine Tabelle an jeden Executor schicken. Das machst du mit der broadcast Funktion aus dem PySpark SQL functions Modul. Wenn du deine join Methode aufrufst, wrappst du den kleineren DataFrame einfach in die broadcast Funktion. Indem du die kleine Tabelle wrappst, gibst du Spark eine strikte Anweisung. Spark sammelt den kleinen DataFrame auf dem Driver Node und überträgt dann eine komplette Kopie davon in den Memory von jedem einzelnen Executor. Wenn nun der große DataFrame verarbeitet wird, haben die Executors bereits alle Lookup Daten, die sie brauchen, direkt im RAM. Sie streamen einfach durch ihre bestehenden Partitions und matchen die Rows lokal. Es ist kein Sorting nötig, und es bewegen sich keine Daten aus der großen Tabelle über das Netzwerk. Wenn du explain auf diesem neuen broadcasted Join aufrufst, sieht der Physical Plan völlig anders aus. Der SortMergeJoin ist weg. Der teure Exchange Step fehlt komplett. An ihrer Stelle siehst du einen BroadcastExchange und einen BroadcastHashJoin. Der BroadcastExchange bewegt die kleine Tabelle nur einmal, und der Join selbst passiert komplett in place. Der einfachste Weg, die Geschwindigkeit eines Spark Jobs zu verdoppeln, ist aufzuhören, Daten zu bewegen, die nicht bewegt werden müssen. Lies deine Physical Plans, finde die Network Exchanges und broadcaste deine kleinen Tabellen. Das war's für heute. Danke fürs Zuhören – geh und bau etwas Cooles.
21

Profiling von PySpark-Speicher und -Leistung

4m 10s

Wir schließen unsere PySpark-Reise ab, indem wir native Profiling-Tools vorstellen. Lerne, wie man den Speicherverbrauch Zeile für Zeile verfolgt und versteckte interne Python-Tracebacks aufdeckt.

Herunterladen
Hallo, hier ist Alex von DEV STORIES DOT EU. PySpark Fundamentals, Folge 21 von 21. Das Debuggen von verteiltem Python-Code bedeutet normalerweise, Tausende von Zeilen sinnloser Java-Fehler zu durchforsten und zu versuchen zu erraten, warum deine Funktion fehlgeschlagen ist oder warum sie den gesamten Memory auf deinem Cluster belegt hat. Du musst jetzt nicht mehr raten. Heute schauen wir uns das Profiling von PySpark-Memory und -Performance an, zusammen mit der Vereinfachung von Stack Traces. Wenn du eine User Defined Function, oder UDF, in PySpark schreibst, läuft dein Python-Code auf einer Java Virtual Machine Infrastruktur. Wenn dein Python-Code durch Null teilt oder auf einen fehlenden Dictionary Key zugreift, wird diese einfache Python Exception geschluckt. Sie wird durch den PySpark-Daemon zurückgereicht, über das Netzwerk, und in riesige Java Exceptions verpackt. Den eigentlichen Python-Fehler in deinen Logs zu finden, ist mühsam. Du kannst das beheben, indem du Simplified Tracebacks aktivierst. Wenn du die Spark Configuration für Simplified Traceback auf true setzt, ändert PySpark die Art, wie es Fehler meldet. Es entfernt die ganzen Java-Interoperability-Logs und das Rauschen der Worker-Prozesse. Wenn das nächste Mal eine UDF fehlschlägt, gibt deine Konsole einen sauberen Standard-Python-Stack-Trace aus, der die genaue Zeilennummer in deiner Python-Datei anzeigt, in der die Exception aufgetreten ist. Abstürze zu beheben, ist nur die halbe Miete. Langsamen oder speicherhungrigen Code zu reparieren, ist viel schwieriger. Wenn du eine Pandas UDF schreibst, die Millionen von Zeilen verarbeitet, läuft sie vielleicht erfolgreich durch, braucht aber viel zu lange oder löst Out-of-Memory Errors auf deinen Executor Nodes aus. In der Vergangenheit erforderte das Finden des Flaschenhalses manuelles Logging oder das Raten, welche Pandas-Operation ineffizient war. Spark 4.0 ändert das durch die Einführung von eingebauten Python UDF Profilern. Hier ist der entscheidende Punkt. Du kannst deinen verteilten Python-Code jetzt Zeile für Zeile direkt in PySpark profilieren. Um das zu nutzen, setzt du die UDF Profiler Configuration auf einen von zwei Modi: Performance oder Memory. Wenn du die Profiler Configuration auf das Wort perf setzt, aktiviert Spark den Performance Profiler. Danach führst du deinen Spark Job ganz normal aus. Während die Worker Nodes deine Pandas UDF ausführen, trackt Spark die Execution Time jeder einzelnen Zeile deiner Python-Funktion. Sobald dein Job beendet ist, rufst du die show-Methode auf dem Spark Profile Object auf. Spark gibt dann einen detaillierten Report in deiner Konsole aus. Für jede Zeile deines Codes siehst du genau, wie oft sie aufgerufen wurde und wie viel Zeit insgesamt für die Ausführung aufgewendet wurde. Du kannst sofort sehen, ob eine bestimmte String-Manipulation oder mathematische Operation deine gesamte Pipeline verlangsamt. Wenn du mit Memory Limits zu tun hast, setzt du die UDF Profiler Configuration stattdessen auf das Wort memory. Der Workflow ist genau derselbe, aber der Output ändert sich. Wenn du dir den Profile Report ansiehst, zeigt Spark dir die genaue Megabyte-Zunahme an, die durch jede Zeile deines Python-Codes verursacht wird. Du kannst genau sehen, wo große Arrays allokiert werden und wo Memory nicht wieder freigegeben wird. Diese Zeile-für-Zeile-Sichtbarkeit macht Schluss mit dem Rätselraten bei der Optimierung komplexer Data Transformations. Du kannst die genaue Ursache deiner Performance Issues punktgenau bestimmen, ohne deine PySpark Environment zu verlassen. Da dies die letzte Folge unserer PySpark-Serie ist, ermutige ich dich, dir die offizielle Spark-Dokumentation anzusehen und diese Debugging Tools hands-on auszuprobieren. Wenn du Ideen hast, welche Technologien wir in unserer nächsten Serie behandeln sollten, schau auf devstories.eu vorbei und lass es uns wissen. Danke, dass du ein paar Minuten mit mir verbracht hast. Bis zum nächsten Mal, mach's gut.