Înapoi la catalog
Season 14 21 Episoade 1h 19m 2026

PySpark Fundamentals

v4.1 — Ediția 2026. Un ghid complet pentru PySpark 4.1, care acoperă Spark Connect, DataFrames, tipuri de date complexe, transformări de date, SQL, UDFs și profiling.

Big Data Calcul distribuit Știința datelor
PySpark Fundamentals
Se redă acum
Click play to start
0:00
0:00
1
Problema Big Data și promisiunea PySpark
Stabilim nevoia fundamentală pentru PySpark. Descoperă de ce bibliotecile Python standard precum Pandas eșuează la scară largă și cum PySpark oferă un motor de execuție distribuit pentru a procesa seturi masive de date fără probleme.
3m 59s
2
Revoluția Spark Connect
Explorează arhitectura Spark Connect. Explicăm modul în care PySpark a decuplat clientul de server, permițându-ți să rulezi aplicații Spark oriunde, fără dependențe JVM voluminoase.
2m 52s
3
DataFrames și Lazy Evaluation
Aprofundează abstracția fundamentală a PySpark: DataFrame. Discutăm despre conceptul de lazy evaluation, diferența dintre transformări și acțiuni, și de ce Spark își face un plan înainte de a executa.
4m 07s
4
Crearea și vizualizarea DataFrames
Învață cum să instanțiezi DataFrames din obiecte Python brute, dicționare și fișiere, și cum să inspectezi în siguranță datele distribuite fără a bloca nodul driver.
3m 41s
5
Stăpânirea tipurilor de date de bază
Un tur al tipurilor fundamentale numerice și de șiruri de caractere din PySpark. Explorăm cum să definim explicit schemele folosind StructType și StructField pentru conducte de date (data pipelines) robuste.
4m 01s
6
Pericolele preciziei
Descoperă diferențele critice dintre FloatType, DoubleType și DecimalType. Învață de ce alegerea unui tip numeric greșit poate introduce erori de rotunjire dezastruoase în datele financiare.
4m 10s
7
Îmblânzirea datelor complexe și imbricate
Datele masive nu sunt întotdeauna plate. Explorăm tipurile de date complexe din PySpark, inclusiv ArrayType, StructType și MapType, care îți permit să analizezi nativ structuri JSON profund imbricate.
4m 08s
8
Conversia tipurilor (Type Casting) și selecția
Învață cum să modelezi activ schemele DataFrame. Acoperim modul de selectare a subseturilor de coloane și cum să convertești în siguranță coloanele de la un tip de date la altul.
3m 29s
9
Intersecția funcțiilor: Curățarea datelor murdare
Garbage in, garbage out. Învață transformările esențiale DataFrame pentru eliminarea valorilor nule, completarea valorilor lipsă și gestionarea înregistrărilor NaN în mod nativ în sistemele distribuite.
3m 42s
10
Transformarea și remodelarea datelor
Preia controlul asupra formei datelor tale. Explorăm cum să generezi coloane noi cu funcții matematice, să efectuezi manipulări de șiruri de caractere și să aplatizezi array-uri imbricate folosind explode.
4m 28s
11
Mecanica grupării și agregării
Stăpânește strategia split-apply-combine. Aprofundăm gruparea datelor după chei și aplicarea unor funcții puternice de agregare pentru a rezuma seturi masive de date.
3m 44s
12
Când DataFrames se ciocnesc: Arta asocierii (Joining)
Navigăm prin nuanțele combinării seturilor de date. Detaliem cele șapte tipuri diferite de join din PySpark și explicăm cum să îmbini DataFrames în siguranță.
3m 55s
13
SQL vechi, trucuri noi
De ce să înveți un API nou când poți folosi SQL brut? Învață cum să execuți interogări SQL standard direct pe PySpark DataFrames distribuite.
3m 07s
14
Interschimbarea DataFrames și SQL
Combină SQL cu Python fără probleme. Descoperă cum să creezi vizualizări temporare (temporary views) din DataFrames, să folosești selectExpr și să înlănțui operațiuni programatice pe rezultatele interogărilor SQL.
3m 44s
15
Extinderea Spark cu Python UDFs
Când funcțiile încorporate nu sunt suficiente, intervin User-Defined Functions. Explorăm cum să scrii o logică Python personalizată pentru DataFrames și de ce UDFs scalare standard ascund o penalizare de performanță.
3m 51s
16
Accelerarea UDFs cu Apache Arrow
Elimină blocajul de serializare JVM-to-Python. Descoperim cum Vectorized Pandas UDFs și formatele de memorie Apache Arrow îți accelerează masiv transformările personalizate.
3m 59s
17
Explodarea rândurilor cu Python UDTFs
UDFs standard returnează o valoare pe rând, dar ce faci dacă ai nevoie de mai multe rânduri? Învață cum Python User-Defined Table Functions (UDTFs) rezolvă probleme complexe de generare one-to-many.
4m 25s
18
API-ul Pandas pe Spark
Scalează-ți scripturile Pandas existente la infinit. Descoperă cum API-ul pyspark.pandas îți permite să execuți sintaxa Pandas standard în mod nativ pe un cluster Spark distribuit.
3m 42s
19
Încarcă și privește: Formate de stocare
Nu toate formatele de fișiere sunt create la fel. Contrastăm fișierele CSV bazate pe rânduri cu formatele columare precum Parquet și ORC, explorând opțiunile de citire/scriere și tehnicile optime de stocare.
3m 58s
20
Vânătoarea de bug-uri: Planuri fizice și Joins
Aruncă o privire sub capota motorului de execuție Spark. Învață cum să depanezi interogările folosind DataFrame.explain() și cum să elimini amestecările (shuffles) costisitoare folosind Broadcast joins.
3m 05s
21
Profiling pentru memoria și performanța PySpark
Încheiem călătoria noastră PySpark prin introducerea instrumentelor native de profiling. Învață cum să urmărești consumul de memorie linie cu linie și să expui traceback-urile interne Python ascunse.
3m 41s

Episoade

1

Problema Big Data și promisiunea PySpark

3m 59s

Stabilim nevoia fundamentală pentru PySpark. Descoperă de ce bibliotecile Python standard precum Pandas eșuează la scară largă și cum PySpark oferă un motor de execuție distribuit pentru a procesa seturi masive de date fără probleme.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 1 din 21. Scriptul tău Python standard rulează perfect în teste, dar în momentul în care dataset-ul atinge cincizeci de gigabytes, crapă cu o eroare OutOfMemory. Ai atins limitele fizice ale unei singure mașini. Soluția pentru acest bottleneck este subiectul principal al acestui episod: problema big data și promisiunea PySpark. Tool-urile standard de date din Python sunt construite pentru execuție single-node. Librării precum pandas sunt incredibil de eficiente, dar cer ca întregul dataset să stea în memoria locală. Dacă serverul tău are șaisprezece gigabytes de RAM și încerci să încarci cincizeci de gigabytes de application logs, sistemul de operare intervine și omoară procesul. Scalarea verticală prin închirierea unui server mai mare și mai scump nu face decât să întârzie inevitabilul. Datele cresc mai repede decât upgrade-urile hardware. În cele din urmă, datele depășesc capacitatea mașinii. PySpark rezolvă această limitare. Este API-ul Python pentru Apache Spark. Apache Spark în sine este un motor de calcul distribuit care rulează pe Java Virtual Machine. PySpark acționează ca o punte, permițându-ți să îți scrii logica exclusiv în Python, profitând în același timp de motorul distribuit extrem de optimizat al lui Spark. Asta îți schimbă arhitectura de la scalare verticală la scalare orizontală. În loc să te bazezi pe o singură mașină masivă, PySpark îți partiționează datele și distribuie calculele pe un cluster de mai multe mașini mai mici, cunoscute sub numele de noduri. Tu scrii codul Python, iar PySpark îl traduce într-un plan de execuție paralelă. Dacă volumul de date se dublează luna viitoare, nu trebuie să rescrii nicio linie de cod. Pur și simplu adaugi mai multe noduri la cluster. Ecosistemul PySpark este organizat în câteva module de bază concepute pentru diferite workload-uri. Primul este Spark SQL. Aceasta este fundația pentru majoritatea aplicațiilor PySpark moderne. Oferă o structură DataFrame pentru gestionarea datelor tabelare răspândite pe mai multe mașini. De asemenea, îți permite să rulezi query-uri SQL standard direct pe aceste dataset-uri distribuite. Următorul este Structured Streaming. Acest modul gestionează data pipelines în timp real. În loc să proceseze un batch masiv de date peste noapte, Structured Streaming procesează continuu fluxuri de înregistrări, cum ar fi citirile senzorilor live sau evenimentele de trafic web. Folosește exact același model de programare ca Spark SQL, ceea ce înseamnă că logica ta de batch processing și logica de streaming arată aproape identic. Apoi, există MLlib, Machine Learning Library. Antrenarea modelelor pe dataset-uri masive pe o singură mașină este un bottleneck notoriu. MLlib oferă algoritmi de machine learning distribuit pentru task-uri precum clasificarea, regresia și clustering-ul. Distribuie operațiile matematice complexe pe întregul cluster, reducând drastic timpul de training. Iată ideea cheie. Adevărata putere a PySpark este abstractizarea. Nu trebuie niciodată să îți împarți manual fișierele masive în chunk-uri. Nu trebuie niciodată să scrii networking code pentru a coordona serverele. Pur și simplu definești o secvență logică de transformări, iar motorul din spate se ocupă de distribuția datelor, de execuția paralelă și chiar de procesul de recovery dacă un nod rămâne fără curent în mijlocul calculului. PySpark nu este doar un utilitar pentru deschiderea fișierelor mai mari. Este o trecere fundamentală de la computing-ul constrâns de o singură placă de bază la computing-ul constrâns doar de dimensiunea clusterului tău. Dacă găsești aceste episoade utile și vrei să susții emisiunea, poți căuta DevStoriesEU pe Patreon. Asta e tot pentru acest episod. Mulțumesc că m-ai ascultat și continuă să construiești!
2

Revoluția Spark Connect

2m 52s

Explorează arhitectura Spark Connect. Explicăm modul în care PySpark a decuplat clientul de server, permițându-ți să rulezi aplicații Spark oriunde, fără dependențe JVM voluminoase.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 2 din 21. Ani de zile, să scrii cod PySpark local însemna să tragi după tine un Java Virtual Machine masiv și greoi doar pentru a testa un simplu script. Trebuia să sincronizezi perfect versiunile de Python, configurațiile de Java și dependențele de cluster înainte să scrii o singură linie de logică. Revoluția Spark Connect face ca toate astea să devină complet depășite. În mod tradițional, PySpark se baza pe o arhitectură tightly coupled. Scriptul tău Python și execution engine-ul Spark trebuiau să coexiste pe exact aceeași mașină fizică sau virtuală. Lansarea unei sesiuni PySpark însemna pornirea unui Java Virtual Machine în background, folosind o librărie bridge. Această arhitectură îți îngreuna mediul local de development cu toată greutatea execution engine-ului Spark. Făcea ca integrarea PySpark în aplicații web, editoare de cod moderne sau device-uri edge să fie extrem de nepractică. Spark Connect rezolvă asta prin introducerea unei arhitecturi client-server decoupled. Environment-ul tău Python este acum strict separat de serverul Spark. Clientul local PySpark devine o librărie lightweight. Nu mai necesită o instalare locală de Java și nu execută el însuși task-uri de procesare a datelor. Acționează pur și simplu ca o interfață remote către clusterul Spark propriu-zis. Aici e ideea cheie. Când scrii operațiuni pe DataFrame cu Spark Connect, clientul lightweight îți înregistrează method call-urile și le traduce într-un unresolved logical plan. Poți să-ți imaginezi acest plan ca pe un blueprint abstract al query-ului tău, care descrie strict ce date să procesezi, fără să-ți faci griji despre cum sunt procesate. Clientul împachetează acest blueprint folosind Protocol Buffers și îl transmite printr-o conexiune de rețea gRPC către serverul Spark remote. Serverul despachetează planul, se ocupă de toată optimizarea complexă a query-ului, execută job-ul pe tot clusterul și, în final, face stream la rezultatele calculate înapoi către scriptul tău Python. Să setezi asta necesită o modificare minoră a modului în care îți pornești aplicația. Încă folosești builder-ul SparkSession, dar în loc să te bazezi pe configurații locale, apelezi metoda remote. Oferi un connection string care detaliază unde se află serverul Spark. Acest string folosește o schemă de conexiune dedicată care începe cu literele s c. Deci, dacă te conectezi la un server de test local pe portul default, oferi string-ul s c două puncte slash slash localhost două puncte unu cinci zero zero doi. După acest singur pas de conectare, îți scrii codul de DataFrame exact la fel cum ai făcut-o mereu. Pentru că execuția este complet remote, poți conecta simultan mai mulți clienți Python diferiți, din aplicații diferite, la exact același server Spark. Codul aplicației tale cere pur și simplu transformări de date, iar greul rămâne în întregime pe partea de server. Prin izolarea completă a clientului Python de execution runtime, Spark Connect elimină faimoasele conflicte de dependențe care obișnuiau să strice deploy-urile, permițându-ți să îți actualizezi environment-urile aplicației complet independent de clusterul Spark în sine. Mersi că ai petrecut câteva minute cu mine. Până data viitoare, numai bine.
3

DataFrames și Lazy Evaluation

4m 07s

Aprofundează abstracția fundamentală a PySpark: DataFrame. Discutăm despre conceptul de lazy evaluation, diferența dintre transformări și acțiuni, și de ce Spark își face un plan înainte de a executa.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 3 din 21. Cum ar fi dacă codul tău nu ar rula de fapt atunci când îl tastezi, ci ar aștepta, ți-ar analiza obiectivul final și ar trasa cea mai rapidă rută posibilă? Faci chain la filtre, agregări și join-uri, dar mașina ta abia dacă depune efort. Asta pentru că nu face absolut nimic până nu o forțezi. Acest mecanism se numește lazy evaluation și este motorul principal din spatele PySpark DataFrames. Un PySpark DataFrame este o colecție distribuită de date, organizată în coloane cu nume. Dacă ești familiarizat cu pandas, conceptul ți se va părea identic. Diferența este că un PySpark DataFrame își împarte datele pe mai multe compute nodes dintr-un cluster. Din punct de vedere istoric, structura fundamentală din Spark era Resilient Distributed Dataset, cunoscută în mod obișnuit sub numele de RDD. Ecosistemul s-a îndepărtat masiv de manipularea raw de RDD-uri. De fapt, începând cu versiunea 4.0 de Spark, utilizarea directă a RDD-urilor nu mai este suportată pe Spark Connect. DataFrame-urile sunt acum standardul definitiv, oferind un API strict care îi permite lui Spark să îți optimizeze automat query-urile. Această optimizare se bazează în întregime pe lazy evaluation. Fiecare operațiune pe care o efectuezi pe un DataFrame se încadrează într-una din două categorii stricte: un transformation sau un action. Transformation-urile sunt comenzi care returnează un nou DataFrame. Exemplele includ selectarea anumitor coloane, filtrarea rândurilor pe baza unei condiții, gruparea înregistrărilor sau un join între două tabele separate. Când aplici un transformation, PySpark nu execută procesarea datelor. Pur și simplu înregistrează operațiunea. Actualizează o schiță internă numită logical execution plan. Poți scrie cincizeci de transformation-uri consecutive, iar Spark doar va valida rapid sintaxa și își va actualiza graful. Aici este ideea de bază. Prin amânarea execuției propriu-zise, PySpark îi oferă motorului său de query-uri, Catalyst Optimizer, imaginea completă a data pipeline-ului tău. Optimizer-ul inspectează întregul tău chain de transformation-uri, le rearanjează pentru o eficiență maximă și elimină complet pașii inutili înainte ca un singur byte de date să fie citit de pe disk. Această schiță rămâne complet inactivă până când invoci un action. Un action este o comandă care cere un rezultat concret. Fie returnează date către driver program-ul tău, fie scrie datele în storage. Action-urile comune includ numărarea totalului de rânduri, colectarea datelor înapoi într-o listă Python locală sau comanda dată sistemului de a afișa primele douăzeci de înregistrări pe ecran. În momentul în care declanșezi un action, motorul se pune în mișcare. Acesta îți traduce logical plan-ul optimizat într-un physical plan, distribuie task-urile către workerii din cluster și rulează calculul. Ia în considerare un data workflow standard. Mai întâi, creezi un DataFrame care pointează către un fișier masiv. Apoi, îi faci join cu un tabel separat cu detalii despre useri. După join, filtrezi rezultatele pentru a include doar userii dintr-un anumit oraș. În cele din urmă, îi ceri lui Spark să afișeze output-ul. Datorită lazy evaluation, Spark nu încarcă de fapt întregul fișier, nu face un distributed join masiv ca apoi să filtreze rezultatele la final. În schimb, optimizer-ul se uită la request-ul tău final, observă filtrul și împinge acea operațiune de filtrare mai sus pe chain, cu mult înainte ca join-ul să aibă loc. Citește selectiv doar înregistrările relevante, reducând drastic utilizarea memoriei și traficul de rețea din cluster. Scriptul tău PySpark nu este niciodată o secvență de comenzi imediate. Este un set de instrucțiuni care desenează un plan arhitectural, iar sistemul începe construcția doar atunci când tu ceri în sfârșit rezultatul final. Asta e tot pentru astăzi. Mulțumesc pentru ascultare — du-te și construiește ceva cool.
4

Crearea și vizualizarea DataFrames

3m 41s

Învață cum să instanțiezi DataFrames din obiecte Python brute, dicționare și fișiere, și cum să inspectezi în siguranță datele distribuite fără a bloca nodul driver.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 4 din 21. Apelarea unei anumite metode pe un dataset masiv este o modalitate garantată de a da crash instantaneu întregii aplicații cu o eroare out-of-memory. Să știi cum să muți în siguranță datele în și din Spark fără să îți distrugi driver node-ul este esențial. Exact asta acoperă acest episod: crearea și vizualizarea DataFrame-urilor. Fiecare aplicație PySpark are nevoie de date cu care să lucreze. În general, creezi DataFrame-uri în trei moduri. În primul rând, le poți crea direct din structuri Python in-memory. Pur și simplu definești o listă de dicționare, unde fiecare dicționar reprezintă un rând, iar cheile sunt nume de coloane, și o pasezi metodei createDataFrame de pe SparkSession-ul tău. În al doilea rând, dacă ai deja un DataFrame pandas in-memory, poți pasa exact acel obiect pandas aceleiași metode createDataFrame. PySpark gestionează automat conversia. A treia și cea mai comună metodă este citirea din fișiere externe. Folosești atributul read de pe SparkSession-ul tău, urmat de formatul dorit, cum ar fi csv sau json, și furnizezi calea fișierului. După ce datele sunt încărcate, trebuie să le verifici. DataFrame-urile PySpark sunt distribuite, ceea ce înseamnă că nu poți pur și simplu să dai print la variabilă și să vezi datele așa cum ai face într-un script Python standard. Pentru a vedea structura datelor, apelezi metoda printSchema. Aceasta afișează un tree în format text care arată fiecare nume de coloană și tipul de date corespunzător. Este cea mai rapidă modalitate de a verifica dacă fișierul s-a încărcat corect. Pentru a vizualiza conținutul real, folosești metoda show. By default, apelarea metodei show afișează primele douăzeci de rânduri într-un format tabelar. Fii atent la partea asta. Dacă coloanele tale conțin string-uri lungi, metoda show le trunchiază. Poți dezactiva asta pasând un argument truncate setat pe false, sau setându-l la un anumit număr de caractere. Dacă DataFrame-ul tău are zeci de coloane, vizualizarea standard de tabel face wrap pe ecran și devine ilizibilă. În acest caz, poți pasa argumentul vertical setat pe true. Asta afișează fiecare rând ca un bloc vertical de perechi key-value, făcând dataset-urile late mult mai ușor de citit într-un terminal. Acum, ajungem la crash-ul out-of-memory menționat mai devreme. Uneori, trebuie să readuci datele distribuite în obiecte Python obișnuite. Metoda pentru a face acest lucru se numește collect. Iată ideea cheie. Metoda collect preia absolut fiecare rând de la fiecare executor din întregul tău cluster și îl forțează în memoria singurului tău driver node. Dacă DataFrame-ul tău conține un miliard de rânduri, driver-ul tău va rămâne fără memorie și va da crash instantaneu. Ar trebui să apelezi collect doar atunci când ai agregat sau ai filtrat datele la o dimensiune mică. Când lucrezi cu dataset-uri mari, extrage întotdeauna eșantioane mai mici. În loc de collect, folosește metoda take, pasând numărul de rânduri pe care le vrei. Asta returnează o listă Python standard care conține doar acele prime câteva rânduri. Dacă trebuie să verifici sfârșitul dataset-ului tău, folosește metoda tail pentru a prelua ultimele câteva rânduri. Ambele metode limitează în siguranță cantitatea de date transferate către driver. Regula pentru datele distribuite este simplă: trimite calculele către cluster, dar limitează strict numărul de rânduri pe care le tragi înapoi către driver. Asta e tot pentru acest episod. Mulțumesc că m-ai ascultat și continuă să construiești!
5

Stăpânirea tipurilor de date de bază

4m 01s

Un tur al tipurilor fundamentale numerice și de șiruri de caractere din PySpark. Explorăm cum să definim explicit schemele folosind StructType și StructField pentru conducte de date (data pipelines) robuste.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 5 din 21. Să te bazezi pe schema inference automată îți poate salva câteva linii de cod, dar te va costa scump la performanța în producție. Clusterul trebuie adesea să citească tot dataset-ul tău doar pentru a ghici ce se află înăuntru înainte de a face orice treabă efectivă. Rezolvi asta stăpânind data types de bază și schemele explicite. Este destul de comun să confunzi tipurile standard din Python cu data types din PySpark. Când declari un integer sau un string în Python standard, acel obiect trăiește în memoria mașinii tale locale. Tipurile din PySpark operează la un nivel complet diferit. Ele sunt instrucțiuni de mapping pentru optimizatorul Catalyst și pentru Java Virtual Machine din spate. Când folosești data types din PySpark, definești o structură strictă, cluster-aware. Asta garantează consistența datelor pe sute de worker nodes distribuite și dictează exact cum sunt serializate datele prin rețea. PySpark oferă un tip specific pentru fiecare formă standard de date, iar selectarea celui corect este crucială pentru performanță. Pentru numere, ai ByteType pentru integers foarte mici, IntegerType pentru numere standard și LongType pentru valori mari. Să alegi ByteType în loc de LongType pentru un simplu status code salvează memorie semnificativă atunci când acea alegere este multiplicată pe miliarde de rânduri. Pentru text și logică, folosești StringType și BooleanType. Gestionarea corectă a timpului este o altă zonă în care un typing exact contează. PySpark împarte datele temporale în DateType și TimestampType. Folosești DateType atunci când te interesează doar data calendaristică, cum ar fi ziua de naștere a unui user. Folosești TimestampType atunci când ai nevoie de puncte exacte în timp, urmărind atât data, cât și ora, minutul și secunda exactă în care a avut loc un eveniment. Să știi aceste tipuri este doar fundația. Trebuie să le aplici direct în procesul tău de data ingestion folosind o schemă explicită. Construiești această schemă folosind două obiecte specifice: StructType și StructField. Te poți gândi la un StructType ca la un blueprint pentru un rând întreg din dataframe-ul tău. Un StructField este un blueprint pentru o singură coloană din acel rând. Pentru a construi o schemă explicită, instanțiezi un StructType și îi oferi o colecție de StructFields. Fiecare StructField necesită trei argumente specifice. În primul rând, oferi numele coloanei ca un string standard. În al doilea rând, pasezi acel data type specific din PySpark pe care vrei să-l impui, cum ar fi IntegerType sau StringType. În al treilea rând, oferi un boolean flag care indică dacă această coloană are voie să conțină valori null. De exemplu, construiești o schemă începând cu un StructField numit user identifier, atribuit unui StringType, și setezi acel null flag pe false. Continui cu un StructField numit account age, atribuit unui IntegerType, setând acel null flag pe true. Odată ce acest obiect StructType este complet asamblat, îl pasezi direct către dataframe reader folosind metoda schema înainte să apelezi comanda load pentru a citi fișierele. Asta e partea care contează. Când oferi această schemă explicită de la început, PySpark sare complet peste faza de data scanning. Aplică blueprint-ul tău direct pe data stream-ul care intră. Asta reduce drastic timpul necesar pentru a citi un fișier. De asemenea, acționează ca un quality gate imediat. Dacă un fișier malformat ajunge cu text în coloana ta de integers, pipeline-ul îl gestionează pe baza structurii definite de tine, în loc să mute silențios schema inferată downstream și să-ți strice transformările. Să-ți definești schema explicit transformă o operațiune de citire fragilă și costisitoare într-un pas de pipeline predictibil și extrem de optimizat. Mulțumesc pentru audiție, happy coding tuturor!
6

Pericolele preciziei

4m 10s

Descoperă diferențele critice dintre FloatType, DoubleType și DecimalType. Învață de ce alegerea unui tip numeric greșit poate introduce erori de rotunjire dezastruoase în datele financiare.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 6 din 21. Folosirea unui float standard poate părea inofensivă, până când query-ul tău de agregare calculează greșit, în tăcere, milioane de tranzacții financiare. Codul care rulează perfect poate produce numere care sunt puțin, dar periculos de greșite. Tocmai de aceea trebuie să vorbim despre pericolele preciziei. În PySpark, ai trei modalități principale de a stoca numere cu părți fracționare. Ai FloatType, DoubleType și DecimalType. Nu sunt interschimbabile. O greșeală frecventă este să lași PySpark să deducă o schema din datele tale brute. Inferența atribuie de obicei DoubleType oricărui număr cu zecimale. Dacă calculezi venituri financiare, să te bazezi pe acest comportament default reprezintă un risc operațional serios. Pentru a înțelege de ce, trebuie să ne uităm la modul în care funcționează FloatType și DoubleType în spate. FloatType folosește matematică floating-point IEEE 754 pe 32 de biți. DoubleType folosește versiunea pe 64 de biți a aceluiași standard. Ambele reprezintă numere ca fracții binare. Gândește-te cum fracția o treime nu poate fi scrisă perfect folosind zecimale în baza zece. Devine un string nesfârșit de treiari. Exact aceeași limitare există și în sistemul binar. Numerele zecimale comune, cum ar fi zero virgulă unu sau zero virgulă doi, nu pot fi reprezentate perfect în baza doi. Calculatorul stochează o aproximare minusculă. Cu DoubleType, obții 64 de biți de spațiu, ceea ce înseamnă că aproximarea este incredibil de aproape de numărul real. Dacă faci un query pe un singur rând de date, rareori vei observa diferența. Iată ideea cheie. Eroarea se acumulează în timpul agregărilor. Când calculezi veniturile financiare totale prin însumarea a miliarde de rânduri individuale, aceste inexactități microscopice se adună. O fracțiune de cent pierdută sau câștigată la fiecare tranzacție denaturează în cele din urmă totalul agregat final cu mii sau chiar milioane de dolari. Logica ta de agregare este solidă din punct de vedere matematic, dar tipul de date subiacent corupe rezultatul. Dacă sistemul tău calculează simulări fizice sau antrenează modele de machine learning, FloatType și DoubleType sunt exact ceea ce îți dorești. Ele sacrifică exactitatea pentru o procesare hardware de mare viteză. Dar în momentul în care gestionezi bani, ai nevoie de o precizie strictă și neclintită. Acest lucru ne aduce la DecimalType. DecimalType nu folosește aproximări floating-point. Stochează numerele exact așa cum le definești, folosind o scală fixă. Când configurezi un DecimalType, definești doi parametri distincți. În primul rând, specifici precizia, care este numărul total maxim de cifre pe care le poate conține valoarea. În al doilea rând, specifici scala, care dictează numărul exact de cifre permise la dreapta punctului zecimal. Dacă configurezi un DecimalType cu o precizie de zece și o scală de doi, PySpark alocă spațiul exact necesar pentru a stoca acea valoare până la ultimul bănuț. Nu există fracții binare și nici aproximări de rotunjire. În practică, implementezi acest lucru preluând controlul strict asupra schemelor tale. Când citești înregistrări financiare dintr-un fișier sursă, nu lăsa PySpark să ghicească tipurile. Mai întâi, creezi un obiect schema strict. Apoi, definești câmpurile tale financiare, cum ar fi veniturile sau taxele. În cele din urmă, le atribui explicit un DecimalType cu precizia și scala alese. Odată ce dataframe-ul tău se încarcă cu această schema, agregările tale standard de sumă sau medie se vor executa perfect de la primul rând până la al miliardulea. Sacrifici o cantitate mică de performanță de compute în comparație cu un DoubleType standard, dar garantezi că raportarea ta financiară este absolut impecabilă. Regula este simplă: folosește tipuri floating-point pentru viteză și aproximări științifice, dar în momentul în care un număr reprezintă o monedă, blochează-l cu un DecimalType. Îți mulțumesc că ne-ai ascultat. Pe data viitoare!
7

Îmblânzirea datelor complexe și imbricate

4m 08s

Datele masive nu sunt întotdeauna plate. Explorăm tipurile de date complexe din PySpark, inclusiv ArrayType, StructType și MapType, care îți permit să analizezi nativ structuri JSON profund imbricate.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 7 din 21. În lumea reală, big data este rareori un spreadsheet plat. Uneori, ai nevoie de un array de dictionaries nested doar pentru a face parse la un singur event JSON. Pentru a gestiona asta, trebuie să vorbim despre îmblânzirea datelor complexe și nested. Workflow-urile relaționale preferă tabelele plate, dar datele moderne din event-uri vin puternic nested. PySpark gestionează asta oferind trei tipuri de date complexe. Acestea sunt ArrayType, StructType și MapType. Astea îți permit să modelezi explicit structurile ierarhice, nativ, direct în engine. Ia un profil standard de client ca să vezi cum funcționează aceste tipuri. Primul concept este ArrayType. Acesta reprezintă o colecție de elemente. Regula strictă este că fiecare item din interiorul unui ArrayType trebuie să aibă exact același data type la bază. Nu poți amesteca string-uri și integers în același array. Dacă profilul tău de client include o listă de ID-uri de comenzi recente, definești acea coloană ca un ArrayType care conține integers. Următorul este StructType. Un StructType modelează un record ierarhic nested, funcționând în esență ca un rând încorporat în alt rând. Conține field-uri specifice, cu nume. Spre deosebire de un array, fiecare field din interiorul unui StructType poate avea un data type complet diferit. Să presupunem că acel client are o adresă. Acea adresă conține un nume de stradă ca string, un cod poștal ca integer și un flag boolean care indică dacă este o proprietate comercială. Grupezi aceste field-uri distincte împreună într-un singur StructType. Aici e ideea cheie. Poți face nest la aceste tipuri complexe oricât de adânc. Dacă un client are mai multe adrese, nu creezi coloane plate, numerotate. În schimb, creezi un ArrayType unde tipul elementului intern este exact acel StructType de adresă. Acum ai un array de struct-uri, care se mapează perfect pe un array JSON standard de objects. A treia structură este MapType, concepută special pentru perechi key-value. Diferă de un StructType prin modul în care gestionează structura versus schema. Un StructType îți cere să faci hardcode la numele exacte ale field-urilor de la bun început. Un MapType este flexibil cu conținutul datelor, dar strict cu data types-urile sale. Fiecare key din map trebuie să fie de un anumit tip, iar fiecare value trebuie să fie de un alt tip specific. Ai putea folosi un MapType pentru a stoca preferințele de aplicație ale clientului. Key-urile ar putea fi string-uri, cum ar fi theme sau language, iar value-urile ar putea fi tot string-uri, cum ar fi dark sau English. Pentru că este un MapType, aplicația upstream poate injecta ulterior key-uri de preferințe complet noi, fără să te oblige să modifici schema de bază a DataFrame-ului. Pur și simplu faci query pe value-uri dinamic, după key-urile lor. Când construiești această schemă complexă în codul tău, o construiești din interior spre exterior. Mai întâi, definești field-urile interne ale StructType-ului de adresă. Apoi, pasezi acel struct completat într-o definiție de ArrayType. Mai departe, definești MapType-ul pentru preferințele utilizatorului. În cele din urmă, faci wrap la toate aceste componente, împreună cu tipuri scalare simple, cum ar fi string-ul pentru numele clientului, într-un singur StructType master care definește rândul general din DataFrame. În loc să aplatizezi structurile nested în string-uri JSON dezordonate, definirea explicită a acestor scheme complexe îi permite optimizer-ului Spark să facă prune la date și să filtreze adânc în field-urile nested, fără să facă deserialize la întregul payload în memorie. Mersi că m-ai ascultat — ne auzim data viitoare.
8

Conversia tipurilor (Type Casting) și selecția

3m 29s

Învață cum să modelezi activ schemele DataFrame. Acoperim modul de selectare a subseturilor de coloane și cum să convertești în siguranță coloanele de la un tip de date la altul.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 8 din 21. O simplă valoare de tip string ascunsă într-o coloană de tip integer poate opri complet un cluster cu o mie de noduri. Ai nevoie de o modalitate fiabilă de a impune structuri de date corecte și de a alege exact ce date se mișcă prin pipeline-ul tău, motiv pentru care astăzi analizăm Type Casting și Selection. Pentru a manipula date în PySpark, trebuie mai întâi să înțelegi ce este de fapt o coloană. O instanță de coloană nu este un array fizic de date încărcat în memorie. Este o reprezentare lazily evaluated a unei expresii. Când faci referire la o coloană în codul tău, nu atingi datele subiacente. Pur și simplu adaugi un pas la logical plan-ul lui Spark. Datele se mișcă doar atunci când un action este declanșat ulterior. Pentru a recupera și modela aceste date, folosești metoda select pe DataFrame-ul tău. Ai două modalități principale de a spune metodei select ce coloane vrei. Cea mai simplă modalitate este să pasezi numele coloanelor ca string-uri de text standard. Dacă pasezi un string către select, Spark returnează un nou DataFrame care conține exact acea coloană, complet neschimbată. Acest lucru funcționează bine pentru extragerea de bază, dar nu oferă loc pentru modificări. Pentru a modifica datele în timpul selecției, trebuie să folosești obiecte Column în loc de string-uri. Accesezi un obiect Column referindu-l direct din DataFrame. Poți face acest lucru folosind dot notation, cum ar fi dataframe dot age, sau folosind bracket notation cu numele coloanei ca string în interiorul parantezelor. Bracket notation este utilă în special atunci când numele coloanelor conțin spații sau caractere speciale care ar strica dot notation-ul standard. Asta e partea care contează. Când pasezi un obiect Column în metoda select, îi poți atașa metode pentru a transforma datele din mers. Una dintre cele mai critice transformări este conversia de tip. Datele ajung adesea în formatul greșit. De exemplu, ai putea primi metrici numerice formatate ca string-uri de text. Pentru a corecta asta, folosești metoda cast. PySpark oferă, de asemenea, un alias numit astype, care execută exact aceeași logică. Apelezi metoda cast direct pe obiectul tău Column în interiorul statement-ului select. Metoda cast necesită un argument, care este tipul de date țintă. Poți defini această țintă pasând o reprezentare de tip string a tipului, cum ar fi cuvântul int, sau pasând un obiect specific de tip de date Spark, cum ar fi IntegerType. Iată cum funcționează asta într-un script real. Apelezi metoda select pe DataFrame-ul tău. În interiorul parantezelor acelei metode, faci referire la coloana țintă folosind bracket notation. Chiar lângă acea referință de coloană, apelezi dot cast și furnizezi noul tip. Când este evaluată, asta returnează un DataFrame complet nou, unde coloana ta selectată este acum convertită în siguranță la tipul specificat. DataFrame-ul original rămâne complet neatins, deoarece DataFrame-urile sunt immutable. Concluzia cheie este că type casting-ul în PySpark nu este un proces independent aplicat in place unui dataset existent. Este o expresie de coloană lazily evaluated, inerent legată de actul de selectare a datelor pentru a construi un DataFrame nou, strongly typed. Dacă îți place podcastul și vrei să susții emisiunea, poți căuta DevStoriesEU pe Patreon. Asta e tot pentru acest episod. Mulțumesc că m-ai ascultat și continuă să construiești!
9

Intersecția funcțiilor: Curățarea datelor murdare

3m 42s

Garbage in, garbage out. Învață transformările esențiale DataFrame pentru eliminarea valorilor nule, completarea valorilor lipsă și gestionarea înregistrărilor NaN în mod nativ în sistemele distribuite.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 9 din 21. Garbage in, garbage out. Dar ce faci când acel dataset plin de gunoaie are sute de terabytes și nu poți inspecta manual nici măcar un singur rând? Ai nevoie de o metodă sistematică pentru a-l sanitiza at scale. Exact asta acoperim astăzi în Function Junction: Cleaning Dirty Data. Primul pas în curățare este, de obicei, standardizarea acelei schema. De multe ori vei primi fișiere raw cu spații, caractere speciale sau typo-uri în headere. Folosește metoda numită with column renamed. Pur și simplu îi pasezi vechiul string și noul string dorit. Dacă ai mai multe coloane de reparat, poți face chain la această metodă secvențial pentru fiecare coloană, înainte să aplici transformări complexe downstream. Înainte să eliminăm datele proaste, trebuie să clarificăm o confuzie frecventă legată de null și NaN în PySpark. Null înseamnă că un data point lipsește cu desăvârșire. NaN vine de la Not a Number, care reprezintă un rezultat matematic nedefinit, cum ar fi împărțirea lui zero la zero. În Python pur, acestea necesită o tratare separată. Totuși, PySpark le grupează pentru comoditate. Când folosești funcțiile N A pe un data frame, Spark evaluează valorile NaN ca null-uri pentru operațiunile de drop sau fill. Pentru a elimina rândurile cu valori lipsă, folosești metoda N A dot drop. Apelarea acestei funcții complet goale va da drop oricărui rând care conține un null sau NaN în absolut orice coloană. Această abordare este extrem de distructivă pe dataset-uri late. O singură valoare lipsă într-o coloană opțională de metadate va șterge un rând de date tranzacționale altfel perfecte. Pentru a preveni asta, pasează o listă de nume de coloane parametrului subset. PySpark va evalua apoi doar acele coloane specifice și critice atunci când decide dacă să dea drop rândului. Să dai drop la rânduri nu este mereu permis de regulile de business. De multe ori, trebuie să înlocuiești valorile lipsă cu default-uri sigure. Realizezi asta folosind N A dot fill. Deși poți pasa o singură valoare pentru a da fill tuturor coloanelor, abordarea superioară este să pasezi un dictionary. Keys din dictionary reprezintă numele specifice ale coloanelor, iar values reprezintă înlocuirile alese. Acest pattern îți permite să dai fill unei metrici numerice lipsă cu un zero, înlocuind simultan o categorie lipsă cu un string de text, cum ar fi unknown. Făcând asta printr-un dictionary, totul se execută într-un singur pass, ceea ce este extrem de eficient. În cele din urmă, datele tale ar putea fi complet populate, dar totuși invalide. Outliers și valorile fizic imposibile necesită o filtrare logică. Izolezi datele bune folosind metoda where pentru a păstra doar rândurile care îndeplinesc o anumită condiție. Pentru limite numerice sau de date calendaristice, metoda between este cel mai bun tool al tău. Îți selectezi coloana, apelezi between și oferi limitele inferioară și superioară. Asta înlocuiește logica verbose de greater-than și less-than, făcând codul tău mai ușor de citit. Orice rând care cade în afara acestor limite este filtrat din data frame-ul rezultat. Iată ideea cheie. Ordinea contează enorm atunci când cureți at scale. Întotdeauna redenumește coloanele mai întâi pentru a-ți bloca schema, dă drop sau fill valorilor lipsă apoi pentru a-ți stabiliza data types, și filtrează outliers la final, doar atunci când știi că datele de bază sunt solide din punct de vedere structural. Asta e tot pentru acest episod. Mersi de audiție și continuă să construiești!
10

Transformarea și remodelarea datelor

4m 28s

Preia controlul asupra formei datelor tale. Explorăm cum să generezi coloane noi cu funcții matematice, să efectuezi manipulări de șiruri de caractere și să aplatizezi array-uri imbricate folosind explode.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. PySpark Fundamentals, episodul 10 din 21. Uneori, un singur rând de date conține un array de înregistrări ascunse - și trebuie să detonezi acel array pentru a-l analiza corect. Transformarea și remodelarea datelor este felul în care despachetezi, formatezi și structurezi acele informații pentru procesarea ulterioară. Când trebuie să modifici un dataframe în PySpark, nu modifici datele in place. Dataframe-urile sunt imuabile. În schimb, creezi versiuni noi folosind o metodă numită withColumn. Această metodă primește două argumente. Primul este un string care reprezintă numele coloanei pe care vrei să o creezi sau să o înlocuiești. Al doilea este o expresie de coloană care definește datele efective. Dacă oferi un nume care există deja în dataframe, PySpark suprascrie coloana originală. Dacă numele este complet nou, PySpark adaugă noua coloană în partea dreaptă a dataset-ului tău. Pentru a defini ce intră în acea nouă coloană, folosești de obicei funcțiile built-in din PySpark. Acestea sunt importate din modulul SQL functions și oferă operațiuni extrem de optimizate care se execută pe întregul tău cluster. Să luăm ca exemplu manipularea de string-uri. Datele text din surse externe sunt rareori formatate perfect. Este posibil să ai o coloană care conține nume de utilizatori scrise într-un mix imprevizibil de litere mari și mici. Poți remedia acest lucru pasând coloana ta existentă către o funcție built-in precum lower, care forțează tot textul în lowercase. Alternativ, poți folosi o funcție de capitalizare pentru a te asigura că prima literă este majusculă, iar restul sunt mici. În practică, integrezi aceste operații direct în transformările dataframe-ului tău. Apelezi withColumn, denumești coloana țintă și îi atribui rezultatul funcției lower aplicate pe coloana ta de input. PySpark evaluează această expresie pentru fiecare rând în parte. Poți înlănțui mai multe apeluri withColumn pentru a aplica secvențial mai multe transformări, pasând dataframe-ul actualizat progresiv la pasul următor de fiecare dată. Acum, a doua parte a acestui proces este reshaping-ul. Curățarea string-urilor modifică valorile, dar ce se întâmplă când forma fundamentală a datelor tale împiedică analiza? Aici devine interesant. Este posibil să primești un dataset în care identificatorul unei persoane se află într-o singură coloană, iar veniturile sale lunare pentru întregul an sunt împachetate într-un singur array în coloana adiacentă. Nu poți rula agregări relaționale standard pe un nested array. Ai nevoie de fiecare valoare individuală a venitului pe propriul rând pentru a calcula mediile sau a găsi minimele. Rezolvi această problemă structurală folosind o funcție built-in numită explode. Funcția explode gestionează în mod specific array-urile și map-urile. Apelezi withColumn, specifici numele coloanei pe care îl vrei pentru output și pasezi funcția explode care înglobează coloana ta de tip array. PySpark execută acest lucru luând singurul rând original și desfăcându-l. Dacă array-ul de venituri conține douăsprezece valori distincte, explode generează douăsprezece rânduri complet separate. În noul dataframe, coloana țintă conține acum o singură valoare flat a venitului pe rând, în loc de o listă. Partea esențială este că PySpark duplică toate celelalte coloane din rândul original. Identificatorul utilizatorului este copiat exact pe toate cele douăsprezece rânduri noi. Relația logică dintre utilizator și venitul său rămâne perfect intactă, dar datele sunt acum flat. Ai remodelat o structură nested într-un tabel lung, gata pentru operațiuni standard de grupare și filtrare. Adevărata putere a transformărilor PySpark constă în faptul că funcții precum explode și lower nu manipulează doar valori individuale; ele definesc un plan de calcul logic care scalează instantaneu, indiferent dacă ai o sută de rânduri sau o sută de miliarde de rânduri, fără a-ți cere vreodată să scrii un singur loop manual. Asta e tot pentru acest episod. Ne auzim data viitoare!
11

Mecanica grupării și agregării

3m 44s

Stăpânește strategia split-apply-combine. Aprofundăm gruparea datelor după chei și aplicarea unor funcții puternice de agregare pentru a rezuma seturi masive de date.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 11 din 21. Când te uiți la miliarde de înregistrări individuale, este imposibil să le citești rând cu rând. Pentru a extrage un sens real, trebuie să le sumarizezi. Astăzi discutăm exact cum se întâmplă asta: Mecanica grupării și agregării. Sub capotă, PySpark procesează agregările folosind o strategie clasică de date numită split-apply-combine. Acest pattern este exact ceea ce pare. Mai întâi, PySpark împarte setul masiv de date în buckets logice distincte, pe baza unei chei alese de tine. Apoi, aplică un calcul specific pe fiecare bucket în mod independent, pe tot clusterul. În cele din urmă, combină aceste răspunsuri independente într-un singur rezultat sumarizat. În codul tău, declanșezi faza de split apelând metoda group by pe DataFrame-ul tău. Pur și simplu oferi numele coloanei pe care vrei să o folosești ca grouping key. De exemplu, dacă ai un tabel masiv de tranzacții istorice, ai putea să dai group by pe coloana cu numele utilizatorului. Iată ideea principală. Apelarea metodei group by nu returnează un nou DataFrame. În schimb, returnează o structură intermediară numită obiect GroupedData. Deoarece PySpark îți evaluează codul lazily, a construit doar planul de execuție pentru organizarea acestor buckets. Nu va muta efectiv nicio dată până când nu îi spui ce operație matematică să efectueze pe acele buckets. Pentru a oferi acea operație matematică, faci chain la metoda aggregate, scrisă de obicei agg, direct pe datele tale grupate. Asta se ocupă de fazele de apply și combine. În interiorul metodei aggregate, îi spui lui PySpark ce să calculeze folosind tool-uri din modulul PySpark SQL functions. Acest modul conține zeci de operațiuni de agregare optimizate. Să presupunem că vrei să calculezi venitul mediu pentru fiecare dintre acești utilizatori. Vei importa funcția average, cunoscută de obicei ca avg. Pasezi numele coloanei de venit în funcția average, pe care o pui în interiorul metodei aggregate. Când se execută asta, PySpark calculează venitul mediu pentru fiecare bucket distinct de utilizatori, în mod simultan. Apoi intră în acțiune faza de combine, returnând un DataFrame standard, ușor de citit. Acest nou DataFrame conține un singur rând per utilizator, asociat cu venitul mediu nou calculat. În acest moment, ai un tabel perfect sumarizat. Totuși, deoarece calculul a avut loc în paralel pe un cluster distribuit, rândurile finale sunt returnate în ordinea aleatorie în care nodurile de procesare și-au terminat munca. Dacă trebuie să îi vezi pe cei cu cele mai mari venituri, ordinea aleatorie este inutilă. Pentru a rezolva asta, faci chain la metoda order by la finalul pasului tău de agregare. Pasezi metodei order by coloana care conține noile medii și îi spui să sorteze în ordine descrescătoare. PySpark va lua rezultatele combinate, le va ordona și va livra un tabel curat, sortat. Pattern-ul split-apply-combine este puternic tocmai pentru că se mapează perfect pe hardware-ul distribuit, permițând sumarizarea seturilor masive de date în câteva secunde. Dar ține minte că gruparea datelor este doar jumătate din operațiune. Gruparea necesită o agregare pentru a finaliza treaba, altfel ai doar un cluster plin de buckets goale care așteaptă instrucțiuni. Îți mulțumesc că ai petrecut câteva minute cu mine. Până data viitoare, numai bine.
12

Când DataFrames se ciocnesc: Arta asocierii (Joining)

3m 55s

Navigăm prin nuanțele combinării seturilor de date. Detaliem cele șapte tipuri diferite de join din PySpark și explicăm cum să îmbini DataFrames în siguranță.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. PySpark Fundamentals, episodul 12 din 21. Îmbinarea a două tabele masive este cea mai costisitoare operațiune din distributed computing. Aplică o logică de matching greșită și devine cea mai ușoară metodă să-ți dai crash la cluster pentru că rămâi out of memory. Să știi exact cum să combini dataset-urile în siguranță este subiectul principal din When DataFrames Collide: The Art of Joining. Mecanismul principal pentru combinarea datelor în PySpark este metoda join. Apelezi această metodă pe DataFrame-ul tău de bază, pasând DataFrame-ul pe care vrei să-l atașezi, coloana sau coloanele specifice pentru matching și metoda de join. Dacă nu oferi nicio metodă de join, PySpark folosește by default un inner join. Ia în considerare un scenariu concret. Ai un DataFrame care înregistrează înălțimile oamenilor și un al doilea DataFrame care le înregistrează veniturile. Ambele dataset-uri împart o coloană numită name. La un inner join, PySpark se uită la coloana name din ambele dataset-uri și păstrează doar rândurile în care name există în ambele locuri. Dacă o persoană apare în datele cu înălțimi, dar lipsește din datele cu venituri, înregistrarea ei este complet dropped din rezultat. Pentru a păstra înregistrările fără match, schimbi tipul de join. Un left join păstrează fiecare rând din DataFrame-ul tău de pornire, care în acest caz sunt datele cu înălțimi. Dacă PySpark găsește un nume care face match în datele cu venituri, adaugă acel venit. Dacă nu găsește un match, păstrează rândul cu înălțimea, dar pune o valoare null în coloana de venit. Un right join face exact inversul, păstrând toate veniturile și completând înălțimile lipsă cu null-uri. Când ai nevoie de absolut tot, folosești un full join. PySpark păstrează fiecare înregistrare din ambele DataFrame-uri. Numele care fac match sunt îmbinate într-un singur rând, iar orice nume care există într-un singur dataset sunt păstrate, cu valori null completând datele lipsă din cealaltă parte. Iată ideea cheie. Un cross join funcționează diferit, deoarece ignoră complet condiția de join. Asociază fiecare rând din DataFrame-ul heights cu fiecare rând din DataFrame-ul incomes, creând un produs cartezian. Dacă ambele tabele au doar o mie de rânduri, un cross join produce un milion de rânduri. Această creștere explozivă este motivul pentru care cross join-urile sunt puternic restricționate by default și necesită adesea o configurație explicită pentru a se executa fără să arunce o eroare. Ultimele două tipuri de join sunt de fapt operațiuni de filtrare, mai degrabă decât îmbinări reale de date. Un left semi join caută match-uri, returnând rânduri din DataFrame-ul heights numai dacă numele apare și în DataFrame-ul incomes. Diferența crucială față de un inner join este că un left semi join nu aduce nicio coloană din partea dreaptă. Rămâi cu exact aceleași coloane cu care ai început, doar filtrate la înregistrările care au un match corespunzător. Un left anti join face exact opusul. Returnează rânduri din DataFrame-ul heights numai dacă numele nu există în datele incomes. Dă drop complet coloanelor din partea dreaptă. Acest lucru face ca left anti join-ul să fie cea mai eficientă modalitate de a identifica datele lipsă sau de a găsi înregistrări care au eșuat la procesarea downstream. Alegerea join-ului determină nu doar ce date primești înapoi, ci și câte date trebuie să se miște fizic prin rețeaua ta pentru a genera rezultatul. Mulțumesc că m-ai ascultat. Pe data viitoare!
13

SQL vechi, trucuri noi

3m 07s

De ce să înveți un API nou când poți folosi SQL brut? Învață cum să execuți interogări SQL standard direct pe PySpark DataFrames distribuite.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 13 din 21. Ai o echipă de analiști care scriu cod SQL excelent, dar datele tale stau pe un cluster distribuit masiv. Ai putea să-i forțezi să învețe o sintaxă Python complet nouă, sau ai putea să-i lași să dezlănțuie limbajul pe care îl știu deja. Aici intervine rularea de string-uri raw SQL direct în PySpark, aducând trucuri noi pentru un SQL vechi. PySpark îți oferă o punte directă către SQL standard printr-o singură metodă din sesiunea ta Spark, numită pur și simplu sql. Pasezi un string raw SQL în această metodă. Output-ul nu este plain text. Este un DataFrame PySpark standard. Asta înseamnă că poți rula un query standard de bază de date, primești înapoi un DataFrame și îl pasezi imediat unei alte funcții Python. Este complet interoperabil. Înainte să poți face query-uri pe date cu SQL, PySpark trebuie să știe ce tabele există. Ai două modalități principale de a-ți expune datele către motorul SQL. În primul rând, dacă ai deja un DataFrame în Python, poți apela o metodă pentru a-l înregistra ca un temporary view. Îi dai un nume de tip string și, dintr-o dată, se comportă ca un tabel în query-urile tale SQL. În al doilea rând, poți crea tabele în întregime în string-ul tău SQL. Pasezi un statement create table în metoda sql. În interiorul acelui string, definești schema și îi spui lui PySpark exact unde se află fișierele de date subiacente, cum ar fi un path de cloud storage care conține fișiere Parquet. PySpark înregistrează asta în catalogul său intern. De atunci încolo, îi faci query după nume, exact ca la un tabel tradițional de bază de date. Compară cum arată aceeași logică în ambele abordări. Să zicem că trebuie să extragi numele clienților, să elimini pe oricine are soldul zero și să faci merge la rezultat cu un tabel orders. În API-ul DataFrame, construiești un chain de metode Python. Apelezi select pe dataset-ul tău de clienți pentru a alege coloana name. Apoi adaugi în chain o metodă filter, verificând dacă soldul este mai mare ca zero. În cele din urmă, adaugi o metodă join care face referință la dataset-ul orders pe o cheie comună. Este o abordare extrem de programatică. În abordarea SQL, scrii un statement select standard care extrage coloana name, adaugi o clauză where pentru sold și scrii un inner join pentru tabelul orders. Stă în scriptul tău ca un singur bloc de string, ușor de citit. Iată ideea cheie. Există o concepție greșită comună conform căreia scrierea de SQL în string-uri Python ar fi mai lentă sau mai puțin nativă decât utilizarea metodelor structurate din DataFrame. Asta este fals. Indiferent dacă faci chain la metode Python sau pasezi un string raw SQL, PySpark le tratează identic. Ambele input-uri sunt imediat parsate, traduse în exact același plan logic și predate optimizatorului Catalyst. Motorul de execuție nu știe și nu-i pasă ce API ai folosit pentru a-ți exprima intenția. Performanța este exact aceeași. Alegerea dintre API-ul DataFrame și raw SQL nu este niciodată despre performanța cluster-ului. Este pur și simplu despre ce îți face echipa mai rapidă și codebase-ul mai ușor de întreținut. Mulțumesc că ai stat cu noi. Sper că ai învățat ceva nou.
14

Interschimbarea DataFrames și SQL

3m 44s

Combină SQL cu Python fără probleme. Descoperă cum să creezi vizualizări temporare (temporary views) din DataFrames, să folosești selectExpr și să înlănțui operațiuni programatice pe rezultatele interogărilor SQL.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 14 din 21. S-ar putea să te trezești prins într-o dezbatere despre cum să-ți scrii transformările de date: în Python sau SQL. Să forțezi o alegere strictă între cele două înseamnă să pierzi o grămadă de utilitate. Adevăratul avantaj constă în interschimbarea perfectă a DataFrame-urilor și a codului SQL în cadrul exact aceluiași pipeline. Uneori, un set complex de nested joins este mult mai ușor de citit și de întreținut pentru echipa ta în raw SQL. Alteori, trebuie să iterezi dinamic prin numele coloanelor, ceea ce este imposibil în pure SQL, dar banal în Python. PySpark îți permite să combini ambele abordări fără să-ți întrerupi fluxul de date. Ca să începi să scrii SQL pe un DataFrame Python existent, trebuie mai întâi să expui acel DataFrame către engine-ul Spark SQL. Obții asta apelând metoda create or replace temp view direct pe DataFrame-ul tău. Îi pasezi un singur argument de tip string, care devine numele tabelului. Această operațiune nu mută date. Nu scrie pe disk. Pur și simplu înregistrează un pointer temporar în sesiunea ta curentă de Spark. Engine-ul SQL știe acum cum să rezolve acel nume de tabel înapoi la DataFrame-ul tău din Python. Acum poți să-i faci query. Apelezi spark dot sql și îi pasezi statement-ul tău standard de select ca string, făcând referință la numele tabelului pe care tocmai l-ai creat. Iată ideea cheie. Output-ul acelui apel spark dot sql nu este un rezultat text static și nici un alt tip de obiect. Returnează un DataFrame PySpark standard. Asta înseamnă că poți să faci chain imediat cu metode normale de DataFrame din Python, direct la finalul apelului tău SQL. Poți scrie un string SQL de cincizeci de linii ca să gestionezi o window function complexă, închizi paranteza de la spark dot sql și adaugi imediat o metodă dot filter sau dot group by. Treci de la Python la SQL și înapoi la Python într-un singur bloc de cod. Dacă ai nevoie de SQL doar pentru un anumit calcul pe o coloană, înregistrarea unui temporary view complet nu este necesară. În schimb, folosești metoda select expression. Această metodă acționează ca o punte. Funcționează exact ca o metodă select standard de DataFrame, dar acceptă expresii string de raw SQL în loc de obiecte column din Python. Dacă trebuie să execuți un statement case-when, să aplici funcții matematice sau să faci cast la un data type folosind sintaxa nativă SQL, pasezi exact acele string-uri SQL în select expression. Spark ia acele string-uri, le face parse și le execută exact așa cum ar face-o într-un query SQL complet. Asta îți permite să rămâi în întregime în cadrul API-ului chainable de DataFrame, bazându-te în același timp pe sintaxa SQL pentru o logică complexă la nivel de rând. Granița dintre aceste două paradigme este complet artificială. Indiferent dacă faci chain la metode Python, scrii query-uri raw SQL sau folosești string-uri în select expression, Spark compilează totul în exact același execution plan optimizat. Dacă vrei să ne ajuți să continuăm să facem aceste episoade, poți căuta DevStoriesEU pe Patreon ca să susții emisiunea. Asta e tot pentru acest episod. Mulțumesc că ai ascultat și continuă să construiești!
15

Extinderea Spark cu Python UDFs

3m 51s

Când funcțiile încorporate nu sunt suficiente, intervin User-Defined Functions. Explorăm cum să scrii o logică Python personalizată pentru DataFrames și de ce UDFs scalare standard ascund o penalizare de performanță.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 15 din 21. Scrii o funcție custom în Python, o integrezi în data pipeline-ul tău și funcționează perfect pe un sample mic. Dar când o rulezi pe tot dataset-ul, job-ul abia se mai mișcă, în timp ce consumul de CPU crește brusc. Codul în sine este în regulă, dar plătești o taxă de execuție ascunsă. Astăzi vorbim despre extinderea Spark cu Python UDFs. O User Defined Function, sau UDF, îți permite să execuți logică Python custom direct pe un DataFrame Spark. O folosești atunci când funcțiile Spark SQL built-in nu acoperă logica ta specifică de business. Procesul este simplu. Începi prin a scrie o funcție Python standard. De exemplu, scrii o funcție care primește un string de text, aplică o regulă complexă de formatare custom și returnează string-ul modificat. Pentru ca Spark să recunoască această funcție, imporți funcția udf din modulul PySpark SQL functions și o aplici ca decorator direct deasupra definiției funcției tale Python. De asemenea, îi transmiți un return type decoratorului, cum ar fi un string type sau un integer type. Dacă nu specifici un return type, Spark face default la un string type, ceea ce poate cauza probleme silențioase de date dacă funcția ta returnează de fapt un număr. Odată decorată, funcția ta Python custom se comportă exact ca o funcție Spark nativă. O poți pasa în operațiuni pe DataFrame, cum ar fi un select statement, dându-i nume de coloane ca argumente. Iată ideea cheie. Un Python UDF scalar standard operează strict rând cu rând. Preia una sau mai multe valori de coloană dintr-un singur rând ca input, evaluează logica ta Python custom și returnează exact o valoare de output pentru acel rând specific. Dacă DataFrame-ul tău conține zece milioane de rânduri, funcția ta Python este invocată de zece milioane de ori separat. Această operațiune rând cu rând este ușor de înțeles, dar creează bottleneck-ul masiv de performanță pe care l-am menționat la început. Pentru a înțelege de ce este atât de lentă, trebuie să te uiți la modul în care Spark execută codul în spate. Spark este construit în Scala, ceea ce înseamnă că motorul său principal rulează în interiorul unei mașini virtuale Java, sau JVM. UDF-ul tău custom este scris în Python. JVM-ul nu poate executa cod Python nativ. Pentru a aplica UDF-ul, Spark este forțat să pornească procese worker Python separate alături de propriii executors. Apoi trebuie să mute fizic datele din spațiul de memorie JVM în procesul Python. Spark se bazează pe o librărie de serializare Python numită cloudpickle pentru a gestiona acest transfer complex. Aici se colectează taxa de performanță. Pentru fiecare rând din dataset-ul tău, Spark serializează inputurile în JVM, trimite acele date binare printr-un socket local către worker-ul Python și le deserializează în obiecte Python standard. Funcția ta custom rulează în cele din urmă pe aceste obiecte. Apoi, întregul ciclu se întâmplă în sens invers. Python serializează valoarea de output folosind cloudpickle, o trimite înapoi prin socket, iar JVM-ul o deserializează înapoi în formatul de memorie internă al Spark. Această serializare și deserializare constantă între Java și Python este incredibil de costisitoare. Costul real al unui Python UDF standard este rareori logica pe care o scrii; este overhead-ul silențios al traducerii datelor dus-întors între două runtime environments complet diferite, pentru fiecare rând în parte. Mulțumesc că ai petrecut câteva minute cu mine. Până data viitoare, numai bine.
16

Accelerarea UDFs cu Apache Arrow

3m 59s

Elimină blocajul de serializare JVM-to-Python. Descoperim cum Vectorized Pandas UDFs și formatele de memorie Apache Arrow îți accelerează masiv transformările personalizate.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 16 din 21. Ce-ar fi dacă ai putea accelera funcțiile tale custom de Python din Spark de zece ori, doar schimbând un singur decorator? UDF-urile Python standard sunt cunoscute ca fiind foarte lente, dar soluția nu necesită rescrierea logicii tale în Scala. Astăzi, vorbim despre accelerarea UDF-urilor cu Apache Arrow. Când rulezi un UDF Python standard, te lovești de un zid masiv de performanță la granița dintre limbaje. Spark operează în interiorul Java Virtual Machine, dar logica ta custom rulează într-un worker process Python separat. Pentru a transfera datele între ele, Spark extrage rândurile din memoria sa internă, le serializează folosind o librărie numită cloudpickle și le trimite către Python. Python procesează datele rând cu rând, serializează rezultatul și îl trimite înapoi. Să faci asta pentru milioane de rânduri individuale creează un bottleneck de serializare insuportabil. Apache Arrow schimbă regulile acestui schimb de date. Arrow este un format de date in-memory, columnar și cross-language. Standardizează felul în care datele arată în memorie, astfel încât atât JVM-ul, cât și Python să le înțeleagă nativ, fără o traducere complexă. În loc să serializeze datele rând cu rând, Spark împachetează datele în batch-uri mari, columnare. Toate valorile pentru o anumită coloană stau una lângă alta în memorie contiguă. Spark trimite aceste blocuri mari către Python într-un singur pas eficient. Poți profita de asta în două moduri. În primul rând, poți activa optimizarea Arrow pentru UDF-urile standard. Faci asta setând proprietatea de configurare Spark pentru execuția Arrow pe true, sau specificând parametrul useArrow equals true atunci când îți înregistrezi UDF-ul. Spark va folosi Arrow pentru a transfera datele în batch-uri, reducând dramatic overhead-ul de serializare, chiar dacă funcția ta Python încă execută tehnic logica rând cu rând. Aici e ideea esențială. Pentru a obține un speed boost maxim, vrei ca scriptul tău Python să proceseze simultan acele batch-uri Arrow. Aici intervin UDF-urile Pandas. Când faci wrap la funcția ta custom cu decoratorul pandas UDF, schimbi modul în care funcția primește datele. În loc să primească o singură valoare pentru un rând, funcția ta primește un Pandas Series care conține un batch întreg de valori. Funcția ta aplică o operație vectorizată pe acel întreg batch și returnează un nou Pandas Series de exact aceeași lungime. Gândește-te la o funcție numită calculate tax. Aplici decoratorul pandas UDF și declari că returnează un tip double. Funcția acceptă un Pandas Series care conține prețurile produselor. În interiorul funcției, nu scrii un for-loop. Pur și simplu scrii un return statement care înmulțește Series-ul de input cu unu virgulă doi. Pentru că Pandas se bazează under the hood pe cod C extrem de optimizat, înmulțește instantaneu întregul bloc de prețuri. Spark preia apoi acel Series returnat și îi face merge perfect înapoi în DataFrame folosind Arrow. Adevărata putere a unui Pandas UDF nu constă doar în faptul că evită bottleneck-ul de serializare cloudpickle, ci și în faptul că mută calculul propriu-zis din loop-uri Python lente într-o execuție nativă, vectorizată. Mulțumesc pentru audiție. Aveți grijă de voi, tuturor.
17

Explodarea rândurilor cu Python UDTFs

4m 25s

UDFs standard returnează o valoare pe rând, dar ce faci dacă ai nevoie de mai multe rânduri? Învață cum Python User-Defined Table Functions (UDTFs) rezolvă probleme complexe de generare one-to-many.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 17 din 21. User-Defined Functions standard sunt strict limitate la o mapare unu-la-unu. Pasezi o valoare, primești exact o valoare înapoi. Dar ce se întâmplă dacă un singur log entry dens trebuie extins în o sută de rânduri separate? Pentru a rezolva asta, folosești Python User-Defined Table Functions, sau UDTF-uri. Un UDTF face exact ceea ce sugerează numele. Returnează un tabel întreg dintr-un singur input. În timp ce un UDF standard calculează o singură valoare scalară, un UDTF poate emite mai multe rânduri și mai multe coloane. Acesta este tool-ul la care apelezi atunci când trebuie să dai explode unui string JSON nested, să parsezi un fișier text delimitat linie cu linie, sau să generezi o secvență de date calendaristice dintr-un singur timestamp. Pentru a crea un UDTF în PySpark, nu scrii o funcție standalone de bază. În schimb, definești o clasă Python. Această clasă necesită o metodă specifică numită eval. Metoda eval este locul unde are loc transformarea propriu-zisă. Când execuți UDTF-ul, Spark apelează această metodă pentru fiecare valoare de input. Iată ideea cheie. În interiorul acelei metode eval, nu folosești un return statement standard. În schimb, folosești keyword-ul yield din Python. De fiecare dată când metoda dă yield unei valori, Spark o traduce într-un rând nou în tabelul tău de output. Dacă pasezi un singur string de input, metoda eval ar putea să itereze prin el și să dea yield de zece ori. Spark ia acele zece yield-uri și produce zece rânduri distincte. Hai să trecem printr-un exemplu concret. Construiești o clasă numită ProcessWords. Scopul tău este să pasezi o propoziție completă și să primești înapoi un tabel în care fiecare cuvânt are propriul rând. Scrii metoda eval pentru a accepta un string de text. În interiorul metodei, dai split propoziției după spații. Apoi, iterezi prin cuvintele rezultate. Pentru fiecare cuvânt, dai yield unui tuple care conține cuvântul în sine. Înainte ca Spark să poată folosi această clasă, îi aplici decoratorul PySpark UDTF. Decoratorul este obligatoriu deoarece îți definește schema de output. Declari explicit numele coloanelor și tipurile de date pe care le generează funcția ta. Dacă dai yield unui string, îi spui decoratorului că output-ul este o coloană de tip string. Dacă vrei să dai yield cuvântului și numărului său de caractere, dai yield unui tuple cu două elemente, iar decoratorul tău specifică o schema cu o coloană de tip string și o coloană de tip integer. Dincolo de metoda eval, o clasă UDTF poate include și o metodă opțională terminate. Spark apelează metoda terminate exact o singură dată pentru fiecare partiție de date, după ce toate rândurile de input au fost procesate de metoda eval. Acest lucru este foarte util pentru agregare. Dacă metoda ta eval urmărește un contor intern pe mai multe rânduri de input, metoda terminate poate da yield unui rând final care conține acel număr total înainte ca partiția să se închidă. Când apelezi un UDTF într-o operațiune pe un DataFrame, acesta se comportă ca un tabel inline. Dacă pasezi o coloană existentă din DataFrame în UDTF, Spark aplică funcția de tabel rând cu rând. Deoarece o funcție de tabel generează ca output mai multe rânduri pentru fiecare rând de input, combinarea acestui output cu dataset-ul tău original necesită un lateral join implicit. Spark se ocupă de asta în culise, duplicând datele originale din rând pentru a se potrivi cu noile rânduri explodate generate de clasa ta Python. Puterea definitorie a unui UDTF Python este decuplarea completă a volumului de input de volumul de output, permițând unui singur punct de date să se transforme într-un dataset complet cu mai multe coloane. Asta e tot pentru acest episod. Mulțumesc pentru audiție și continuă să construiești!
18

API-ul Pandas pe Spark

3m 42s

Scalează-ți scripturile Pandas existente la infinit. Descoperă cum API-ul pyspark.pandas îți permite să execuți sintaxa Pandas standard în mod nativ pe un cluster Spark distribuit.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 18 din 21. Ai un script local de date care funcționează perfect, dar dintr-o dată dimensiunea dataset-ului se cvadruplează și mașina rămâne fără memorie. Cunoști sintaxa la perfecție, dar să rescrii totul pentru un framework distribuit durează zile întregi. API-ul pandas pe Spark rezolvă exact această problemă. API-ul pandas pe Spark îți permite să rulezi workload-uri standard de pandas pe un cluster distribuit. Nu doar emulează orbește pandas. Îți interceptează codul pandas și îl traduce în execution plans optimizate de Spark, în fundal. Ca să-l folosești, imporți modulul numit pyspark dot pandas. Convenția standard este să îi atribui alias-ul ps, oglindind direct familiarul alias pd folosit în workload-urile locale de data science. Dacă ai deja un DataFrame pandas local standard în memorie, tranziția este simplă. Invoci o funcție numită from pandas pe modulul tău ps și îi pasezi DataFrame-ul local. Asta convertește obiectul single-node într-un DataFrame distribuit pandas-on-Spark. Din acel moment, sintaxa pe care o folosești pentru a interacționa cu acest nou obiect rămâne identică cu cea pe care o știi deja. Această consistență se extinde și la modul în care datele sunt procesate intern. API-ul distribuit gestionează nativ datele lipsă exact așa cum o face și pandas local. Dacă dataset-ul tău conține valori NumPy Not-a-Number, API-ul pandas pe Spark le gestionează corect în timpul operațiilor matematice sau al transformărilor structurale. Nu trebuie să inventezi o nouă logică de data cleaning pentru joburile tale Spark. Operațiile standard se traduc direct. Dacă vrei să-ți grupezi datele după o anumită coloană, apelezi funcția standard de grupare. Dacă vrei să calculezi media sau suma, faci chain cu funcția de agregare imediat după. Poți chiar să apelezi funcții de plotting direct pe DataFrame-ul distribuit. Spark procesează calculele grele pe tot clusterul, agregă data points-urile necesare și returnează vizualizarea exact ca și cum ai lucra pe o singură mașină. Iată ideea cheie. Arhitectura din spate este fundamental diferită, iar asta introduce un edge case critic în ceea ce privește generarea de indecși. Pandas local se bazează foarte mult pe un index secvențial, strict ordonat, pentru fiecare rând în parte. Spark, în schimb, partiționează datele și le distribuie pe mai multe mașini independente. Impunerea unui index secvențial strict, ordonat global, într-un sistem distribuit necesită o comunicare constantă între worker nodes. Când creezi un DataFrame pandas-on-Spark fără să definești explicit o coloană de index, API-ul generează automat un index default pentru a imita perfect comportamentul standard din pandas. Crearea și menținerea acestui index default necesită sincronizarea stării pe întregul cluster. Dacă operezi pe un dataset masiv, această sincronizare introduce un overhead de performanță sever. API-ul va emite adesea un warning cu privire la acest overhead intern atunci când se execută. Pentru a evita acest bottleneck, este foarte recomandat să atribui imediat o coloană existentă ca index sau să configurezi API-ul să folosească un tip de index distributed-friendly. API-ul pandas pe Spark îți oferă sintaxa exactă de pandas, susținută de motorul de execuție distribuită din Spark, dar dacă ții minte că indecșii secvențiali stricți vin cu un cost mare de sincronizare, îți vei salva clusterul de încetiniri inutile. Asta e tot pentru astăzi. Mulțumesc pentru audiție — du-te și construiește ceva cool.
19

Încarcă și privește: Formate de stocare

3m 58s

Nu toate formatele de fișiere sunt create la fel. Contrastăm fișierele CSV bazate pe rânduri cu formatele columare precum Parquet și ORC, explorând opțiunile de citire/scriere și tehnicile optime de stocare.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 19 din 21. Salvarea unui dataset masiv ca CSV este cel mai simplu lucru din lume și este, de asemenea, unul dintre cele mai distructive lucruri pe care le poți face performanței unui data lake. Plătești pentru mai mult storage, plătești pentru mai mult compute, iar fiecare query downstream merge extrem de greu. Soluția stă în modul în care gestionezi Load and Behold: Storage Formats, și de ce felul în care îți salvezi datele contează la fel de mult ca felul în care le transformi. PySpark folosește o interfață unificată pentru citirea și scrierea datelor pe zeci de sisteme de storage. Apelezi atributul read sau write pe sesiunea ta Spark sau pe DataFrame, specifici un format, oferi un chain de opțiuni și îl pointezi către un file path. Este un pattern previzibil, dar opțiunile pe care le alegi dictează cât de multă muncă va trebui să facă clusterul tău mai târziu. Să începem cu formatele human-readable, CSV și JSON. Acestea sunt formate row-based. Când citești un CSV, Spark parsează datele linie cu linie. Adesea trebuie să faci chain la opțiuni specifice pentru a da sens textului. De exemplu, poți face chain la o opțiune pentru a-i spune lui Spark că fișierul are un header, o altă opțiune pentru a seta un delimiter custom, cum ar fi un pipe sau un tab, și o a treia opțiune pentru a defini exact cum arată o valoare null, poate pasând un string specific, astfel încât Spark să o mapeze corect la o valoare goală în loc să o trateze ca text literal. JSON este puțin mai bun deoarece gestionează structurile nested nativ, dar repetă cheile din schema pentru fiecare record în parte, umflând masiv dimensiunea fișierului. Ambele formate obligă Spark să citească întregul rând de pe disk, chiar dacă query-ul tău cere o singură coloană. Aici intervin formatele columnare, cum ar fi Parquet și ORC. Fii atent la partea asta. Query-urile analitice au rareori nevoie de fiecare coloană dintr-un tabel wide. De obicei, au nevoie de coloane specifice de-a lungul a milioane de rânduri pentru a rula agregări. Parquet și ORC stochează datele organizate pe coloană, nu pe rând. Dacă faci un query pe trei coloane dintr-o sută, Spark citește doar chunk-urile din fișier care conțin acele trei coloane. Dă skip complet la restul, reducând input-ul și output-ul de pe disk la o fracțiune din ceea ce necesită un CSV. Deoarece datele de același tip sunt stocate împreună, formatele columnare se și comprimă excelent. Un director de fișiere JSON se poate micșora cu șaptezeci la sută sau mai mult atunci când este convertit în Parquet. De asemenea, ele includ schema exactă și tipurile de date în metadata fișierului, ceea ce înseamnă că Spark nu trebuie să ghicească sau să facă infer la tipuri la load. Când ești gata să dai write la aceste date, trebuie să gestionezi acel state la destinație. By default, dacă încerci să dai write pe un path unde datele există deja, Spark aruncă o eroare pentru a preveni pierderea accidentală a datelor. Controlezi asta folosind metoda mode înainte de a declanșa salvarea. Dacă pasezi string-ul overwrite, Spark șterge datele existente la target path și le înlocuiește cu DataFrame-ul tău curent. Dacă pasezi append, Spark adaugă pur și simplu noile tale part files în directorul existent. Există, de asemenea, un mode ignore, care în mod silențios nu face nimic dacă directorul este deja populat. Scrierea de date clean, typed și columnare astăzi, salvează clusterul tău de ore întregi de timp de procesare irosit mâine. Dacă vrei să ajuți ca aceste episoade să continue, poți susține podcastul căutând DevStoriesEU pe Patreon. Mersi că ai petrecut câteva minute cu mine. Până data viitoare, numai bine.
20

Vânătoarea de bug-uri: Planuri fizice și Joins

3m 05s

Aruncă o privire sub capota motorului de execuție Spark. Învață cum să depanezi interogările folosind DataFrame.explain() și cum să elimini amestecările (shuffles) costisitoare folosind Broadcast joins.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. Fundamentele PySpark, episodul 20 din 21. Jobul tău PySpark nu este lent pentru că procesează date. Este lent pentru că pierde tot timpul mutând date prin rețea. Când un simplu join îți încetinește clusterul la maximum, soluția stă în depanare: planuri fizice și join-uri. Când scrii un script PySpark, definești operații logice. Îi spui lui Spark ce vrei, nu cum să o facă. Dar când un job are performanțe slabe, trebuie să știi exact cum ți-a executat Spark cererea. Faci acest lucru apelând metoda explain pe DataFrame-ul tău. Când apelezi explain, se afișează planul fizic. Acesta este planul task-urilor reale pe care Spark le rulează pe clusterul tău. Citești acest plan de jos în sus, urmărind datele din fișierele sursă până la rezultatul final. Dacă te uiți la planul fizic pentru un join standard între două DataFrame-uri, probabil vei vedea un pas numit SortMergeJoin. Pentru a face un SortMergeJoin, Spark trebuie să se asigure că rândurile cu aceleași chei de join se află fizic pe același executor. Pentru a obține asta, Spark face un Exchange. Exchange este termenul din planul fizic pentru un network shuffle. Înseamnă că Spark scoate datele din partiții, le trimite prin rețea și le scrie pe disc, ca să le poată citi ceilalți executori. Shuffle-ul este de departe cea mai costisitoare operațiune din distributed computing. Iată ideea cheie. Dacă faci un join între un fact table masiv și un lookup table mic, un shuffle pe tabelul mare este o risipă uriașă de resurse. În loc să faci shuffle pe ambele tabele pentru a alinia cheile, poți pur și simplu să trimiți tot tabelul mic către fiecare executor. Asta se face folosind funcția broadcast din modulul de funcții PySpark SQL. Când apelezi metoda join, pur și simplu încapsulezi DataFrame-ul mai mic în funcția broadcast. Încapsulând tabelul mic, îi dai lui Spark o directivă strictă. Spark va colecta DataFrame-ul mic pe driver node, iar apoi va transmite o copie completă a acestuia în memoria fiecărui executor în parte. Acum, când DataFrame-ul mare este procesat, executorii au deja toate datele de lookup de care au nevoie direct acolo, în RAM. Ei pur și simplu parcurg partițiile existente și fac match la rânduri local. Nu este nevoie de sortare și nicio dată din tabelul mare nu se mută prin rețea. Dacă apelezi explain pentru acest nou join cu broadcast, planul fizic arată complet diferit. SortMergeJoin a dispărut. Pasul costisitor de Exchange lipsește cu desăvârșire. În locul lor, vei vedea un BroadcastExchange și un BroadcastHashJoin. BroadcastExchange mută tabelul mic o singură dată, iar join-ul în sine are loc în întregime pe loc. Cea mai ușoară metodă să dublezi viteza unui job Spark este să te oprești din a muta date care nu trebuie mutate. Citește-ți planurile fizice, identifică network exchange-urile și fă broadcast la tabelele mici. Asta e tot pentru azi. Mersi că ai ascultat — du-te și construiește ceva cool.
21

Profiling pentru memoria și performanța PySpark

3m 41s

Încheiem călătoria noastră PySpark prin introducerea instrumentelor native de profiling. Învață cum să urmărești consumul de memorie linie cu linie și să expui traceback-urile interne Python ascunse.

Descarcă
Salut, sunt Alex de la DEV STORIES DOT EU. PySpark Fundamentals, episodul 21 din 21. Să faci debugging pe codul Python distribuit înseamnă, de obicei, să sapi prin mii de linii de erori Java fără sens, încercând să ghicești de ce a picat funcția ta sau de ce a consumat toată memoria de pe cluster. Acum nu mai trebuie să ghicești. Astăzi ne uităm la cum facem profiling de memorie și performance în PySpark, dar și la simplificarea de stack traces. Când scrii un User Defined Function, sau UDF, în PySpark, codul tău Python rulează peste o infrastructură Java Virtual Machine. Dacă codul tău Python împarte la zero sau apelează o cheie lipsă dintr-un dictionary, acea simplă excepție de Python este înghițită. Este trimisă înapoi prin daemon-ul PySpark, prin rețea, și împachetată în excepții Java masive. Să găsești eroarea reală de Python în logs este un chin. Poți rezolva asta activând simplified tracebacks. Când setezi configurația Spark pentru simplified traceback pe true, PySpark schimbă modul în care raportează erorile. Elimină toate acele logs de interoperabilitate Java și zgomotul de la procesele worker. Data viitoare când un UDF dă fail, consola ta va afișa un stack trace de Python standard și curat, care îți arată exact numărul liniei din fișierul tău Python unde a apărut excepția. Să rezolvi crash-urile este doar jumătate din problemă. Să repari codul lent sau care consumă multă memorie este mult mai greu. Dacă scrii un Pandas UDF care procesează milioane de rânduri, s-ar putea să ruleze cu succes, dar să dureze mult prea mult sau să declanșeze erori de out-of-memory pe nodurile tale executor. În trecut, ca să găsești un bottleneck, trebuia să adaugi manual logging sau să ghicești care operațiune Pandas era ineficientă. Spark 4.0 schimbă asta introducând Python UDF profilers built-in. Iată ideea principală. Acum poți face profiling pe codul tău Python distribuit, linie cu linie, direct din PySpark. Ca să folosești asta, setezi configurația de UDF profiler pe unul din două moduri: performance sau memory. Dacă setezi configurația de profiler pe cuvântul perf, Spark activează performance profiler-ul. Apoi îți rulezi job-ul Spark ca de obicei. În timp ce nodurile worker îți execută Pandas UDF-ul, Spark urmărește timpul de execuție pentru fiecare linie din funcția ta de Python. După ce job-ul se termină, apelezi metoda show pe obiectul de profile din Spark. Spark va printa un raport detaliat în consola ta. Pentru fiecare linie din codul tău, vei vedea exact de câte ori a fost apelată și timpul total petrecut pentru execuția ei. Poți vedea instant dacă o anumită manipulare de string-uri sau o operație matematică îți încetinește întregul pipeline. Dacă te lovești de limite de memorie, setezi configurația de UDF profiler pe cuvântul memory în schimb. Workflow-ul este exact același, dar output-ul se schimbă. Când te uiți pe raportul de profile, Spark îți arată incrementul exact în megabytes cauzat de fiecare linie din codul tău Python. Poți vedea exact unde sunt alocate array-uri mari și unde memoria nu reușește să se elibereze. Această vizibilitate linie cu linie te scapă de ghicit atunci când optimizezi transformări complexe de date. Poți identifica exact cauza problemelor tale de performance fără să părăsești mediul PySpark. Pentru că acesta este ultimul episod din seria noastră despre PySpark, te încurajez să verifici documentația oficială Spark și să încerci aceste tool-uri de debugging hands-on. Dacă ai idei despre ce tehnologii ar trebui să acoperim în următoarea noastră serie, intră pe devstories.eu și lasă-ne un mesaj. Mersi că ai petrecut câteva minute cu mine. Până data viitoare, numai bine.