v4.1 — Edizione 2026. Una guida completa a PySpark 4.1, che copre Spark Connect, DataFrames, tipi di dati complessi, trasformazioni dei dati, SQL, UDFs e profiling.
Stabiliamo la necessità fondamentale di PySpark. Scopri perché le librerie Python standard come Pandas falliscono su larga scala e come PySpark fornisce un motore di esecuzione distribuito per elaborare enormi dataset senza problemi.
4m 07s
2
La rivoluzione di Spark Connect
Esplora l'architettura Spark Connect. Spieghiamo come PySpark ha disaccoppiato il client e il server, permettendoti di eseguire applicazioni Spark ovunque senza ingombranti dipendenze JVM.
3m 28s
3
DataFrames e Lazy Evaluation
Immergiti nell'astrazione fondamentale di PySpark: il DataFrame. Discutiamo il concetto di lazy evaluation, la differenza tra trasformazioni e azioni, e perché Spark pianifica prima di eseguire.
4m 05s
4
Creare e visualizzare i DataFrames
Impara come istanziare DataFrames da oggetti Python grezzi, dizionari e file, e come ispezionare in modo sicuro i tuoi dati distribuiti senza far crashare il tuo nodo driver.
3m 34s
5
Padroneggiare i Data Types di base
Un tour dei tipi numerici e stringa fondamentali di PySpark. Esploriamo come definire esplicitamente gli schemi usando StructType e StructField per data pipelines robuste.
3m 42s
6
I pericoli della precisione
Scopri le differenze critiche tra FloatType, DoubleType e DecimalType. Impara perché scegliere il tipo numerico sbagliato può introdurre disastrosi errori di arrotondamento nei dati finanziari.
4m 21s
7
Domare dati complessi e annidati
I Big Data non sono sempre piatti. Esploriamo i tipi di dati complessi di PySpark, inclusi ArrayType, StructType e MapType, che ti permettono di analizzare nativamente JSON profondamente annidati.
4m 04s
8
Type Casting e selezione
Impara a modellare attivamente gli schemi dei tuoi DataFrame. Trattiamo come selezionare sottoinsiemi di colonne e come fare il cast sicuro delle colonne da un tipo di dato a un altro.
3m 34s
9
Function Junction: Pulire i dati sporchi
Garbage in, garbage out. Impara le trasformazioni essenziali dei DataFrame per eliminare i valori nulli, riempire i valori mancanti e gestire i record NaN in modo nativo nei sistemi distribuiti.
3m 58s
10
Trasformare e rimodellare i dati
Prendi il controllo della forma dei tuoi dati. Esploriamo come generare nuove colonne con funzioni matematiche, eseguire manipolazioni di stringhe e appiattire array annidati usando explode.
4m 19s
11
Le meccaniche di raggruppamento e aggregazione
Padroneggia la strategia split-apply-combine. Ci immergiamo nel raggruppamento dei dati per chiavi e nell'applicazione di potenti funzioni di aggregazione per riassumere enormi dataset.
3m 37s
12
Quando i DataFrames si scontrano: L'arte del Joining
Navigare tra le sfumature della combinazione di dataset. Analizziamo i sette diversi tipi di join in PySpark e spieghiamo come unire i DataFrames in modo sicuro.
3m 39s
13
Vecchio SQL, nuovi trucchi
Perché imparare una nuova API quando puoi usare SQL grezzo? Impara a eseguire query SQL standard direttamente sui DataFrames distribuiti di PySpark.
3m 35s
14
Interscambiare DataFrames e SQL
Mescola e abbina SQL con Python senza problemi. Scopri come creare viste temporanee dai DataFrames, usare selectExpr e concatenare operazioni programmatiche ai risultati delle query SQL.
3m 42s
15
Estendere Spark con le UDFs di Python
Quando le funzioni integrate non bastano, entrano in gioco le User-Defined Functions. Esploriamo come scrivere logica Python personalizzata per i DataFrames e perché le UDFs scalari standard nascondono una penalità di performance.
3m 38s
16
Potenziare le UDFs con Apache Arrow
Elimina il collo di bottiglia della serializzazione da JVM a Python. Scopriamo come le Vectorized Pandas UDFs e i formati di memoria di Apache Arrow potenziano le tue trasformazioni personalizzate.
3m 32s
17
Esplodere le righe con le UDTFs di Python
Le UDFs standard restituiscono un valore per riga, ma cosa succede se hai bisogno di più righe? Impara come le User-Defined Table Functions (UDTFs) di Python risolvono complessi problemi di generazione uno-a-molti.
4m 03s
18
La Pandas API su Spark
Scala i tuoi script Pandas esistenti all'infinito. Scopri come l'API pyspark.pandas ti permette di eseguire la sintassi standard di Pandas in modo nativo su un cluster Spark distribuito.
4m 06s
19
Carica e ammira: Formati di archiviazione
Non tutti i formati di file sono creati uguali. Confrontiamo i CSVs basati su righe con formati colonnari come Parquet e ORC, esplorando le opzioni di lettura/scrittura e le tecniche di archiviazione ottimali.
3m 46s
20
Bug Busting: Piani fisici e Joins
Dai un'occhiata sotto il cofano del motore di esecuzione di Spark. Impara a fare il debug delle query usando DataFrame.explain() e a eliminare i costosi shuffle usando i Broadcast joins.
3m 15s
21
Profiling della memoria e delle performance di PySpark
Concludiamo il nostro viaggio in PySpark introducendo strumenti di profiling nativi. Impara a tracciare il consumo di memoria riga per riga e a esporre i traceback interni nascosti di Python.
4m 14s
Episodi
1
Il problema dei Big Data e la promessa di PySpark
4m 07s
Stabiliamo la necessità fondamentale di PySpark. Scopri perché le librerie Python standard come Pandas falliscono su larga scala e come PySpark fornisce un motore di esecuzione distribuito per elaborare enormi dataset senza problemi.
Ciao, sono Alex di DEV STORIES DOT EU. PySpark Fundamentals, episodio 1 di 21. Il tuo script Python standard funziona perfettamente durante i test, ma nel momento in cui il tuo dataset raggiunge i cinquanta gigabyte, va in crash con un errore di OutOfMemory. Hai raggiunto i limiti fisici di una singola macchina. La soluzione a questo collo di bottiglia è il focus di questo episodio: il problema dei big data e la promessa di PySpark.
Gli strumenti Python standard per i dati sono progettati per l'esecuzione single-node. Librerie come pandas sono incredibilmente efficienti, ma richiedono che l'intero dataset risieda nella memoria locale. Se il tuo server ha sedici gigabyte di RAM e provi a caricare cinquanta gigabyte di log applicativi, il sistema operativo interviene e termina il processo. Fare scaling verticale noleggiando un server più grande e costoso non fa altro che rimandare l'inevitabile. I dati crescono più velocemente degli upgrade hardware. Alla fine, i dati superano la capacità della macchina.
PySpark risolve questa limitazione. È l'API Python per Apache Spark. Apache Spark stesso è un engine di calcolo distribuito che gira sulla Java Virtual Machine. PySpark funge da ponte, permettendoti di scrivere la tua logica interamente in Python sfruttando al contempo l'engine distribuito altamente ottimizzato di Spark.
Questo sposta la tua architettura dallo scaling verticale allo scaling orizzontale. Invece di affidarti a un'unica macchina enorme, PySpark partiziona i tuoi dati e distribuisce i calcoli su un cluster di molte macchine più piccole, note come nodi. Tu scrivi il tuo codice Python e PySpark lo traduce in un piano di esecuzione parallelo. Se il volume dei tuoi dati raddoppia il mese prossimo, non devi riscrivere una singola riga di codice. Ti basta aggiungere altri nodi al cluster.
L'ecosistema PySpark è organizzato in alcuni moduli principali progettati per diversi workload. Il primo è Spark SQL. Questo è il fondamento della maggior parte delle moderne applicazioni PySpark. Fornisce una struttura DataFrame per la gestione di dati tabellari distribuiti su più macchine. Ti permette inoltre di eseguire query SQL standard direttamente su questi dataset distribuiti.
Poi c'è Structured Streaming. Questo modulo gestisce le data pipeline in real-time. Invece di elaborare un enorme batch di dati durante la notte, Structured Streaming elabora continuamente flussi di record, come letture di sensori live o eventi di traffico web. Utilizza lo stesso identico modello di programmazione di Spark SQL, il che significa che la tua logica di elaborazione batch e la tua logica di streaming sono pressoché identiche.
Poi c'è MLlib, la libreria di machine learning. Addestrare modelli su enormi dataset su una singola macchina è un noto collo di bottiglia. MLlib fornisce algoritmi di machine learning distribuiti per task come classificazione, regressione e clustering. Distribuisce le pesanti operazioni matematiche sull'intero cluster, riducendo drasticamente i tempi di addestramento.
Ecco il punto chiave. La vera potenza di PySpark è l'astrazione. Non devi mai suddividere manualmente i tuoi file enormi in chunk. Non scrivi mai codice di rete per coordinare i server. Ti basta definire una sequenza logica di trasformazioni, e l'engine sottostante gestisce la distribuzione dei dati, l'esecuzione parallela e persino il processo di recovery se un nodo perde alimentazione a metà del calcolo.
PySpark non è semplicemente una utility per aprire file più grandi. È un cambiamento fondamentale: dal computing limitato da una singola scheda madre al computing limitato unicamente dalle dimensioni del tuo cluster.
Se trovi utili questi episodi e vuoi supportare lo show, puoi cercare DevStoriesEU su Patreon. Questo è tutto per questo episodio. Grazie per l'ascolto, e continua a sviluppare!
2
La rivoluzione di Spark Connect
3m 28s
Esplora l'architettura Spark Connect. Spieghiamo come PySpark ha disaccoppiato il client e il server, permettendoti di eseguire applicazioni Spark ovunque senza ingombranti dipendenze JVM.
Ciao, sono Alex di DEV STORIES DOT EU. PySpark Fundamentals, episodio 2 di 21. Per anni, scrivere codice PySpark in locale significava portarsi dietro una Java Virtual Machine enorme e pesante solo per testare un semplice script. Dovevi sincronizzare perfettamente le versioni di Python, le configurazioni Java e le dipendenze del cluster prima di scrivere una singola riga di logica. La rivoluzione di Spark Connect rende tutto questo completamente obsoleto.
Tradizionalmente, PySpark si basava su un'architettura strettamente accoppiata. Il tuo script Python e l'execution engine di Spark dovevano coesistere sull'esatta stessa macchina fisica o virtuale. Lanciare una sessione PySpark significava avviare una Java Virtual Machine in background usando una bridge library. Questa architettura gravava sul tuo ambiente di sviluppo locale con tutto il peso dell'execution engine di Spark. Rendeva l'integrazione di PySpark in applicazioni web, moderni editor di codice o dispositivi edge altamente impraticabile.
Spark Connect risolve questo problema introducendo un'architettura client-server disaccoppiata. Il tuo ambiente Python è ora rigorosamente separato dal server Spark. Il client PySpark locale diventa una libreria leggera. Non richiede più un'installazione locale di Java e non esegue direttamente i task di data processing. Funge puramente da interfaccia remota per il cluster Spark vero e proprio.
Ecco il punto chiave. Quando scrivi operazioni sui DataFrame con Spark Connect, il client leggero registra i tuoi method call e li traduce in un unresolved logical plan. Puoi immaginare questo piano come un blueprint astratto della tua query, che descrive rigorosamente quali dati elaborare senza preoccuparsi di come vengono elaborati. Il client impacchetta questo blueprint usando Protocol Buffers e lo trasmette tramite una connessione di rete gRPC al server Spark remoto. Il server spacchetta il piano, gestisce tutta la complessa query optimization, esegue il job sul cluster e infine invia in streaming i risultati calcolati al tuo script Python.
Il setup richiede una piccola modifica al modo in cui avvii la tua applicazione. Usi ancora il builder di SparkSession, ma invece di affidarti alle configurazioni locali, chiami il metodo remote. Fornisci una connection string che specifica dove si trova il server Spark. Questa stringa usa un connection scheme dedicato che inizia con le lettere s c. Quindi, se ti connetti a un server di test locale sulla porta di default, fornisci la stringa s c due punti barra barra localhost due punti uno cinque zero zero due. Dopo questo singolo step di connessione, scrivi il codice del tuo DataFrame nello stesso modo in cui hai sempre fatto.
Poiché l'esecuzione è completamente remota, puoi connettere simultaneamente più client Python diversi, da applicazioni diverse, all'esatto stesso server Spark. Il codice della tua applicazione richiede semplicemente delle data transformation, e il lavoro pesante rimane interamente server side.
Isolando completamente il client Python dall'execution runtime, Spark Connect elimina i famigerati dependency conflict che in passato rompevano i deploy, permettendoti di aggiornare gli ambienti delle tue applicazioni in modo completamente indipendente dal cluster Spark stesso.
Grazie per aver passato qualche minuto con me. Alla prossima, stammi bene.
3
DataFrames e Lazy Evaluation
4m 05s
Immergiti nell'astrazione fondamentale di PySpark: il DataFrame. Discutiamo il concetto di lazy evaluation, la differenza tra trasformazioni e azioni, e perché Spark pianifica prima di eseguire.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 3 di 21. Cosa succederebbe se il tuo codice non venisse eseguito nel momento in cui lo scrivi, ma invece aspettasse, analizzasse il tuo obiettivo finale e tracciasse il percorso più veloce possibile? Metti in chain filtri, aggregazioni e join, eppure la tua macchina non fa una piega. Questo perché non fa assolutamente nulla finché non la forzi. Questo meccanismo si chiama lazy evaluation, ed è il motore principale dietro ai DataFrame di PySpark.
Un DataFrame di PySpark è una collection distribuita di dati organizzata in colonne con nome. Se hai familiarità con pandas, il concetto è praticamente identico. La differenza è che un DataFrame di PySpark splitta i suoi dati su più compute node in un cluster. Storicamente, la struttura fondamentale in Spark era il Resilient Distributed Dataset, comunemente noto come RDD. L'ecosistema si è allontanato pesantemente dalla manipolazione raw degli RDD. Infatti, a partire dalla versione 4.0 di Spark, l'utilizzo diretto degli RDD non è più supportato su Spark Connect. I DataFrame sono ora lo standard definitivo, e forniscono un'API rigorosa che permette a Spark di ottimizzare automaticamente le tue query.
Questa ottimizzazione si basa interamente sulla lazy evaluation. Ogni operazione che esegui su un DataFrame rientra in una di due categorie ben definite: una transformation o un'action. Le transformation sono comandi che restituiscono un nuovo DataFrame. Alcuni esempi includono selezionare colonne specifiche, filtrare le righe in base a una condizione, raggruppare i record o fare una join tra due tabelle separate.
Quando applichi una transformation, PySpark non esegue il processing dei dati. Si limita a registrare l'operazione. Aggiorna un blueprint interno chiamato logical execution plan. Puoi scrivere cinquanta transformation consecutive, e Spark si limiterà a validare rapidamente la sintassi e ad aggiornare il suo graph. Ecco il punto chiave. Ritardando l'esecuzione effettiva, PySpark fornisce al suo query engine sottostante, il Catalyst Optimizer, un quadro completo della tua data pipeline. L'optimizer ispeziona l'intera chain di transformation, le riorganizza per la massima efficienza ed elimina completamente i passaggi non necessari prima ancora che un singolo byte di dati venga letto dal disco.
Questo blueprint rimane completamente inattivo finché non invochi un'action. Un'action è un comando che richiede un risultato concreto. O restituisce i dati al tuo driver program, oppure li scrive sullo storage. Le action più comuni includono contare il numero totale di righe, fare il collect dei dati in una lista Python locale, o comandare al sistema di mostrare i primi venti record a schermo. Nel momento in cui triggeri un'action, il motore si mette in moto. Traduce il tuo logical plan ottimizzato in un physical plan, distribuisce i task ai worker del cluster ed esegue la computation.
Considera un data workflow standard. Per prima cosa, crei un DataFrame puntando a un file enorme. Poi, fai una join con una tabella separata con i dettagli degli utenti. Dopo la join, filtri i risultati per includere solo gli utenti di una città specifica. Infine, chiedi a Spark di mostrare l'output. Grazie alla lazy evaluation, Spark non carica effettivamente l'intero file, non esegue una join distribuita massiva, per poi filtrare i risultati alla fine. Invece, l'optimizer analizza la tua richiesta finale, nota il filtro e fa un push di quell'operazione di filtro più in alto nella chain, molto prima che avvenga la join. Legge selettivamente solo i record rilevanti, riducendo drasticamente l'utilizzo di memoria e il traffico di rete nel cluster.
Il tuo script PySpark non è mai una sequenza di comandi immediati. È un set di istruzioni che delineano un blueprint architetturale, e il sistema inizia la costruzione solo quando richiedi finalmente il risultato finale.
Questo è tutto per oggi. Grazie per aver ascoltato — vai a creare qualcosa di fantastico.
4
Creare e visualizzare i DataFrames
3m 34s
Impara come istanziare DataFrames da oggetti Python grezzi, dizionari e file, e come ispezionare in modo sicuro i tuoi dati distribuiti senza far crashare il tuo nodo driver.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 4 di 21. Chiamare un metodo specifico su un dataset di grandi dimensioni è un modo sicuro per mandare in crash l'intera applicazione con un errore di out-of-memory. Sapere come spostare i dati da e verso Spark in modo sicuro senza mandare in crash il tuo driver node è fondamentale. Questo episodio tratta proprio di questo: creare e visualizzare DataFrame.
Ogni applicazione PySpark ha bisogno di dati con cui lavorare. Generalmente, puoi creare i DataFrame in tre modi. Innanzitutto, puoi crearli direttamente da strutture Python in-memory. Ti basta definire una list di dictionary, dove ogni dictionary rappresenta una riga e le chiavi sono i nomi delle colonne, e passarla al metodo createDataFrame sulla tua SparkSession. In secondo luogo, se hai già un DataFrame pandas in-memory, puoi passare esattamente quell'oggetto pandas allo stesso metodo createDataFrame. PySpark gestisce automaticamente la conversione. Il terzo e più comune modo consiste nel leggere da file esterni. Usi l'attributo read sulla tua SparkSession, seguito dal formato desiderato, come csv o json, e fornisci il percorso del file.
Una volta caricati i dati, devi verificarli. I DataFrame di PySpark sono distribuiti, il che significa che non puoi semplicemente stampare la variabile e visualizzare i dati come faresti in un normale script Python. Per visualizzare la struttura dei tuoi dati, chiami il metodo printSchema. Questo genera una struttura ad albero testuale che mostra il nome di ogni colonna e il relativo data type. È il modo più rapido per verificare che il tuo file sia stato caricato correttamente.
Per visualizzare il contenuto effettivo, usi il metodo show. Di default, chiamare show visualizza le prime venti righe in formato tabellare. Presta attenzione a questo aspetto. Se le tue colonne contengono string lunghe, il metodo show le tronca. Puoi disabilitare questa funzione passando un argomento truncate impostato su false, o impostarlo su un numero specifico di caratteri. Se il tuo DataFrame ha decine di colonne, la visualizzazione tabellare standard si estende oltre lo schermo e diventa illeggibile. In tal caso, puoi passare l'argomento vertical impostato su true. Questo stampa ogni riga come un blocco verticale di coppie key-value, rendendo i dataset molto larghi molto più facili da leggere in un terminal.
Ora arriviamo al crash per out-of-memory menzionato in precedenza. A volte devi riportare i dati distribuiti in normali oggetti Python. Il metodo per farlo si chiama collect. Ecco il punto chiave. Il metodo collect prende ogni singola riga da ogni executor dell'intero cluster e la forza nella memoria del tuo singolo driver node. Se il tuo DataFrame contiene un miliardo di righe, il tuo driver andrà in out-of-memory e crasherà immediatamente. Dovresti chiamare collect solo quando hai aggregato o filtrato i tuoi dati fino a ridurli a una dimensione ridotta.
Quando lavori con dataset di grandi dimensioni, estrai sempre campioni più piccoli. Invece di collect, usa il metodo take, passando il numero di righe che desideri. Questo restituisce una list Python standard contenente solo quelle prime righe. Se devi controllare la fine del tuo dataset, usa il metodo tail per recuperare le ultime righe. Entrambi i metodi limitano in modo sicuro la quantità di dati trasferiti al tuo driver.
La regola per i dati distribuiti è semplice: spingi i calcoli sul cluster, ma limita rigorosamente il numero di righe che riporti al driver.
Questo è tutto per questo episodio. Grazie per l'ascolto e continua a sviluppare!
5
Padroneggiare i Data Types di base
3m 42s
Un tour dei tipi numerici e stringa fondamentali di PySpark. Esploriamo come definire esplicitamente gli schemi usando StructType e StructField per data pipelines robuste.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 5 di 21. Affidarti alla schema inference automatica potrebbe farti risparmiare qualche riga di codice, ma ti costerà caro in termini di performance in produzione. Il cluster spesso deve leggere l'intero dataset solo per indovinare cosa c'è dentro prima di fare il vero lavoro. Puoi risolvere questo problema padroneggiando i data type di base e gli schemi espliciti.
È comune confondere i tipi standard di Python con i data type di PySpark. Quando dichiari un integer o una string in Python standard, quell'oggetto vive nella memoria della tua macchina locale. I type di PySpark operano su un livello completamente diverso. Sono istruzioni di mapping per il Catalyst optimizer e la Java Virtual Machine sottostante. Quando usi i data type di PySpark, stai definendo una struttura rigorosa e cluster-aware. Questo garantisce la data consistency su centinaia di worker node distribuiti e detta esattamente come i dati vengono serializzati sulla rete.
PySpark fornisce un type specifico per ogni forma di dato standard, e scegliere quello corretto è fondamentale per le performance. Per i numeri, hai ByteType per gli integer molto piccoli, IntegerType per i numeri standard e LongType per i valori grandi. Scegliere ByteType invece di LongType per un semplice status code fa risparmiare molta memoria quando questa scelta viene moltiplicata su miliardi di righe. Per il testo e la logica, usi StringType e BooleanType.
Gestire il tempo correttamente è un'altra area in cui il typing esatto fa la differenza. PySpark divide i dati temporali in DateType e TimestampType. Usi DateType quando ti interessa solo la data del calendario, come il compleanno di un utente. Usi TimestampType quando ti servono punti esatti nel tempo, tracciando sia la data che l'ora, il minuto e il secondo esatti in cui si è verificato un evento.
Conoscere questi type è solo la base. Devi applicarli direttamente al tuo processo di data ingestion usando uno schema esplicito. Costruisci questo schema usando due oggetti specifici: StructType e StructField. Puoi pensare a uno StructType come al blueprint per un'intera riga nel tuo dataframe. Uno StructField è il blueprint per una singola colonna all'interno di quella riga.
Per costruire uno schema esplicito, istanzi uno StructType e gli passi una collection di StructField. Ogni StructField richiede tre argomenti specifici. Primo, fornisci il nome della colonna come una normale string. Secondo, passi il data type specifico di PySpark che vuoi forzare, come IntegerType o StringType. Terzo, fornisci un flag boolean che indica se questa colonna può contenere valori null.
Ad esempio, costruisci uno schema partendo con uno StructField chiamato user identifier, assegnato a uno StringType, e imposti il flag null a false. Fai seguire questo da uno StructField chiamato account age, assegnato a un IntegerType, impostando il flag null a true. Una volta che questo oggetto StructType è completamente assemblato, lo passi direttamente al tuo dataframe reader usando il metodo schema prima di chiamare il comando load per leggere i tuoi file.
Questa è la parte che conta. Quando fornisci questo schema esplicito in anticipo, PySpark salta completamente la fase di data scanning. Applica il tuo blueprint direttamente al data stream in ingresso. Questo riduce drasticamente il tempo necessario per leggere un file. Funziona anche come un quality gate immediato. Se arriva un file malformato con del testo nella tua colonna integer, la pipeline lo gestisce in base alla struttura che hai definito, invece di passare silenziosamente lo schema inferito a valle e rompere le tue trasformazioni.
Definire il tuo schema esplicitamente trasforma un'operazione di lettura fragile e costosa in uno step della pipeline prevedibile e altamente ottimizzato.
Grazie per l'ascolto, happy coding a tutti!
6
I pericoli della precisione
4m 21s
Scopri le differenze critiche tra FloatType, DoubleType e DecimalType. Impara perché scegliere il tipo numerico sbagliato può introdurre disastrosi errori di arrotondamento nei dati finanziari.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 6 di 21. Usare un float standard potrebbe sembrare innocuo, finché la tua query di aggregazione non sbaglia silenziosamente il calcolo di milioni di transazioni finanziarie. Codice che gira perfettamente può produrre numeri leggermente, ma pericolosamente, sbagliati. Questo è esattamente il motivo per cui dobbiamo parlare dei pericoli della precisione.
In PySpark, hai tre modi principali per memorizzare numeri con parti frazionarie. Hai FloatType, DoubleType e DecimalType. Non sono intercambiabili. Un errore comune è lasciare che PySpark deduca uno schema dai tuoi dati grezzi. L'inferenza di solito assegna DoubleType a qualsiasi numero con un punto decimale. Se stai calcolando ricavi finanziari, affidarti a questo comportamento di default è un serio rischio operativo.
Per capire il perché, dobbiamo guardare come funzionano FloatType e DoubleType sotto il cofano. FloatType usa la matematica floating-point IEEE 754 a 32 bit. DoubleType usa la versione a 64 bit dello stesso standard. Entrambi rappresentano i numeri come frazioni binarie. Pensa a come la frazione un terzo non possa essere scritta perfettamente usando i decimali in base dieci. Diventa una stringa infinita di tre. La stessa identica limitazione esiste in binario. I numeri decimali comuni, come zero punto uno o zero punto due, non possono essere rappresentati perfettamente in base due. Il computer memorizza una minuscola approssimazione.
Con DoubleType, ottieni 64 bit di spazio, il che significa che l'approssimazione è incredibilmente vicina al numero reale. Se fai una query su una singola riga di dati, noterai raramente la differenza. Ecco il punto chiave. L'errore si accumula durante le aggregazioni. Quando calcoli i ricavi finanziari totali sommando miliardi di singole righe, quelle imprecisioni microscopiche si sommano. Una frazione di centesimo persa o guadagnata su ogni transazione finisce per falsare il totale aggregato finale di migliaia o addirittura milioni di dollari. La tua logica di aggregazione è matematicamente corretta, ma il data type sottostante corrompe il risultato.
Se il tuo sistema sta calcolando simulazioni fisiche o addestrando modelli di machine learning, FloatType e DoubleType sono esattamente quello che vuoi. Scambiano l'esattezza per un'elaborazione hardware ad alta velocità. Ma nel momento in cui gestisci denaro, hai bisogno di una precisione rigorosa e inflessibile.
Questo ci porta a DecimalType. DecimalType non usa approssimazioni floating-point. Memorizza i numeri esattamente come li definisci tu, usando una scala fissa. Quando configuri un DecimalType, definisci due parametri distinti. Primo, specifichi la precision, che è il numero massimo totale di cifre che il valore può contenere. Secondo, specifichi la scale, che detta il numero esatto di cifre consentite a destra del punto decimale.
Se imposti un DecimalType con una precision di dieci e una scale di due, PySpark alloca lo spazio esatto necessario per memorizzare quel valore fino al centesimo. Non ci sono frazioni binarie né arrotondamenti approssimativi.
In pratica, implementi questo prendendo il controllo rigoroso dei tuoi schema. Quando leggi record finanziari da un file sorgente, non lasciare che PySpark indovini i tipi. Primo, crei un oggetto schema rigoroso. Poi, definisci i tuoi campi finanziari come ricavi o tasse. Infine, assegni loro esplicitamente un DecimalType con la precision e la scale che hai scelto. Una volta che il tuo dataframe viene caricato con questo schema, le tue aggregazioni standard di somma o media verranno eseguite perfettamente dalla prima riga fino alla miliardesima. Sacrifichi una piccola quantità di performance di calcolo rispetto a un DoubleType standard, ma garantisci che il tuo reporting finanziario sia assolutamente impeccabile.
La regola è semplice: usa i tipi floating-point per velocità e approssimazioni scientifiche, ma nel momento in cui un numero rappresenta una valuta, bloccalo con un DecimalType. Grazie per averci ascoltato. Alla prossima!
7
Domare dati complessi e annidati
4m 04s
I Big Data non sono sempre piatti. Esploriamo i tipi di dati complessi di PySpark, inclusi ArrayType, StructType e MapType, che ti permettono di analizzare nativamente JSON profondamente annidati.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 7 di 21. I big data del mondo reale raramente si presentano come un semplice spreadsheet piatto. A volte, ti serve un array di dictionary annidati anche solo per fare il parse di un singolo evento JSON. Per gestire questa situazione, dobbiamo parlare di come domare dati complessi e annidati.
I workflow relazionali preferiscono tabelle piatte, ma i dati degli eventi moderni arrivano con una struttura fortemente annidata. PySpark gestisce questo problema fornendo tre tipi di dati complessi: ArrayType, StructType e MapType. Questi ti permettono di modellare esplicitamente le strutture gerarchiche in modo nativo nell'engine. Prendi un profilo cliente standard per vedere come funzionano questi tipi.
Il primo concetto è ArrayType. Rappresenta una collection di elementi. La regola fondamentale è che ogni elemento all'interno di un ArrayType deve condividere esattamente lo stesso tipo di dato sottostante. Non puoi mescolare string e integer nello stesso array. Se il tuo profilo cliente include una lista di ID degli ordini recenti, definisci quella colonna come un ArrayType che contiene integer.
Poi c'è lo StructType. Uno StructType modella un record gerarchico annidato, funzionando essenzialmente come una riga embedded in un'altra riga. Contiene campi specifici con un nome. A differenza di un array, ogni campo all'interno di uno StructType può avere un tipo di dato completamente diverso. Supponi che il tuo cliente abbia un indirizzo. Quell'indirizzo contiene il nome della via come string, il codice postale come integer e un flag boolean che indica se si tratta di un immobile commerciale. Tu raggruppi questi campi distinti in un unico StructType.
Ecco il punto chiave. Puoi annidare questi tipi complessi a qualsiasi livello di profondità. Se un cliente ha più indirizzi, non crei colonne piatte e numerate. Crei invece un ArrayType in cui il tipo di elemento interno è esattamente quello StructType dell'indirizzo. Ora hai un array di struct, che si mappa perfettamente a un array JSON standard di oggetti.
La terza struttura è MapType, progettata specificamente per coppie key-value. Si differenzia da uno StructType per il modo in cui gestisce la struttura rispetto allo schema. Uno StructType ti richiede di fare l'hardcode dei nomi esatti dei campi fin dall'inizio. Un MapType è flessibile per quanto riguarda il contenuto dei dati, ma rigoroso per i tipi di dato. Ogni key nella mappa deve essere di un tipo specifico e ogni value deve essere di un altro tipo specifico.
Potresti usare un MapType per memorizzare le preferenze dell'applicazione del cliente. Le key potrebbero essere delle string, come tema o lingua, e anche i value potrebbero essere delle string, come dark o inglese. Trattandosi di un MapType, l'applicazione upstream può iniettare key di preferenza completamente nuove in un secondo momento, senza costringerti a modificare lo schema principale del DataFrame. Ti basta fare una query sui value in modo dinamico tramite le rispettive key.
Quando costruisci questo schema complesso nel tuo codice, lo fai dall'interno verso l'esterno. Per prima cosa, definisci i campi interni dello StructType dell'indirizzo. Poi, passi quella struct completata a una definizione di ArrayType. Successivamente, definisci il MapType per le preferenze dell'utente. Infine, racchiudi tutti questi componenti, insieme a semplici tipi scalar come la string del nome del cliente, in un unico StructType master che definisce l'intera riga del DataFrame.
Invece di appiattire le strutture annidate in string JSON disordinate, definire esplicitamente questi schemi complessi permette all'optimizer di Spark di fare pruning dei dati e filtrare in profondità all'interno dei campi annidati, senza deserializzare l'intero payload in memoria.
Grazie per l'ascolto, alla prossima.
8
Type Casting e selezione
3m 34s
Impara a modellare attivamente gli schemi dei tuoi DataFrame. Trattiamo come selezionare sottoinsiemi di colonne e come fare il cast sicuro delle colonne da un tipo di dato a un altro.
Ciao, sono Alex di DEV STORIES DOT EU. PySpark Fundamentals, episodio 8 di 21. Un semplice valore string nascosto in una colonna di interi può mandare in tilt un cluster da mille nodi. Hai bisogno di un modo affidabile per forzare le corrette strutture dati e scegliere esattamente quali dati passano attraverso la tua pipeline, ed è per questo che oggi parliamo di Type Casting e Selection.
Per manipolare i dati in PySpark, devi prima capire cosa sia effettivamente una colonna. Un'istanza di colonna non è un array fisico di dati caricato in memoria. È una rappresentazione lazily evaluated di un'espressione. Quando fai riferimento a una colonna nel tuo codice, non stai toccando i dati sottostanti. Stai semplicemente aggiungendo un passaggio al logical plan di Spark. I dati si muovono solo quando viene attivata un'action in un secondo momento.
Per recuperare e modellare questi dati, usi il metodo select sul tuo DataFrame. Hai due modi principali per dire al metodo select quali colonne vuoi. Il modo più semplice è passare i nomi delle colonne come normali string di testo. Se passi una string a select, Spark restituisce un nuovo DataFrame che contiene esattamente quella colonna, completamente invariata. Questo funziona bene per l'estrazione di base, ma non offre spazio per le modifiche.
Per modificare i dati durante la selection, devi usare oggetti Column invece di string. Accedi a un oggetto Column facendovi riferimento direttamente dal DataFrame. Puoi farlo usando la dot notation, come dataframe dot age, oppure usando la bracket notation con il nome della colonna come string all'interno delle parentesi. La bracket notation è particolarmente utile quando i nomi delle tue colonne contengono spazi o caratteri speciali che romperebbero la normale dot notation.
Questa è la parte che conta. Quando passi un oggetto Column nel metodo select, puoi attaccarci dei metodi per trasformare i dati al volo. Una delle trasformazioni più critiche è la type conversion. I dati spesso arrivano nel formato sbagliato. Ad esempio, potresti ricevere metriche numeriche formattate come string di testo. Per correggere questo problema, usi il metodo cast. PySpark fornisce anche un alias chiamato astype, che esegue esattamente la stessa logica.
Chiami il metodo cast direttamente sul tuo oggetto Column all'interno del select statement. Il metodo cast richiede un argomento, che è il data type target. Puoi definire questo target passando una rappresentazione a string del tipo, come la parola int, oppure passando uno specifico oggetto data type di Spark, come IntegerType.
Ecco come funziona in un vero script. Chiami il metodo select sul tuo DataFrame. All'interno delle parentesi di quel metodo, fai riferimento alla tua colonna target usando la bracket notation. Subito accanto a quel riferimento alla colonna, chiami dot cast e fornisci il tuo nuovo tipo. Quando viene valutato, questo restituisce un DataFrame completamente nuovo in cui la tua colonna selezionata è ora convertita in modo sicuro al tipo specificato. Il DataFrame originale rimane del tutto intatto perché i DataFrame sono immutabili.
Il concetto chiave è che il type casting in PySpark non è un processo a sé stante applicato in place a un dataset esistente. È una column expression lazily evaluated, intrinsecamente legata all'atto di selezionare i dati per costruire un nuovo DataFrame strongly typed.
Se ti piace il podcast e vuoi aiutare a supportare lo show, puoi cercare DevStoriesEU su Patreon. Questo è tutto per questo episodio. Grazie per l'ascolto, e continua a sviluppare!
9
Function Junction: Pulire i dati sporchi
3m 58s
Garbage in, garbage out. Impara le trasformazioni essenziali dei DataFrame per eliminare i valori nulli, riempire i valori mancanti e gestire i record NaN in modo nativo nei sistemi distribuiti.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 9 di 21. Garbage in, garbage out. Ma cosa fai quando il tuo dataset spazzatura pesa centinaia di terabyte e non puoi ispezionare manualmente una singola riga? Hai bisogno di un metodo sistematico per sanitizzarlo su larga scala. È esattamente quello che vedremo oggi in Function Junction: Cleaning Dirty Data.
Il primo passo nella pulizia di solito è standardizzare il tuo schema. Spesso riceverai file raw con spazi, caratteri speciali o errori di battitura negli header. Usa il metodo chiamato with column renamed. Gli passi semplicemente il vecchio nome come string e il nuovo nome desiderato come string. Se hai diverse colonne da sistemare, fai il chain di questo metodo in sequenza per ogni colonna prima di applicare qualsiasi trasformazione complessa a valle.
Prima di rimuovere i bad data, dobbiamo chiarire una frequente confusione riguardo a null e NaN in PySpark. Null significa che un dato manca completamente. NaN sta per Not a Number, che rappresenta un risultato matematico non definito, come dividere zero per zero. In Python puro, questi richiedono una gestione separata. Tuttavia, PySpark li raggruppa per comodità. Quando usi le funzioni N A del dataframe, Spark valuta i valori NaN come null per quanto riguarda il drop o il fill.
Per eliminare le righe con valori mancanti, usi il metodo N A dot drop. Chiamare questa funzione completamente vuota fa il drop di qualsiasi riga che contenga un null o un NaN in una singola colonna. Questo approccio è altamente distruttivo sui dataset molto larghi. Un singolo valore mancante in una colonna di metadati opzionale cancellerà una riga di dati di transazione altrimenti perfetti. Per evitarlo, passa una lista di nomi di colonne al parametro subset. PySpark valuterà quindi solo quelle specifiche colonne critiche quando decide se fare il drop della riga.
Fare il drop delle righe non è sempre permesso dalle regole di business. Spesso devi sostituire i valori mancanti con dei default sicuri. Puoi farlo usando N A dot fill. Anche se puoi passare un singolo valore per fare il fill di tutte le colonne, l'approccio migliore è passare un dictionary. Le key del dictionary rappresentano i nomi specifici delle colonne, e i value rappresentano le sostituzioni che hai scelto. Questo pattern ti permette di fare il fill di una metrica numerica mancante con uno zero, sostituendo contemporaneamente una categoria mancante con una string di testo come unknown. Farlo tramite un dictionary viene eseguito in un singolo passaggio, il che è altamente efficiente.
Infine, i tuoi dati potrebbero essere completamente popolati ma comunque non validi. Gli outlier e i valori fisicamente impossibili richiedono un filtraggio logico. Isoli i dati buoni usando il metodo where per mantenere solo le righe che soddisfano una condizione specifica. Per i limiti numerici o di data, il metodo between è il tuo strumento migliore. Selezioni la tua colonna, chiami between e fornisci i limiti inferiore e superiore. Questo sostituisce la verbosa logica del maggiore e minore, rendendo il tuo codice più facile da leggere. Qualsiasi riga che cade fuori da quei limiti viene filtrata via dal dataframe risultante.
Ecco il punto chiave. L'ordine conta moltissimo quando fai pulizia su larga scala. Rinomina sempre prima le colonne per bloccare il tuo schema, fai il drop o il fill dei valori mancanti subito dopo per stabilizzare i tuoi data type, e filtra gli outlier per ultimi, solo quando sai che i dati sottostanti sono strutturalmente sani.
Questo è tutto per questo episodio. Grazie per l'ascolto, e continua a sviluppare!
10
Trasformare e rimodellare i dati
4m 19s
Prendi il controllo della forma dei tuoi dati. Esploriamo come generare nuove colonne con funzioni matematiche, eseguire manipolazioni di stringhe e appiattire array annidati usando explode.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 10 di 21. A volte una singola riga di dati contiene un array di record nascosti, e devi far esplodere quell'array per analizzarlo correttamente. Trasformare e fare il reshaping dei dati è il modo in cui estrai, formatti e strutturi quelle informazioni per il processing downstream.
Quando devi modificare un dataframe in PySpark, non modifichi i dati in place. I dataframe sono immutabili. Crei invece nuove versioni usando un metodo chiamato withColumn. Questo metodo accetta due argomenti. Il primo è una string che rappresenta il nome della colonna che vuoi creare o sostituire. Il secondo è una column expression che definisce i dati effettivi. Se fornisci un nome già presente nel dataframe, PySpark sovrascrive la colonna originale. Se il nome è completamente nuovo, PySpark fa l'append della nuova colonna sul lato destro del tuo dataset.
Per definire cosa va in quella nuova colonna, in genere usi le funzioni built-in di PySpark. Queste sono importate dal modulo SQL functions e forniscono operazioni altamente ottimizzate che vengono eseguite su tutto il tuo cluster. Pensa alla manipolazione delle string. I dati di testo provenienti da fonti esterne raramente sono formattati in modo perfetto. Potresti avere una colonna che contiene nomi utente scritti con un mix imprevedibile di lettere maiuscole e minuscole. Puoi risolvere questo problema passando la tua colonna esistente a una funzione built-in come lower, che forza tutto il testo in minuscolo. In alternativa, puoi usare una funzione di capitalizzazione per assicurarti che la prima lettera sia maiuscola e il resto sia minuscolo.
In pratica, integri queste operazioni direttamente nelle trasformazioni del tuo dataframe. Chiami withColumn, dai un nome alla tua colonna di destinazione e le assegni il risultato della funzione lower applicata alla tua colonna di input. PySpark valuta questa espressione per ogni singola riga. Puoi concatenare più chiamate a withColumn per applicare diverse trasformazioni in sequenza, passando ogni volta il dataframe progressivamente aggiornato allo step successivo.
Ora, il secondo aspetto di tutto questo è il reshaping. Pulire le string cambia i valori, ma cosa succede quando la forma fondamentale dei tuoi dati impedisce l'analisi? È qui che la cosa si fa interessante. Potresti ricevere un dataset in cui l'identificativo di una persona è in una colonna, e i suoi redditi mensili per l'intero anno sono impacchettati in un singolo array nella colonna adiacente. Non puoi eseguire aggregazioni relazionali standard su un array annidato. Ti serve che ogni singolo valore di reddito sia sulla propria riga per calcolare le medie o trovare i minimi.
Risolvi questo problema strutturale usando una funzione built-in chiamata explode. La funzione explode gestisce specificamente array e map. Chiami withColumn, specifichi il nome della colonna che vuoi per l'output e passi la funzione explode che racchiude la tua colonna array. PySpark esegue questa operazione prendendo la singola riga originale e aprendola. Se l'array dei redditi contiene dodici valori distinti, explode genera dodici righe completamente separate.
Nel nuovo dataframe, la colonna di destinazione ora contiene un singolo valore di reddito piatto per riga, invece di una list. Cosa fondamentale, PySpark duplica tutte le altre colonne dalla riga originale. L'identificativo dell'utente viene copiato esattamente in tutte le dodici nuove righe. La relazione logica tra l'utente e il suo reddito rimane perfettamente intatta, ma i dati ora sono piatti. Hai rimodellato una struttura annidata in una tabella lunga, pronta per le operazioni standard di grouping e filtering.
Il vero potere delle trasformazioni PySpark è che funzioni come explode e lower non si limitano a manipolare i singoli valori; definiscono un piano di computazione logico che scala istantaneamente, sia che tu abbia cento righe o cento miliardi di righe, senza mai richiederti di scrivere un singolo loop manuale. Per questo episodio è tutto. Alla prossima!
11
Le meccaniche di raggruppamento e aggregazione
3m 37s
Padroneggia la strategia split-apply-combine. Ci immergiamo nel raggruppamento dei dati per chiavi e nell'applicazione di potenti funzioni di aggregazione per riassumere enormi dataset.
Ciao, sono Alex di DEV STORIES DOT EU. PySpark Fundamentals, episodio 11 di 21. Quando ti trovi davanti a miliardi di record singoli, leggerli riga per riga è impossibile. Per estrarne un vero significato, devi riassumerli. Oggi vedremo esattamente come funziona: i meccanismi di grouping e aggregazione.
Dietro le quinte, PySpark elabora le aggregazioni usando una classica strategia per i dati chiamata split-apply-combine. Questo pattern è esattamente quello che sembra. Per prima cosa, PySpark divide l'enorme dataset in bucket logici distinti, basati su una chiave che tu scegli. Poi, applica un calcolo specifico a ogni singolo bucket in modo indipendente su tutto il cluster. Infine, combina queste risposte indipendenti in un unico risultato riassuntivo.
Nel tuo codice, attivi la fase di split chiamando il metodo group by sul tuo DataFrame. Ti basta fornire il nome della colonna che vuoi usare come chiave di grouping. Per esempio, se hai una tabella enorme di transazioni storiche, potresti fare un group by sulla colonna del nome utente.
Ecco il punto chiave. Chiamare group by non restituisce un nuovo DataFrame. Restituisce invece un costrutto transitorio chiamato oggetto GroupedData. Dato che PySpark valuta il tuo codice in modo lazy, ha solo costruito l'execution plan per organizzare questi bucket. Non sposterà effettivamente nessun dato finché non gli dirai quale operazione matematica eseguire su quei bucket.
Per fornire questa operazione matematica, metti in chain il metodo aggregate, di solito scritto come agg, direttamente sui tuoi dati raggruppati. Questo gestisce le fasi di apply e combine. All'interno del metodo aggregate, dici a PySpark cosa calcolare usando gli strumenti del modulo PySpark SQL functions. Questo modulo contiene decine di operazioni di aggregazione ottimizzate.
Diciamo che vuoi calcolare il reddito medio per ciascuno di quegli utenti. Importeresti la funzione average, di solito chiamata avg. Passi il nome della tua colonna del reddito alla funzione average, e la inserisci all'interno del metodo aggregate. Quando viene eseguito, PySpark calcola simultaneamente il reddito medio per ogni distinto bucket di utenti. A questo punto entra in gioco la fase di combine, che restituisce un DataFrame standard e leggibile. Questo nuovo DataFrame contiene una sola riga per utente, abbinata al suo reddito medio appena calcolato.
A questo punto, hai una tabella riassuntiva perfetta. Tuttavia, dato che il calcolo è avvenuto in parallelo su un cluster distribuito, le righe finali vengono restituite nell'ordine casuale in cui i nodi di elaborazione hanno finito il loro lavoro. Se hai bisogno di vedere i redditi più alti, l'ordine casuale è inutile.
Per risolvere questo problema, metti in chain il metodo order by alla fine del tuo step di aggregazione. Passi al metodo order by la colonna che contiene le tue nuove medie, e gli dici di ordinare in modo decrescente. PySpark prenderà i risultati combinati, li classificherà e ti consegnerà una tabella pulita e ordinata.
Il pattern split-apply-combine è potente proprio perché si adatta perfettamente all'hardware distribuito, permettendo di riassumere enormi dataset in pochi secondi. Ma ricordati che fare grouping dei dati è solo metà dell'operazione. Il grouping richiede un'aggregazione per finire il lavoro, altrimenti ti ritrovi solo con un cluster pieno di bucket vuoti in attesa di istruzioni.
Grazie per aver passato qualche minuto con me. Alla prossima, stammi bene.
12
Quando i DataFrames si scontrano: L'arte del Joining
3m 39s
Navigare tra le sfumature della combinazione di dataset. Analizziamo i sette diversi tipi di join in PySpark e spieghiamo come unire i DataFrames in modo sicuro.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 12 di 21. Unire due tabelle enormi è l'operazione in assoluto più costosa nel calcolo distribuito. Applica la logica di matching sbagliata, e diventa il modo più facile per far crashare il tuo cluster esaurendo la memoria. Sapere esattamente come combinare i dataset in modo sicuro è proprio l'argomento di When DataFrames Collide: The Art of Joining.
Il meccanismo principale per combinare i dati in PySpark è il metodo join. Lo chiami sul tuo DataFrame di base, passandogli il DataFrame che vuoi attaccare, la colonna o le colonne specifiche su cui fare matching, e il metodo di join. Se non specifichi nessun metodo di join, PySpark usa di default un inner join.
Considera uno scenario concreto. Hai un DataFrame che registra le altezze delle persone, e un secondo DataFrame che registra i loro redditi. Entrambi i dataset condividono una colonna chiamata name.
Con un inner join, PySpark guarda la colonna name in entrambi i dataset e tiene solo le righe in cui il name esiste in tutti e due i posti. Se una persona appare nei dati delle altezze ma manca in quelli dei redditi, il suo record viene completamente scartato dal risultato.
Per mantenere i record senza match, cambi il tipo di join. Un left join mantiene ogni riga del tuo DataFrame di partenza, che in questo caso sono i dati delle altezze. Se PySpark trova un name corrispondente nei dati dei redditi, aggiunge quel reddito. Se non trova un match, mantiene la riga dell'altezza ma inserisce un valore null nella colonna del reddito. Un right join fa l'esatto opposto, mantenendo tutti i redditi e riempiendo le altezze mancanti con dei null.
Quando ti serve assolutamente tutto, usi un full join. PySpark mantiene ogni record da entrambi i DataFrame. I name che fanno match vengono uniti in una singola riga, e tutti i name che esistono in un solo dataset vengono mantenuti, con valori null a riempire i dati mancanti dall'altra parte.
Ecco il punto chiave. Un cross join funziona diversamente perché ignora del tutto la condizione di join. Accoppia ogni singola riga nel DataFrame delle altezze con ogni singola riga nel DataFrame dei redditi, creando un prodotto cartesiano. Se entrambe le tabelle hanno solo mille righe, un cross join restituisce in output un milione di righe. Questa crescita esplosiva è il motivo per cui i cross join sono fortemente limitati di default, e spesso richiedono una configurazione esplicita per essere eseguiti senza lanciare un errore.
Gli ultimi due tipi di join sono in realtà operazioni di filtraggio, più che veri e propri merge di dati. Un left semi join cerca dei match, restituendo le righe dal DataFrame delle altezze solo se il name appare anche nel DataFrame dei redditi. La differenza cruciale rispetto a un inner join è che un left semi join non si porta dietro nessuna colonna dal lato destro. Ti ritrovi con le stesse identiche colonne con cui sei partito, solo filtrate per i record che hanno un match corrispondente.
Un left anti join fa l'esatto opposto. Restituisce le righe dal DataFrame delle altezze solo se il name non esiste nei dati dei redditi. Scarta completamente le colonne del lato destro. Questo rende il left anti join il modo più efficiente per identificare i dati mancanti o trovare i record che hanno fallito l'elaborazione in downstream. La scelta del join determina non solo quali dati ti tornano indietro, ma anche quanti dati si devono muovere fisicamente attraverso la tua rete per generare il risultato.
Grazie per l'ascolto. Alla prossima!
13
Vecchio SQL, nuovi trucchi
3m 35s
Perché imparare una nuova API quando puoi usare SQL grezzo? Impara a eseguire query SQL standard direttamente sui DataFrames distribuiti di PySpark.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 13 di 21. Hai un team di analisti che scrivono eccellente SQL, ma i tuoi dati si trovano su un enorme cluster distribuito. Potresti costringerli a imparare una sintassi Python completamente nuova, oppure potresti lasciar loro usare il linguaggio che già conoscono. È qui che entra in gioco l'esecuzione di stringhe SQL raw direttamente in PySpark, insegnando al vecchio SQL nuovi trucchi.
PySpark ti offre un ponte diretto con l'SQL standard tramite un singolo metodo nella tua Spark session, chiamato semplicemente sql. A questo metodo passi una stringa SQL raw. L'output non è plain text. È un DataFrame standard di PySpark. Ciò significa che puoi eseguire una query di database standard, ottenere un DataFrame in risposta e passarlo immediatamente a un'altra funzione Python. È completamente interoperabile.
Prima di poter interrogare i dati con SQL, PySpark deve sapere quali tabelle esistono. Hai due modi principali per esporre i tuoi dati al motore SQL. Primo, se hai già un DataFrame in Python, puoi chiamare un metodo per registrarlo come temporary view. Gli dai un nome come stringa, e all'improvviso si comporta come una tabella nelle tue query SQL. Secondo, puoi creare tabelle interamente all'interno della tua stringa SQL. Passi uno statement create table al metodo sql. All'interno di quella stringa, definisci lo schema e dici a PySpark esattamente dove si trovano i file di dati sottostanti, come ad esempio un percorso di cloud storage che contiene file Parquet. PySpark lo registra nel suo catalog interno. Da quel momento in poi, puoi interrogarla per nome proprio come una tabella di database tradizionale.
Confronta come appare la stessa logica in entrambi gli approcci. Mettiamo che tu debba recuperare i nomi dei clienti, scartare chiunque abbia un balance pari a zero, e fare il merge del risultato con una tabella orders. Nella DataFrame API, costruisci una chain di metodi Python. Chiami select sul tuo dataset dei clienti per prendere la colonna name. Poi metti in chain un metodo filter, controllando se il balance è maggiore di zero. Infine, aggiungi un metodo join che fa riferimento al dataset orders su una chiave corrispondente. È altamente programmatico. Nell'approccio SQL, scrivi uno statement select standard che estrae la colonna name, aggiungi una clausola where per il balance, e scrivi una inner join per la tabella orders. Rimane nel tuo script come un singolo blocco stringa leggibile.
Ecco il punto chiave. C'è un malinteso comune secondo cui scrivere SQL all'interno di stringhe Python debba essere più lento o meno nativo rispetto all'uso dei metodi strutturati dei DataFrame. Questo è falso. Sia che tu metta in chain metodi Python o che tu passi una stringa SQL raw, PySpark li tratta in modo identico. Entrambi gli input vengono immediatamente parsati, tradotti nell'esatto stesso logical plan, e passati al Catalyst optimizer. L'execution engine non sa e non gli importa quale API hai usato per esprimere il tuo intento. Le performance sono esattamente le stesse.
La scelta tra la DataFrame API e l'SQL raw non riguarda mai le performance del cluster. Riguarda puramente ciò che rende il tuo team più veloce e la tua codebase più facile da mantenere.
Grazie per essere stato con me. Spero tu abbia imparato qualcosa di nuovo.
14
Interscambiare DataFrames e SQL
3m 42s
Mescola e abbina SQL con Python senza problemi. Scopri come creare viste temporanee dai DataFrames, usare selectExpr e concatenare operazioni programmatiche ai risultati delle query SQL.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 14 di 21. Potresti ritrovarti bloccato in un dibattito su se scrivere le tue trasformazioni dei dati in Python o in SQL. Forzare una scelta rigida tra i due significa lasciare sul tavolo un'enorme quantità di potenziale. Il vero vantaggio sta nell'alternare DataFrame e SQL in modo fluido all'interno della stessa identica pipeline.
A volte, un set complesso di nested join è molto più facile da leggere e mantenere per il tuo team in raw SQL. Altre volte, hai bisogno di iterare dinamicamente sui nomi delle colonne, cosa impossibile in pure SQL ma banale in Python. PySpark ti permette di combinare entrambi gli approcci senza interrompere il tuo data flow.
Per iniziare a scrivere SQL su un DataFrame Python esistente, devi prima esporre quel DataFrame all'engine Spark SQL. Lo ottieni chiamando il metodo create or replace temp view direttamente sul tuo DataFrame. Passi un singolo argomento string, che diventa il nome della tabella. Questa operazione non sposta alcun dato. Non scrive su disco. Registra semplicemente un puntatore temporaneo nella tua Spark session corrente. L'engine SQL ora sa come risolvere quel nome della tabella riportandolo al tuo DataFrame Python.
Ora puoi fargli una query. Chiami spark dot sql e passi il tuo statement select standard come string, facendo riferimento al nome della tabella che hai appena creato.
Ecco il punto chiave. L'output di quella chiamata spark dot sql non è un risultato di testo statico, né è un tipo diverso di oggetto. Restituisce un DataFrame PySpark standard. Questo significa che puoi mettere in chain immediatamente i normali metodi del DataFrame Python direttamente alla fine della tua chiamata SQL. Puoi scrivere una string SQL di cinquanta righe per gestire una window function complessa, chiudere la parentesi di spark dot sql e appendere immediatamente un metodo dot filter o dot group by. Passi da Python a SQL e di nuovo a Python in un singolo blocco di codice.
Se ti serve SQL solo per il calcolo di una colonna specifica, registrare una temporary view completa non è necessario. Invece, usi il metodo select expression. Questo metodo fa da ponte. Funziona esattamente come un metodo select standard del DataFrame, ma accetta espressioni string in raw SQL invece di oggetti column Python.
Se devi eseguire uno statement case-when, applicare funzioni matematiche o fare il cast di un data type usando la sintassi SQL nativa, passi quelle esatte string SQL dentro select expression. Spark prende quelle string, ne fa il parse e le esegue esattamente come farebbe all'interno di una query SQL completa. Questo ti permette di rimanere interamente all'interno dell'API DataFrame chainable, pur affidandoti alla sintassi SQL per la logica complessa a livello di riga.
Il confine tra questi due paradigmi è completamente artificiale. Che tu metta in chain metodi Python, scriva query in raw SQL o utilizzi string di select expression, Spark compila tutto nello stesso identico execution plan ottimizzato.
Se ti va di aiutarci a continuare a creare questi episodi, puoi cercare DevStoriesEU su Patreon per supportare lo show. Questo è tutto per questo episodio. Grazie per l'ascolto, e continua a sviluppare!
15
Estendere Spark con le UDFs di Python
3m 38s
Quando le funzioni integrate non bastano, entrano in gioco le User-Defined Functions. Esploriamo come scrivere logica Python personalizzata per i DataFrames e perché le UDFs scalari standard nascondono una penalità di performance.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 15 di 21. Scrivi una funzione personalizzata in Python, la integri nella tua data pipeline e funziona perfettamente su un piccolo campione. Ma quando la esegui sull'intero dataset, il job rallenta drasticamente mentre l'utilizzo della CPU schizza alle stelle. Il codice in sé è corretto, ma stai pagando una tassa di esecuzione nascosta. Oggi parliamo di come estendere Spark con le Python UDF.
Una User Defined Function, o UDF, ti permette di eseguire logica Python personalizzata direttamente su uno Spark DataFrame. La usi quando le funzioni built-in di Spark SQL non coprono la tua specifica logica di business. Il processo è semplice. Inizi scrivendo una funzione Python standard. Ad esempio, scrivi una funzione che prende una string di testo, applica una complessa regola di formattazione personalizzata e restituisce la string modificata. Per far sì che Spark riconosca questa funzione, importi la funzione udf dal modulo PySpark SQL functions e la applichi come decorator direttamente sopra la definizione della tua funzione Python. Passi anche un return type al decorator, come ad esempio uno string type o un integer type. Se non fornisci un return type, Spark usa di default uno string type, il che può causare problemi di dati silenziosi se la tua funzione restituisce effettivamente un numero. Una volta decorata, la tua funzione Python personalizzata si comporta esattamente come una funzione nativa di Spark. Puoi passarla alle operazioni del DataFrame, come un select statement, passandole i nomi delle colonne come argomenti.
Ecco il punto chiave. Una scalar Python UDF standard opera rigorosamente una riga alla volta. Prende in input uno o più valori di colonna da una singola riga, valuta la tua logica Python personalizzata e restituisce esattamente un valore di output per quella specifica riga. Se il tuo DataFrame contiene dieci milioni di righe, la tua funzione Python viene invocata dieci milioni di volte separate. Questa operazione riga per riga è facile da comprendere, ma crea l'enorme collo di bottiglia prestazionale che abbiamo menzionato all'inizio.
Per capire perché è così lenta, devi guardare come Spark esegue il codice dietro le quinte. Spark è costruito in Scala, il che significa che il suo core engine gira all'interno di una Java Virtual Machine, o JVM. La tua UDF personalizzata è scritta in Python. La JVM non può eseguire codice Python in modo nativo. Per applicare la tua UDF, Spark è costretto ad avviare processi Python worker separati, affiancati ai propri executor. Deve quindi spostare fisicamente i dati fuori dal memory space della JVM e dentro il processo Python. Spark si affida a una serialization library Python chiamata cloudpickle per gestire questo complesso trasferimento.
È qui che si paga la tassa sulle prestazioni. Per ogni singola riga nel tuo dataset, Spark serializza gli input nella JVM, invia quei dati binari attraverso un local socket al Python worker, e li deserializza in oggetti Python standard. La tua funzione personalizzata finalmente gira su quegli oggetti. Poi, l'intero ciclo avviene al contrario. Python serializza il valore di output usando cloudpickle, lo rimanda indietro attraverso il socket, e la JVM lo deserializza di nuovo nell'internal memory format di Spark. Questa costante serializzazione e deserializzazione tra Java e Python è incredibilmente costosa.
Il vero costo di una Python UDF standard raramente è la logica che scrivi; è il silenzioso overhead di tradurre i dati avanti e indietro tra due runtime environment completamente diversi per ogni singola riga. Grazie per aver passato qualche minuto con me. Alla prossima, stammi bene.
16
Potenziare le UDFs con Apache Arrow
3m 32s
Elimina il collo di bottiglia della serializzazione da JVM a Python. Scopriamo come le Vectorized Pandas UDFs e i formati di memoria di Apache Arrow potenziano le tue trasformazioni personalizzate.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 16 di 21. E se potessi decuplicare la velocità delle tue funzioni Python custom in Spark, semplicemente cambiando un singolo decorator? Le UDF Python standard sono notoriamente lente, ma la soluzione non richiede di riscrivere la tua logica in Scala. Oggi parliamo di come mettere il turbo alle UDF con Apache Arrow.
Quando esegui una UDF Python standard, ti scontri con un enorme muro di prestazioni al confine tra i linguaggi. Spark opera all'interno della Java Virtual Machine, ma la tua logica custom gira in un worker process Python separato. Per scambiare dati tra i due, Spark estrae le righe dalla sua memoria interna, le serializza usando una libreria chiamata cloudpickle e le invia a Python. Python elabora i dati una riga alla volta, serializza il risultato e lo rimanda indietro. Fare questo per milioni di singole righe crea un bottleneck di serializzazione insopportabile.
Apache Arrow cambia le regole di questo scambio di dati. Arrow è un formato dati cross-language, colonnare e in-memory. Standardizza l'aspetto dei dati in memoria, così sia la JVM che Python lo capiscono nativamente senza traduzioni complesse. Invece di serializzare i dati riga per riga, Spark impacchetta i dati in grandi batch colonnari. Tutti i valori per una colonna specifica si trovano uno accanto all'altro in memoria contigua. Spark invia questi grandi blocchi a Python in un unico passaggio efficiente.
Puoi sfruttare questo vantaggio in due modi. Per prima cosa, puoi abilitare l'ottimizzazione Arrow per le UDF standard. Lo fai impostando la proprietà di configurazione di Spark per l'esecuzione di Arrow a true, oppure specificando il parametro useArrow uguale a true quando registri la tua UDF. Spark userà Arrow per trasferire i dati in batch, riducendo drasticamente l'overhead di serializzazione, anche se la tua funzione Python tecnicamente esegue ancora la logica una riga alla volta.
Ecco il punto chiave. Per ottenere il massimo boost di velocità, vuoi che il tuo codice Python elabori quei batch di Arrow simultaneamente. È qui che entrano in gioco le Pandas UDF. Wrappando la tua funzione custom con il decorator pandas UDF, cambi il modo in cui la funzione riceve i dati. Invece di ottenere un singolo valore per una riga, la tua funzione riceve una Pandas Series che contiene un intero batch di valori. La tua funzione applica un'operazione vectorized a quell'intero batch e restituisce una nuova Pandas Series della stessa identica lunghezza.
Pensa a una funzione chiamata calculate tax. Applichi il decorator pandas UDF e dichiari che restituisce un tipo double. La funzione accetta una Pandas Series contenente i prezzi dei prodotti. All'interno della funzione, non scrivi un for-loop. Scrivi semplicemente un return statement che moltiplica la Series di input per uno punto due. Poiché Pandas è basato su codice C altamente ottimizzato sotto il cofano, moltiplica l'intero blocco di prezzi istantaneamente. Spark poi prende quella Series restituita e ne fa il merge senza problemi nel DataFrame usando Arrow.
Il vero potere di una Pandas UDF non è solo che aggira il bottleneck di serializzazione di cloudpickle, ma che sposta il tuo calcolo effettivo da lenti loop Python a un'esecuzione nativa e vectorized.
Grazie per l'ascolto. Statemi bene, tutti.
17
Esplodere le righe con le UDTFs di Python
4m 03s
Le UDFs standard restituiscono un valore per riga, ma cosa succede se hai bisogno di più righe? Impara come le User-Defined Table Functions (UDTFs) di Python risolvono complessi problemi di generazione uno-a-molti.
Ciao, sono Alex di DEV STORIES DOT EU. PySpark Fundamentals, episodio 17 di 21. Le User-Defined Function standard sono strettamente limitate a un mapping uno a uno. Passi un valore in input e ottieni esattamente un valore in output. Ma cosa succede se una singola entry di log densa deve essere espansa in cento righe separate? Per risolvere questo problema, usi le User-Defined Table Function di Python, o UDTF.
Una UDTF fa esattamente ciò che suggerisce il nome. Restituisce un'intera tabella da un singolo input. Mentre una UDF standard calcola un singolo valore scalare, una UDTF può generare più righe e più colonne. Questo è il tool che usi quando devi fare l'explode di una string JSON annidata, fare il parsing di un file di testo delimitato riga per riga, o generare una sequenza di date da un singolo timestamp.
Per creare una UDTF in PySpark, non scrivi una semplice funzione standalone. Definisci invece una classe Python. Questa classe richiede un metodo specifico chiamato eval. Il metodo eval è dove avviene la trasformazione vera e propria. Quando esegui la UDTF, Spark chiama questo metodo per ogni valore di input.
Ecco il punto chiave. All'interno del metodo eval, non usi uno statement return standard. Al suo posto, usi la keyword yield di Python. Ogni volta che il metodo fa yield di un valore, Spark lo traduce in una nuova riga nella tua tabella di output. Se passi una singola string in input, il metodo eval potrebbe iterare su di essa e fare yield dieci volte. Spark prende questi dieci yield e produce dieci righe distinte.
Vediamo un esempio concreto. Crei una classe chiamata ProcessWords. Il tuo obiettivo è passare una frase completa in input e ottenere in cambio una tabella in cui ogni parola ha la sua riga. Scrivi il metodo eval per accettare una string di testo. All'interno del metodo, fai lo split della frase in base agli spazi. Quindi, fai un loop sulle parole risultanti. Per ogni parola, fai yield di una tuple contenente la parola stessa.
Prima che Spark possa usare questa classe, le applichi il decorator UDTF di PySpark. Il decorator è obbligatorio perché definisce il tuo schema di output. Dichiari esplicitamente i nomi delle colonne e i data type che la tua funzione genera. Se fai yield di una string, dici al decorator che l'output è una colonna string. Se vuoi fare yield della parola e del suo conteggio di caratteri, fai yield di una tuple di due elementi, e il tuo decorator specifica uno schema con una colonna string e una colonna integer.
Oltre al metodo eval, una classe UDTF può includere anche un metodo terminate opzionale. Spark chiama il metodo terminate esattamente una volta per ogni partizione di dati, dopo che tutte le righe di input sono state elaborate dal metodo eval. Questo è molto utile per l'aggregazione. Se il tuo metodo eval tiene traccia di un contatore interno su più righe di input, il metodo terminate può fare yield di una riga finale contenente quel conteggio totale prima che la partizione si chiuda.
Quando chiami una UDTF in un'operazione su un DataFrame, si comporta come una tabella inline. Se passi una colonna esistente del DataFrame alla UDTF, Spark applica la table function riga per riga. Poiché una table function produce in output più righe per ogni singola riga di input, combinare questo output con il tuo dataset originale richiede un lateral join implicito. Spark gestisce tutto questo dietro le quinte, duplicando i dati della riga originale per farli corrispondere alle nuove righe esplose generate dalla tua classe Python.
Il vero potere di una UDTF Python è svincolare completamente il tuo volume di input dal tuo volume di output, permettendo a un singolo data point di sbocciare in un intero dataset multi-colonna.
Questo è tutto per questo episodio. Grazie per aver ascoltato, e continua a sviluppare!
18
La Pandas API su Spark
4m 06s
Scala i tuoi script Pandas esistenti all'infinito. Scopri come l'API pyspark.pandas ti permette di eseguire la sintassi standard di Pandas in modo nativo su un cluster Spark distribuito.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 18 di 21. Hai uno script locale per i dati che funziona perfettamente, ma all'improvviso la dimensione del tuo dataset quadruplica e la tua macchina esaurisce la memoria. Conosci la sintassi alla perfezione, ma riscrivere tutto per un framework distribuito richiede giorni. La pandas API su Spark colma esattamente questa lacuna.
La pandas API su Spark ti permette di eseguire workload pandas standard su un cluster distribuito. Non si limita a emulare ciecamente pandas. Intercetta il tuo codice pandas e lo traduce in execution plan ottimizzati di Spark dietro le quinte. Per usarla, importi il modulo chiamato pyspark punto pandas. La convenzione standard è assegnargli l'alias ps, rispecchiando direttamente il familiare alias pd usato nei workload locali di data science.
Se hai già un DataFrame pandas standard in memoria, la transizione è semplice. Invochi una funzione chiamata from pandas sul tuo modulo ps e gli passi il tuo DataFrame locale. Questo converte l'oggetto single-node in un DataFrame pandas-on-Spark distribuito. Da quel momento in poi, la sintassi che usi per interagire con questo nuovo oggetto rimane identica a quella che già conosci.
Questa coerenza si estende a come i dati vengono processati internamente. L'API distribuita gestisce nativamente i dati mancanti esattamente come fa il pandas locale. Se il tuo dataset contiene valori Not-a-Number di NumPy, la pandas API su Spark li gestisce correttamente durante le operazioni matematiche o le trasformazioni strutturali. Non devi inventare una nuova logica di data cleaning per i tuoi job Spark.
Le operazioni standard si traducono direttamente. Se vuoi raggruppare i tuoi dati per una colonna specifica, chiami la funzione di grouping standard. Se vuoi calcolare la media o la somma, metti in chain la funzione aggregate subito dopo. Puoi persino chiamare le funzioni di plotting direttamente sul DataFrame distribuito. Spark processa le computazioni pesanti su tutto il cluster, aggrega i data point necessari e restituisce la visualizzazione proprio come se stessi lavorando su una singola macchina.
Ecco il punto cruciale. L'architettura sottostante è fondamentalmente diversa, e questo introduce un edge case critico riguardo la generazione dell'indice. Il pandas locale si basa pesantemente su un indice sequenziale e strettamente ordinato per ogni singola riga. Spark, invece, partiziona i dati e li distribuisce su più macchine indipendenti. Forzare un indice sequenziale rigoroso e globalmente ordinato in un sistema distribuito richiede una comunicazione costante tra i worker node.
Quando crei un DataFrame pandas-on-Spark senza definire esplicitamente una colonna indice, l'API genera automaticamente un indice di default per imitare perfettamente il comportamento standard di pandas. Creare e mantenere questo indice di default richiede la sincronizzazione dello stato sull'intero cluster. Se stai operando su un dataset enorme, questa sincronizzazione introduce un grave overhead di performance. L'API spesso emetterà un warning riguardo a questo overhead interno durante l'esecuzione. Per evitare questo bottleneck, ti consiglio vivamente di assegnare subito una colonna esistente come indice o di configurare l'API per usare un tipo di indice distributed-friendly.
La pandas API su Spark ti offre l'esatta sintassi di pandas alimentata dall'execution engine distribuito di Spark, ma ricordare che gli indici sequenziali rigorosi comportano un pesante costo di sincronizzazione salverà il tuo cluster da rallentamenti inutili. Per oggi è tutto. Grazie per aver ascoltato — vai a creare qualcosa di fantastico.
19
Carica e ammira: Formati di archiviazione
3m 46s
Non tutti i formati di file sono creati uguali. Confrontiamo i CSVs basati su righe con formati colonnari come Parquet e ORC, esplorando le opzioni di lettura/scrittura e le tecniche di archiviazione ottimali.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 19 di 21. Salvare un dataset enorme come CSV è la cosa più semplice del mondo, ed è anche una delle cose più distruttive che puoi fare alle performance del tuo data lake. Paghi per più storage, paghi per più compute, e ogni query a valle arranca. La soluzione sta in come gestisci i formati di storage, e nel perché il modo in cui salvi i tuoi dati conta tanto quanto il modo in cui li trasformi.
PySpark utilizza un'interfaccia unificata per leggere e scrivere dati su decine di sistemi di storage. Chiami l'attributo read o write sulla tua Spark session o sul tuo DataFrame, specifichi un formato, fornisci una chain di opzioni e lo punti a un file path. È un pattern prevedibile, ma le opzioni che scegli dettano quanto lavoro dovrà fare il tuo cluster in seguito.
Iniziamo con i formati human-readable, CSV e JSON. Sono formati row-based. Quando leggi un CSV, Spark fa il parsing dei dati riga per riga. Spesso devi mettere in chain opzioni specifiche per dare un senso al testo. Ad esempio, potresti mettere in chain un'opzione per dire a Spark che il file ha un header, un'altra opzione per impostare un delimiter custom come un pipe o un tab, e una terza opzione per definire esattamente che aspetto ha un valore null, magari passando una string specifica in modo che Spark la mappi correttamente a un valore vuoto invece di trattarla come testo letterale. JSON è leggermente migliore perché gestisce nativamente le strutture annidate, ma ripete le chiavi dello schema per ogni singolo record, gonfiando enormemente le dimensioni del file. Entrambi i formati costringono Spark a leggere l'intera riga dal disco, anche se la tua query richiede solo una singola colonna.
È qui che entrano in gioco i formati columnar come Parquet e ORC. Fai attenzione a questa parte. Le query analitiche raramente hanno bisogno di tutte le colonne in una tabella molto larga. Di solito hanno bisogno di colonne specifiche su milioni di righe per lanciare aggregazioni. Parquet e ORC memorizzano i dati organizzati per colonna, non per riga. Se fai una query su tre colonne su cento, Spark legge solo i chunk del file che contengono quelle tre colonne. Salta il resto completamente, riducendo l'input e l'output su disco a una frazione di quello che richiede un CSV. Dato che i dati dello stesso tipo vengono archiviati insieme, i formati columnar si comprimono anche a meraviglia. Una directory di file JSON potrebbe ridursi del settanta percento o più quando viene convertita in Parquet. Inoltre, incorporano lo schema esatto e i tipi di dato nei metadata del file, il che significa che Spark non deve indovinare o inferire i tipi al caricamento.
Quando sei pronto a scrivere di nuovo questi dati, devi gestire lo stato alla destinazione. Di default, se provi a scrivere in un path dove i dati esistono già, Spark lancia un errore per evitare la perdita accidentale di dati. Puoi controllare questo comportamento usando il metodo mode prima di triggerare il salvataggio. Se passi la string overwrite, Spark elimina i dati esistenti nel path di destinazione e li sostituisce con il tuo DataFrame corrente. Se passi append, Spark aggiunge semplicemente i tuoi nuovi part file alla directory esistente. C'è anche un mode ignore, che silenziosamente non fa nulla se la directory è già popolata.
Scrivere dati puliti, tipizzati e columnar oggi fa risparmiare al tuo cluster ore di tempo di processing sprecato domani. Se vuoi aiutare a far continuare questi episodi, puoi supportare lo show cercando DevStoriesEU su Patreon. Grazie per aver passato qualche minuto con me. Alla prossima, stammi bene.
20
Bug Busting: Piani fisici e Joins
3m 15s
Dai un'occhiata sotto il cofano del motore di esecuzione di Spark. Impara a fare il debug delle query usando DataFrame.explain() e a eliminare i costosi shuffle usando i Broadcast joins.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 20 di 21. Il tuo job PySpark non è lento perché sta elaborando i dati. È lento perché passa tutto il tempo a spostare i dati sulla rete. Quando una semplice join mette in ginocchio il tuo cluster, la soluzione si trova nel Bug Busting: Physical Plan e Join.
Quando scrivi uno script PySpark, definisci delle operazioni logiche. Dici a Spark cosa vuoi, non come farlo. Ma quando un job ha prestazioni scarse, devi sapere esattamente come Spark ha eseguito la tua richiesta. Lo fai chiamando il metodo explain sul tuo DataFrame. Chiamare explain stampa il physical plan. Questo è il progetto dei task effettivi che Spark esegue sul tuo cluster. Leggi questo physical plan dal basso verso l'alto, seguendo i dati dai file sorgente fino all'output finale.
Se guardi il physical plan di una join standard tra due DataFrame, probabilmente vedrai uno step chiamato SortMergeJoin. Per eseguire una SortMergeJoin, Spark deve assicurarsi che le righe con le stesse join key si trovino fisicamente sullo stesso executor. Per ottenere questo risultato, Spark esegue un Exchange. Exchange è il termine del physical plan per indicare uno shuffle di rete. Significa che Spark sta estraendo i dati dalle partizioni, li sta spingendo sulla rete e li sta scrivendo su disco in modo che gli altri executor possano leggerli. Lo shuffle è in assoluto l'operazione più costosa nel calcolo distribuito.
Ecco il punto chiave. Se stai facendo una join tra una fact table enorme e una piccola lookup table, fare lo shuffle della tabella grande è un enorme spreco di risorse. Invece di fare lo shuffle di entrambe le tabelle per allineare le chiavi, puoi semplicemente inviare l'intera tabella piccola a ogni executor. Puoi farlo usando la funzione broadcast del modulo PySpark SQL functions. Quando chiami il tuo metodo join, ti basta racchiudere il DataFrame più piccolo nella funzione broadcast.
Racchiudendo la tabella piccola, dai a Spark una direttiva precisa. Spark farà una collect del DataFrame piccolo sul driver node, e poi ne trasmetterà una copia completa nella memoria di ogni singolo executor. Ora, quando il DataFrame grande viene elaborato, gli executor hanno già tutti i dati di lookup di cui hanno bisogno direttamente lì in RAM. Scorrono semplicemente le loro partizioni esistenti e fanno il match delle righe localmente. Non serve alcun ordinamento, e nessun dato della tabella grande si muove sulla rete.
Se chiami explain su questa nuova broadcast join, il physical plan ha un aspetto completamente diverso. La SortMergeJoin è sparita. Il costoso step di Exchange è completamente assente. Al loro posto, vedrai un BroadcastExchange e una BroadcastHashJoin. Il BroadcastExchange sposta la tabella piccola una sola volta, e la join stessa avviene interamente in place. Il modo più semplice per raddoppiare la velocità di un job Spark è smettere di spostare dati che non devono essere spostati. Leggi i tuoi physical plan, individua i network exchange e fai il broadcast delle tue tabelle piccole.
Questo è tutto per oggi. Grazie per aver ascoltato: vai a creare qualcosa di fantastico.
21
Profiling della memoria e delle performance di PySpark
4m 14s
Concludiamo il nostro viaggio in PySpark introducendo strumenti di profiling nativi. Impara a tracciare il consumo di memoria riga per riga e a esporre i traceback interni nascosti di Python.
Ciao, sono Alex di DEV STORIES DOT EU. Fondamenti di PySpark, episodio 21 di 21. Fare debug di codice Python distribuito di solito significa dover spulciare migliaia di righe di errori Java senza senso, cercando di indovinare perché la tua funzione ha fallito o perché ha consumato tutta la memoria sul tuo cluster. Non devi più tirare a indovinare. Oggi vedremo come fare profiling della memoria e delle performance in PySpark, oltre a semplificare gli stack trace.
Quando scrivi una User Defined Function, o UDF, in PySpark, il tuo codice Python gira su un'infrastruttura Java Virtual Machine. Se il tuo codice Python divide per zero o fa riferimento a una chiave mancante in un dictionary, quella semplice exception Python viene inghiottita. Viene passata indietro attraverso il daemon PySpark, viaggia sulla rete, e viene wrappata in enormi exception Java. Trovare il vero errore Python nei tuoi log è un'operazione noiosa.
Puoi risolvere questo problema abilitando i traceback semplificati. Quando imposti la configurazione Spark per i traceback semplificati a true, PySpark cambia il modo in cui riporta gli errori. Rimuove tutti i log di interoperabilità Java e il rumore dei processi worker. La prossima volta che una UDF fallisce, la tua console mostrerà uno stack trace Python standard e pulito, indicando il numero di riga esatto nel tuo file Python in cui si è verificata l'exception.
Risolvere i crash è solo metà dell'opera. Sistemare codice lento o che consuma troppa memoria è molto più difficile. Se scrivi una Pandas UDF che processa milioni di righe, potrebbe girare con successo ma impiegare troppo tempo o scatenare errori di out-of-memory sui tuoi nodi executor. Storicamente, trovare il collo di bottiglia richiedeva l'aggiunta di log manuali o di tirare a indovinare quale operazione Pandas fosse inefficiente. Spark 4.0 cambia le cose introducendo dei profiler integrati per le Python UDF.
Ecco la novità principale. Ora puoi fare profiling del tuo codice Python distribuito riga per riga, direttamente all'interno di PySpark. Per usarlo, imposti la configurazione del profiler UDF su una di due modalità: performance o memory.
Se imposti la configurazione del profiler sulla parola "perf", Spark attiva il performance profiler. Dopodiché fai girare il tuo job Spark normalmente. Mentre i nodi worker eseguono la tua Pandas UDF, Spark tiene traccia del tempo di esecuzione di ogni singola riga della tua funzione Python. Una volta terminato il job, chiami il metodo show sull'oggetto profile di Spark. Spark stamperà un report dettagliato sulla tua console. Per ogni riga del tuo codice, vedrai esattamente quante volte è stata chiamata e il tempo totale speso per la sua esecuzione. Puoi vedere all'istante se una specifica manipolazione di string o un'operazione matematica sta rallentando l'intera pipeline.
Se hai a che fare con limiti di memoria, imposti invece la configurazione del profiler UDF sulla parola "memory". Il workflow è esattamente lo stesso, ma l'output cambia. Quando guardi il report del profile, Spark ti mostra l'esatto incremento in megabyte causato da ogni riga del tuo codice Python. Puoi vedere esattamente dove vengono allocati grandi array e dove la memoria non riesce a essere rilasciata.
Questa visibilità riga per riga ti evita di dover tirare a indovinare quando ottimizzi complesse trasformazioni di dati. Puoi individuare la causa esatta dei tuoi problemi di performance senza uscire dal tuo ambiente PySpark.
Dato che questo è l'ultimo episodio della nostra serie su PySpark, ti incoraggio a dare un'occhiata alla documentazione ufficiale di Spark e a provare questi tool di debug in prima persona. Se hai idee su quali tecnologie dovremmo trattare nella nostra prossima serie, fai un salto su devstories.eu e faccelo sapere. Grazie per aver passato qualche minuto con me. Alla prossima, stammi bene.
Tap to start playing
Browsers block autoplay
Share this episode
Episode
—
Copy this episode in another language:
Questo sito non utilizza cookie. Il nostro fornitore di hosting potrebbe registrare il tuo indirizzo IP a fini statistici. Scopri di più.