Wróć do katalogu
Season 14 21 Odcinki 1h 23m 2026

PySpark Fundamentals

v4.1 — Edycja 2026. Kompleksowy przewodnik po PySpark 4.1, obejmujący Spark Connect, DataFrames, złożone typy danych, transformacje danych, SQL, UDFs oraz profilowanie.

Big Data Obliczenia rozproszone Nauka o danych
PySpark Fundamentals
Teraz odtwarzane
Click play to start
0:00
0:00
1
Problem Big Data i obietnica PySpark
Uzasadniamy fundamentalną potrzebę korzystania z PySpark. Odkryj, dlaczego standardowe biblioteki Python, takie jak Pandas, zawodzą przy dużej skali i w jaki sposób PySpark zapewnia rozproszony silnik wykonawczy do płynnego przetwarzania ogromnych zbiorów danych.
4m 02s
2
Rewolucja Spark Connect
Poznaj architekturę Spark Connect. Wyjaśniamy, w jaki sposób PySpark oddzielił klienta od serwera, pozwalając na uruchamianie aplikacji Spark w dowolnym miejscu bez uciążliwych zależności JVM.
3m 56s
3
DataFrames i leniwa ewaluacja
Zanurz się w fundamentalną abstrakcję PySpark: DataFrame. Omawiamy koncepcję leniwej ewaluacji, różnicę między transformacjami a akcjami oraz powody, dla których Spark planuje przed wykonaniem.
4m 25s
4
Tworzenie i przeglądanie DataFrames
Dowiedz się, jak tworzyć instancje DataFrames z surowych obiektów Python, słowników i plików, oraz jak bezpiecznie badać rozproszone dane bez powodowania awarii węzła sterującego.
3m 51s
5
Opanowanie podstawowych typów danych
Przegląd podstawowych typów liczbowych i tekstowych w PySpark. Badamy, jak jawnie definiować schematy przy użyciu StructType i StructField dla solidnych potoków danych.
4m 25s
6
Niebezpieczeństwa związane z precyzją
Odkryj kluczowe różnice między FloatType, DoubleType i DecimalType. Dowiedz się, dlaczego wybór niewłaściwego typu liczbowego może wprowadzić katastrofalne błędy zaokrągleń w danych finansowych.
4m 26s
7
Poskramianie złożonych i zagnieżdżonych danych
Big data nie zawsze są płaskie. Odkrywamy złożone typy danych PySpark, w tym ArrayType, StructType i MapType, które pozwalają na natywne parsowanie głęboko zagnieżdżonych plików JSON.
4m 14s
8
Rzutowanie typów i selekcja
Dowiedz się, jak aktywnie kształtować schematy DataFrame. Omawiamy, jak wybierać podzbiory kolumn oraz jak bezpiecznie rzutować kolumny z jednego typu danych na inny.
4m 17s
9
Skrzyżowanie funkcji: Czyszczenie brudnych danych
Śmieci na wejściu, śmieci na wyjściu. Poznaj niezbędne transformacje DataFrame do usuwania wartości null, wypełniania brakujących danych i natywnej obsługi rekordów NaN w systemach rozproszonych.
3m 54s
10
Transformacja i zmiana kształtu danych
Przejmij kontrolę nad kształtem swoich danych. Odkrywamy, jak generować nowe kolumny za pomocą funkcji matematycznych, wykonywać operacje na ciągach znaków i spłaszczać zagnieżdżone tablice przy użyciu explode().
3m 54s
11
Mechanika grupowania i agregacji
Opanuj strategię split-apply-combine. Zagłębiamy się w grupowanie danych według kluczy i stosowanie potężnych funkcji agregujących do podsumowywania ogromnych zbiorów danych.
3m 37s
12
Kiedy DataFrames się zderzają: Sztuka łączenia
Poruszanie się po niuansach łączenia zbiorów danych. Rozkładamy na czynniki pierwsze siedem różnych typów join w PySpark i wyjaśniamy, jak bezpiecznie scalać DataFrames.
3m 37s
13
Stary SQL, nowe sztuczki
Po co uczyć się nowego API, skoro można użyć surowego SQL? Dowiedz się, jak wykonywać standardowe zapytania SQL bezpośrednio na rozproszonych DataFrames w PySpark.
3m 33s
14
Wymienność DataFrames i SQL
Płynnie łącz SQL z językiem Python. Odkryj, jak tworzyć tymczasowe widoki z DataFrames, używać selectExpr i łączyć programistyczne operacje z wynikami zapytań SQL.
3m 49s
15
Rozszerzanie Spark za pomocą Python UDFs
Gdy wbudowane funkcje nie wystarczają, do akcji wkraczają User-Defined Functions. Odkrywamy, jak pisać niestandardową logikę Python dla DataFrames i dlaczego standardowe skalarne UDFs ukrywają spadek wydajności.
4m 17s
16
Turbodoładowanie UDFs z Apache Arrow
Wyeliminuj wąskie gardło serializacji między JVM a językiem Python. Odkrywamy, jak Vectorized Pandas UDFs i formaty pamięci Apache Arrow niesamowicie przyspieszają Twoje niestandardowe transformacje.
3m 38s
17
Eksplozja wierszy z Python UDTFs
Standardowe UDFs zwracają jedną wartość na wiersz, ale co, jeśli potrzebujesz wielu wierszy? Dowiedz się, jak Python User-Defined Table Functions (UDTFs) rozwiązują złożone problemy generowania relacji jeden-do-wielu.
4m 04s
18
Pandas API w Spark
Skaluj swoje istniejące skrypty Pandas w nieskończoność. Odkryj, jak pyspark.pandas API pozwala na natywne wykonywanie standardowej składni Pandas w rozproszonym klastrze Spark.
3m 53s
19
Wczytaj i podziwiaj: Formaty przechowywania
Nie wszystkie formaty plików są sobie równe. Zestawiamy oparte na wierszach pliki CSVs z formatami kolumnowymi, takimi jak Parquet i ORC, badając opcje odczytu/zapisu oraz optymalne techniki przechowywania.
4m 01s
20
Pogromcy błędów: Plany fizyczne i złączenia
Zajrzyj pod maskę silnika wykonawczego Spark. Dowiedz się, jak debugować zapytania za pomocą DataFrame.explain() i jak wyeliminować kosztowne przetasowania używając Broadcast joins.
3m 18s
21
Profilowanie pamięci i wydajności w PySpark
Kończymy naszą podróż z PySpark, wprowadzając natywne narzędzia do profilowania. Dowiedz się, jak śledzić zużycie pamięci linijka po linijce i ujawniać ukryte wewnętrzne ślady błędów w języku Python.
4m 00s

Odcinki

1

Problem Big Data i obietnica PySpark

4m 02s

Uzasadniamy fundamentalną potrzebę korzystania z PySpark. Odkryj, dlaczego standardowe biblioteki Python, takie jak Pandas, zawodzą przy dużej skali i w jaki sposób PySpark zapewnia rozproszony silnik wykonawczy do płynnego przetwarzania ogromnych zbiorów danych.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 1 z 21. Twój standardowy skrypt w Pythonie działa idealnie podczas testów, ale gdy twój dataset dobija do pięćdziesięciu gigabajtów, wywala się z błędem OutOfMemory. Uderzasz w fizyczne limity pojedynczej maszyny. Rozwiązanie tego bottlenecka to temat dzisiejszego odcinka: problem big data i to, co obiecuje PySpark. Standardowe narzędzia do danych w Pythonie są stworzone do działania na pojedynczym node'zie. Biblioteki takie jak pandas są niesamowicie wydajne, ale wymagają, żeby cały dataset znajdował się w pamięci lokalnej. Jeśli twój serwer ma szesnaście gigabajtów RAM-u i próbujesz załadować pięćdziesiąt gigabajtów logów aplikacji, system operacyjny interweniuje i ubija proces. Skalowanie wertykalne przez wynajęcie większego i droższego serwera tylko opóźnia nieuniknione. Ilość danych rośnie szybciej niż upgrade'y sprzętu. W końcu dane przerastają możliwości twojej maszyny. PySpark rozwiązuje ten problem. To Python API dla Apache Spark. Sam Apache Spark to rozproszony silnik obliczeniowy, który działa na Java Virtual Machine. PySpark działa jak pomost, pozwalając ci pisać logikę czysto w Pythonie, jednocześnie korzystając z mocno zoptymalizowanego, rozproszonego silnika Sparka. To zmienia twoją architekturę ze skalowania wertykalnego na horyzontalne. Zamiast polegać na jednej potężnej maszynie, PySpark partycjonuje twoje dane i dystrybuuje obliczenia w klastrze wielu mniejszych maszyn, znanych jako node'y. Piszesz swój kod w Pythonie, a PySpark tłumaczy go na równoległy execution plan. Jeśli wolumen twoich danych podwoi się w przyszłym miesiącu, nie musisz przepisywać ani jednej linijki kodu. Po prostu dodajesz więcej node'ów do klastra. Ekosystem PySpark jest podzielony na kilka głównych modułów zaprojektowanych pod różne workloady. Pierwszy to Spark SQL. To fundament większości nowoczesnych aplikacji w PySparku. Dostarcza strukturę DataFrame do obsługi danych tabelarycznych rozproszonych na wielu maszynach. Pozwala też na uruchamianie standardowych zapytań SQL bezpośrednio na tych rozproszonych datasetach. Kolejny to Structured Streaming. Ten moduł obsługuje pipeline'y danych w czasie rzeczywistym. Zamiast przetwarzać ogromny batch danych przez noc, Structured Streaming w sposób ciągły przetwarza strumienie rekordów, takie jak odczyty z czujników na żywo czy eventy ruchu sieciowego. Używa dokładnie tego samego modelu programowania co Spark SQL, co oznacza, że twoja logika przetwarzania batchowego i logika streamingu wyglądają niemal identycznie. Następnie mamy MLlib, czyli Machine Learning Library. Trenowanie modeli na ogromnych datasetach na pojedynczej maszynie to znany bottleneck. MLlib dostarcza rozproszone algorytmy machine learningu do zadań takich jak klasyfikacja, regresja i klastrowanie. Rozkłada ciężkie operacje matematyczne na cały klaster, drastycznie skracając czas trenowania. Oto kluczowy wniosek. Prawdziwą siłą PySpark jest abstrakcja. Nigdy nie dzielisz ręcznie swoich ogromnych plików na chunki. Nigdy nie piszesz kodu sieciowego, żeby koordynować serwery. Po prostu definiujesz logiczną sekwencję transformacji, a silnik pod spodem zajmuje się dystrybucją danych, równoległym wykonywaniem, a nawet procesem odzyskiwania, jeśli node straci zasilanie w trakcie obliczeń. PySpark to nie tylko narzędzie do otwierania większych plików. To fundamentalne przejście od obliczeń ograniczonych przez jedną płytę główną do obliczeń ograniczonych jedynie rozmiarem twojego klastra. Jeśli uważasz te odcinki za pomocne i chcesz wesprzeć podcast, możesz wyszukać DevStoriesEU na Patreonie. To wszystko w tym odcinku. Dzięki za wysłuchanie i buduj dalej!
2

Rewolucja Spark Connect

3m 56s

Poznaj architekturę Spark Connect. Wyjaśniamy, w jaki sposób PySpark oddzielił klienta od serwera, pozwalając na uruchamianie aplikacji Spark w dowolnym miejscu bez uciążliwych zależności JVM.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 2 z 21. Przez lata pisanie kodu w PySparku lokalnie oznaczało ciągnięcie za sobą ogromnej, ciężkiej Java Virtual Machine, tylko po to, żeby przetestować prosty skrypt. Musiałeś idealnie zsynchronizować wersje Pythona, konfiguracje Javy i zależności klastra, zanim napisałeś choćby jedną linijkę logiki. Rewolucja Spark Connect sprawia, że to już całkowicie przeszłość. Tradycyjnie PySpark opierał się na architekturze typu tightly coupled. Twój skrypt w Pythonie i execution engine Sparka musiały współistnieć na dokładnie tej samej maszynie fizycznej lub wirtualnej. Odpalenie sesji PySpark oznaczało podniesienie Java Virtual Machine w tle przy użyciu biblioteki typu bridge. Ta architektura obciążała twoje lokalne środowisko deweloperskie pełnym ciężarem execution engine Sparka. Sprawiało to, że osadzanie PySparka w aplikacjach webowych, nowoczesnych edytorach kodu czy urządzeniach typu edge było wysoce niepraktyczne. Spark Connect rozwiązuje ten problem, wprowadzając architekturę typu decoupled client-server. Twoje środowisko pythonowe jest teraz ściśle oddzielone od serwera Sparka. Lokalny klient PySparka staje się lekką biblioteką. Nie wymaga już lokalnej instalacji Javy i sam nie wykonuje tasków przetwarzania danych. Działa wyłącznie jako zdalny interfejs do właściwego klastra Sparka. I tu jest kluczowa sprawa. Kiedy piszesz operacje na DataFrame przy użyciu Spark Connect, lekki klient rejestruje twoje wywołania metod i tłumaczy je na unresolved logical plan. Możesz wyobrazić sobie ten plan jako abstrakcyjny blueprint twojego zapytania, ściśle opisujący, jakie dane przetworzyć, bez martwienia się o to, jak zostaną przetworzone. Klient pakuje ten blueprint używając Protocol Buffers i przesyła go przez połączenie sieciowe gRPC do zdalnego serwera Sparka. Serwer rozpakowuje plan, ogarnia całą złożoną optymalizację zapytań, wykonuje joba na klastrze i na koniec streamuje obliczone wyniki z powrotem do twojego skryptu w Pythonie. Skonfigurowanie tego wymaga drobnej zmiany w tym, jak startujesz swoją aplikację. Nadal używasz buildera SparkSession, ale zamiast polegać na lokalnych konfiguracjach, wywołujesz metodę remote. Podajesz connection string, który szczegółowo określa, gdzie stoi serwer Sparka. Ten string używa dedykowanego schematu połączenia zaczynającego się od liter s c. Więc, jeśli łączysz się z lokalnym serwerem testowym na domyślnym porcie, podajesz string s c dwukropek ukośnik ukośnik localhost dwukropek jeden pięć zero zero dwa. Po tym jednym kroku połączenia, piszesz swój kod DataFrame dokładnie tak samo, jak robiłeś to zawsze. Ponieważ wykonywanie kodu jest w pełni zdalne, możesz podłączyć wielu różnych klientów pythonowych, z różnych aplikacji, do dokładnie tego samego serwera Sparka w tym samym czasie. Kod twojej aplikacji po prostu prosi o transformacje danych, a cała czarna robota zostaje całkowicie po stronie serwera. Poprzez całkowite odizolowanie klienta pythonowego od execution runtime, Spark Connect eliminuje te niesławne konflikty zależności, które kiedyś wysypywały deploye, pozwalając ci na aktualizację środowisk twojej aplikacji całkowicie niezależnie od samego klastra Sparka. Dzięki za spędzenie ze mną tych kilku minut. Do usłyszenia następnym razem, trzymaj się.
3

DataFrames i leniwa ewaluacja

4m 25s

Zanurz się w fundamentalną abstrakcję PySpark: DataFrame. Omawiamy koncepcję leniwej ewaluacji, różnicę między transformacjami a akcjami oraz powody, dla których Spark planuje przed wykonaniem.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySparka, odcinek 3 z 21. Co by było, gdyby twój kod nie uruchamiał się od razu po napisaniu, ale zamiast tego czekał, analizował twój cel końcowy i wyznaczał najszybszą możliwą trasę? Łączysz w chain filtry, agregacje i joiny, a twoja maszyna prawie się nie poci. To dlatego, że nie robi zupełnie nic, dopóki jej do tego nie zmusisz. Ten mechanizm to lazy evaluation i jest to główny silnik napędzający DataFrame'y w PySparku. PySpark DataFrame to rozproszona kolekcja danych zorganizowana w nazwane kolumny. Jeśli znasz bibliotekę pandas, ten koncept wyda ci się identyczny. Różnica polega na tym, że PySpark DataFrame dzieli swoje dane na wiele node'ów obliczeniowych w klastrze. Historycznie, podstawową strukturą w Sparku był Resilient Distributed Dataset, powszechnie znany jako RDD. Ekosystem mocno odszedł od bezpośredniej manipulacji na surowych RDD. Właściwie, od wersji Spark 4.0, bezpośrednie użycie RDD nie jest już wspierane w Spark Connect. DataFrame'y są teraz absolutnym standardem, dostarczając ścisłe API, które pozwala Sparkowi automatycznie optymalizować twoje query. Ta optymalizacja opiera się w całości na lazy evaluation. Każda operacja, którą wykonujesz na DataFrame'ie, wpada do jednej z dwóch ścisłych kategorii: transformacji lub akcji. Transformacje to komendy, które zwracają nowy DataFrame. Przykłady to wybieranie konkretnych kolumn, filtrowanie wierszy na podstawie warunku, grupowanie rekordów czy robienie joinów dwóch oddzielnych tabel. Kiedy aplikujesz transformację, PySpark nie wykonuje przetwarzania danych. Po prostu rejestruje tę operację. Aktualizuje wewnętrzny schemat, tak zwany logical execution plan. Możesz napisać pięćdziesiąt transformacji z rzędu, a Spark tylko szybko zwaliduje składnię i zaktualizuje swój graf. I tu jest kluczowa sprawa. Opóźniając faktyczne wykonanie, PySpark daje swojemu silnikowi zapytań, czyli Catalyst Optimizer, pełny obraz twojego pipeline'u danych. Optymalizator analizuje cały twój chain transformacji, układa je na nowo dla maksymalnej wydajności i całkowicie wyrzuca zbędne kroki, zanim z dysku zostanie odczytany choćby jeden bajt danych. Ten schemat pozostaje całkowicie uśpiony, dopóki nie wywołasz akcji. Akcja to komenda, która żąda konkretnego rezultatu. Albo zwraca dane do twojego drivera, albo zapisuje je w storage'u. Popularne akcje to zliczanie całkowitej liczby wierszy, zbieranie danych z powrotem do lokalnej listy w Pythonie, albo polecenie systemowi wyświetlenia dwudziestu pierwszych rekordów na ekranie. W momencie, gdy odpalasz akcję, silnik rusza do pracy. Tłumaczy twój zoptymalizowany logical plan na physical plan, rozdziela taski do workerów w klastrze i uruchamia obliczenia. Weźmy pod uwagę standardowy data workflow. Najpierw tworzysz DataFrame, wskazując na ogromny plik. Następnie robisz join z osobną tabelą ze szczegółami użytkowników. Po joinie filtrujesz wyniki, żeby uwzględnić tylko użytkowników z konkretnego miasta. Na koniec prosisz Sparka o wyświetlenie wyniku. Dzięki lazy evaluation, Spark tak naprawdę nie ładuje całego pliku, nie wykonuje potężnego rozproszonego joina, żeby dopiero na końcu przefiltrować wyniki. Zamiast tego, optymalizator patrzy na twoje końcowe żądanie, zauważa filtr i wypycha tę operację filtrowania w górę chaina, na długo przed wykonaniem joina. Selektywnie odczytuje tylko istotne rekordy, drastycznie zmniejszając zużycie pamięci i ruch sieciowy w klastrze. Twój skrypt w PySparku nigdy nie jest sekwencją natychmiastowych komend. To zestaw instrukcji szkicujących architektoniczny blueprint, a system rozpoczyna budowę dopiero wtedy, gdy w końcu zażądasz końcowego rezultatu. To wszystko na dziś. Dzięki za wysłuchanie — idź zbudować coś fajnego.
4

Tworzenie i przeglądanie DataFrames

3m 51s

Dowiedz się, jak tworzyć instancje DataFrames z surowych obiektów Python, słowników i plików, oraz jak bezpiecznie badać rozproszone dane bez powodowania awarii węzła sterującego.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 4 z 21. Wywołanie jednej konkretnej metody na ogromnym zbiorze danych to gwarantowany sposób na natychmiastowe zcrashowanie całej aplikacji błędem out-of-memory. Wiedza o tym, jak bezpiecznie przenosić dane do i ze Sparka bez wywalenia driver node'a, jest kluczowa. Właśnie o tym jest ten odcinek: o tworzeniu i przeglądaniu DataFrame'ów. Każda aplikacja PySpark potrzebuje danych do działania. DataFrame'y tworzysz zazwyczaj na trzy sposoby. Po pierwsze, możesz je tworzyć bezpośrednio ze struktur Pythona in-memory. Po prostu definiujesz listę słowników, gdzie każdy słownik reprezentuje wiersz, a klucze to nazwy kolumn, i przekazujesz ją do metody createDataFrame na twojej SparkSession. Po drugie, jeśli masz już pandas DataFrame w pamięci, możesz przekazać dokładnie ten obiekt pandas do tej samej metody createDataFrame. PySpark obsługuje konwersję automatycznie. Trzecim i najpopularniejszym sposobem jest odczyt z plików zewnętrznych. Używasz atrybutu read na swojej SparkSession, a następnie formatu, który cię interesuje, jak csv czy json, i podajesz ścieżkę do pliku. Po załadowaniu danych musisz je zweryfikować. DataFrame'y w PySparku są rozproszone, co oznacza, że nie możesz po prostu zrobić printa zmiennej i zobaczyć danych, tak jak w standardowym skrypcie Pythona. Aby zobaczyć strukturę swoich danych, wywołujesz metodę printSchema. Wypisuje to tekstowe drzewo pokazujące każdą nazwę kolumny i odpowiadający jej typ danych. To najszybszy sposób, żeby sprawdzić, czy twój plik załadował się poprawnie. Aby wyświetlić rzeczywistą zawartość, używasz metody show. Domyślnie wywołanie show wyświetla pierwsze dwadzieścia wierszy w formacie tabelarycznym. Zwróć na to uwagę. Jeśli twoje kolumny zawierają długie stringi, metoda show je ucina. Możesz to wyłączyć, przekazując argument truncate ustawiony na false lub ustawiając go na określoną liczbę znaków. Jeśli twój DataFrame ma dziesiątki kolumn, standardowy widok tabeli zawija się na ekranie i staje się nieczytelny. W takim przypadku możesz przekazać argument vertical ustawiony na true. To wypisze każdy wiersz jako pionowy blok par klucz-wartość, co znacznie ułatwia czytanie szerokich zbiorów danych w terminalu. Teraz przechodzimy do wspomnianego wcześniej crashu out-of-memory. Czasami musisz przenieść rozproszone dane z powrotem do zwykłych obiektów Pythona. Metoda, która do tego służy, nazywa się collect. Oto kluczowa sprawa. Metoda collect bierze każdy pojedynczy wiersz z każdego executora w całym twoim klastrze i wpycha go do pamięci twojego pojedynczego driver node'a. Jeśli twój DataFrame zawiera miliard wierszy, twojemu driverowi zabraknie pamięci i natychmiast się zcrashuje. Powinieneś wywoływać metodę collect tylko wtedy, gdy zagregowałeś lub przefiltrowałeś swoje dane do małego rozmiaru. Pracując z dużymi zbiorami danych, zawsze wyciągaj mniejsze próbki. Zamiast collect, użyj metody take, przekazując liczbę wierszy, którą chcesz pobrać. Zwraca to standardową listę Pythona zawierającą tylko te kilka pierwszych wierszy. Jeśli musisz sprawdzić koniec swojego zbioru danych, użyj metody tail, aby pobrać kilka ostatnich wierszy. Obie metody bezpiecznie ograniczają ilość danych przesyłanych do twojego drivera. Zasada dla danych rozproszonych jest prosta: wypychaj obliczenia na klaster, ale ściśle ograniczaj liczbę wierszy, które pobierasz z powrotem do drivera. To wszystko w tym odcinku. Dzięki za wysłuchanie i buduj dalej!
5

Opanowanie podstawowych typów danych

4m 25s

Przegląd podstawowych typów liczbowych i tekstowych w PySpark. Badamy, jak jawnie definiować schematy przy użyciu StructType i StructField dla solidnych potoków danych.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 5 z 21. Poleganie na automatycznym schema inference może zaoszczędzić ci kilka linijek kodu, ale będzie cię drogo kosztować pod kątem wydajności na produkcji. Klaster często musi przeczytać twój cały dataset, tylko po to, żeby zgadnąć, co jest w środku, zanim wykona jakąkolwiek właściwą pracę. Naprawisz to, opanowując podstawowe typy danych i explicit schemas. Często mylisz standardowe typy Pythona z typami danych PySpark. Kiedy deklarujesz integera lub stringa w standardowym Pythonie, ten obiekt żyje w pamięci twojej lokalnej maszyny. Typy PySpark działają na zupełnie innym poziomie. Są to instrukcje mapowania dla Catalyst optimizera i działającej pod spodem Java Virtual Machine. Kiedy używasz typów danych PySpark, definiujesz ścisłą, cluster-aware strukturę. Gwarantuje to spójność danych na setkach rozproszonych worker nodes i dokładnie dyktuje, jak dane są serializowane w sieci. PySpark dostarcza specyficzny typ dla każdego standardowego kształtu danych, a wybór tego właściwego jest kluczowy dla wydajności. Dla liczb masz ByteType dla bardzo małych integerów, IntegerType dla standardowych liczb i LongType dla dużych wartości. Wybranie ByteType zamiast LongType dla prostego status code'u oszczędza mnóstwo pamięci, gdy ten wybór pomnożysz przez miliardy wierszy. Do tekstu i logiki używasz StringType i BooleanType. Prawidłowa obsługa czasu to kolejny obszar, w którym dokładne typowanie ma znaczenie. PySpark dzieli dane czasowe na DateType i TimestampType. Używasz DateType, gdy interesuje cię tylko data z kalendarza, jak urodziny użytkownika. Używasz TimestampType, gdy potrzebujesz dokładnych punktów w czasie, śledząc zarówno datę, jak i dokładną godzinę, minutę i sekundę wystąpienia eventu. Znajomość tych typów to tylko podstawa. Musisz je zaaplikować bezpośrednio do swojego procesu data ingestion, używając explicit schema. Konstruujesz ten schema za pomocą dwóch konkretnych obiektów: StructType i StructField. Możesz myśleć o StructType jako o planie całego wiersza w twoim dataframe. StructField to plan pojedynczej kolumny w tym wierszu. Aby zbudować explicit schema, tworzysz instancję StructType i przekazujesz jej kolekcję StructFields. Każdy StructField wymaga trzech konkretnych argumentów. Po pierwsze, podajesz nazwę kolumny jako standardowy string. Po drugie, przekazujesz konkretny typ danych PySpark, który chcesz wymusić, taki jak IntegerType lub StringType. Po trzecie, podajesz flagę boolean wskazującą, czy ta kolumna może zawierać wartości null. Na przykład, konstruujesz schema, zaczynając od StructField o nazwie user identifier, przypisanego do StringType, i ustawiasz flagę null na false. Następnie dodajesz StructField o nazwie account age, przypisany do IntegerType, ustawiając flagę null na true. Kiedy ten obiekt StructType jest już w pełni złożony, przekazujesz go bezpośrednio do swojego dataframe readera za pomocą metody schema, zanim wywołasz komendę load, aby odczytać swoje pliki. To jest ta część, która ma znaczenie. Kiedy dostarczysz ten explicit schema z góry, PySpark całkowicie pomija fazę skanowania danych. Aplikuje twój plan bezpośrednio do przychodzącego strumienia danych. To drastycznie skraca czas odczytu pliku. Działa to również jako natychmiastowy quality gate. Jeśli pojawi się uszkodzony plik z tekstem w twojej kolumnie integer, pipeline obsłuży go w oparciu o twoją zdefiniowaną strukturę, zamiast po cichu przesuwać inferred schema downstream i psuć twoje transformacje. Definiowanie twojego schema w sposób explicit przekształca kruchą, kosztowną operację odczytu w przewidywalny, wysoce zoptymalizowany krok pipeline'u. Dzięki za wysłuchanie, happy coding wszystkim!
6

Niebezpieczeństwa związane z precyzją

4m 26s

Odkryj kluczowe różnice między FloatType, DoubleType i DecimalType. Dowiedz się, dlaczego wybór niewłaściwego typu liczbowego może wprowadzić katastrofalne błędy zaokrągleń w danych finansowych.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 6 z 21. Używanie standardowego typu float może wydawać się nieszkodliwe, dopóki twoje query agregujące po cichu nie pomyli się o miliony w transakcjach finansowych. Kod, który działa idealnie, może generować liczby, które są nieznacznie, ale niebezpiecznie błędne. Właśnie dlatego musimy porozmawiać o pułapkach precyzji. W PySpark masz trzy główne sposoby przechowywania liczb z częścią ułamkową. Masz do dyspozycji FloatType, DoubleType i DecimalType. Nie można ich stosować zamiennie. Częstym błędem jest pozwolenie, by PySpark sam wyinferował schema z twoich surowych danych. Inferencja zazwyczaj przypisuje DoubleType do każdej liczby z kropką dziesiętną. Jeśli obliczasz przychody finansowe, poleganie na tym domyślnym zachowaniu to poważne ryzyko operacyjne. Aby zrozumieć dlaczego, musimy przyjrzeć się, jak FloatType i DoubleType działają pod spodem. FloatType wykorzystuje 32-bitową matematykę floating-point IEEE 754. DoubleType używa 64-bitowej wersji tego samego standardu. Oba typy reprezentują liczby jako ułamki binarne. Pomyśl o tym, jak ułamka jedna trzecia nie da się idealnie zapisać w systemie dziesiętnym. Staje się nieskończonym ciągiem trójek. Dokładnie to samo ograniczenie występuje w systemie binarnym. Zwykłych ułamków dziesiętnych, takich jak zero przecinek jeden czy zero przecinek dwa, nie da się idealnie zapisać w systemie dwójkowym. Komputer przechowuje jedynie ich przybliżenie. W przypadku DoubleType masz do dyspozycji 64 bity miejsca, co oznacza, że przybliżenie jest niezwykle bliskie rzeczywistej liczbie. Jeśli zrobisz query na pojedynczym wierszu danych, rzadko zauważysz różnicę. Oto kluczowa kwestia. Błąd kumuluje się podczas agregacji. Kiedy obliczasz całkowite przychody finansowe, sumując miliardy pojedynczych wierszy, te mikroskopijne niedokładności się nawarstwiają. Ułamek centa stracony lub zyskany na każdej transakcji ostatecznie przekłamuje końcowy wynik agregacji o tysiące, a nawet miliony dolarów. Twoja logika agregacji jest matematycznie poprawna, ale typ danych pod spodem psuje wynik. Jeśli twój system oblicza symulacje fizyczne lub trenuje modele machine learningowe, FloatType i DoubleType są dokładnie tym, czego potrzebujesz. Poświęcają dokładność na rzecz bardzo szybkiego przetwarzania sprzętowego. Ale w momencie, gdy przetwarzasz pieniądze, wymagasz ścisłej, bezkompromisowej dokładności. I tu dochodzimy do DecimalType. DecimalType nie używa przybliżeń floating-point. Przechowuje liczby dokładnie tak, jak je zdefiniujesz, używając stałej skali. Konfigurując DecimalType, definiujesz dwa różne parametry. Po pierwsze, określasz precision, czyli maksymalną łączną liczbę cyfr, jaką może pomieścić wartość. Po drugie, określasz scale, co dyktuje dokładną liczbę cyfr dozwolonych po prawej stronie kropki dziesiętnej. Jeśli skonfigurujesz DecimalType z precision równym dziesięć i scale równym dwa, PySpark przydzieli dokładnie tyle miejsca, ile potrzeba do zapisania tej wartości, co do centa. Nie ma tu ułamków binarnych ani zgadywania przy zaokrąglaniu. W praktyce wdrażasz to, przejmując ścisłą kontrolę nad swoimi schema. Podczas odczytu rekordów finansowych z pliku źródłowego, nie pozwól PySparkowi zgadywać typów. Najpierw tworzysz ścisły obiekt schema. Następnie definiujesz swoje pola finansowe, takie jak przychody czy podatki. Na koniec jawnie przypisujesz im DecimalType z wybranymi przez ciebie precision i scale. Kiedy twój dataframe załaduje się z tym schema, twoje standardowe agregacje sumy czy średniej wykonają się perfekcyjnie, od pierwszego do miliardowego wiersza. Poświęcasz odrobinę wydajności obliczeniowej w porównaniu do standardowego DoubleType, ale gwarantujesz, że twoje raportowanie finansowe będzie absolutnie bezbłędne. Zasada jest prosta: używaj typów floating-point dla szybkości i naukowych przybliżeń, ale w momencie, gdy liczba reprezentuje walutę, zablokuj ją za pomocą DecimalType. Dzięki za odsłuch. Do usłyszenia następnym razem!
7

Poskramianie złożonych i zagnieżdżonych danych

4m 14s

Big data nie zawsze są płaskie. Odkrywamy złożone typy danych PySpark, w tym ArrayType, StructType i MapType, które pozwalają na natywne parsowanie głęboko zagnieżdżonych plików JSON.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 7 z 21. W prawdziwym świecie big data rzadko przypomina płaski arkusz kalkulacyjny. Czasami potrzebujesz arraya zagnieżdżonych słowników, żeby w ogóle sparsować pojedynczy event JSON. Żeby to obsłużyć, musimy porozmawiać o okiełznaniu złożonych i zagnieżdżonych danych. Relacyjne workflowy preferują płaskie tabele, ale nowoczesne dane z eventów przychodzą mocno zagnieżdżone. PySpark radzi sobie z tym, udostępniając trzy złożone typy danych. Są to ArrayType, StructType i MapType. Pozwalają ci one na jawne modelowanie hierarchicznych struktur natywnie w silniku. Weźmy na warsztat standardowy profil klienta, żeby zobaczyć, jak te typy działają. Pierwszy koncept to ArrayType. Reprezentuje on kolekcję elementów. Twarda zasada jest taka, że każdy element wewnątrz ArrayType musi mieć dokładnie ten sam bazowy typ danych. Nie możesz mieszać stringów i integerów w tym samym arrayu. Jeśli twój profil klienta zawiera listę ID ostatnich zamówień, definiujesz tę kolumnę jako ArrayType zawierający integery. Kolejny to StructType. StructType modeluje zagnieżdżony, hierarchiczny rekord, działając w zasadzie jak wiersz osadzony w innym wierszu. Zawiera konkretne, nazwane pola. W przeciwieństwie do arraya, każde pole wewnątrz StructType może mieć zupełnie inny typ danych. Załóżmy, że twój klient ma adres. Ten adres zawiera nazwę ulicy jako string, kod pocztowy jako integer i flagę boolean wskazującą, czy jest to nieruchomość komercyjna. Pakujesz te różne pola razem w jeden StructType. I tu jest kluczowa sprawa. Możesz zagnieżdżać te złożone typy dowolnie głęboko. Jeśli klient ma wiele adresów, nie tworzysz płaskich, numerowanych kolumn. Zamiast tego tworzysz ArrayType, w którym wewnętrznym typem elementu jest dokładnie ten adresowy StructType. Masz teraz array structów, który idealnie mapuje się na standardowy array obiektów JSON. Trzecia struktura to MapType, zaprojektowana specjalnie dla par key-value. Różni się od StructType tym, jak obsługuje strukturę w stosunku do schematu. StructType wymaga od ciebie zahardkodowania dokładnych nazw pól z góry. MapType jest elastyczny jeśli chodzi o zawartość danych, ale restrykcyjny pod względem typów danych. Każdy klucz w mapie musi być jednego konkretnego typu, a każda wartość musi być innego konkretnego typu. Możesz użyć MapType do przechowywania preferencji klienta w aplikacji. Kluczami mogą być stringi, takie jak motyw czy język, a wartościami również mogą być stringi, takie jak ciemny czy angielski. Ponieważ jest to MapType, aplikacja upstreamowa może później wstrzyknąć zupełnie nowe klucze preferencji, bez zmuszania cię do zmiany głównego schematu DataFrame. Po prostu dynamicznie odpytujesz wartości po ich kluczach. Kiedy konstruujesz ten złożony schemat w swoim kodzie, budujesz go od środka na zewnątrz. Najpierw definiujesz wewnętrzne pola adresowego StructType. Następnie przekazujesz ten gotowy struct do definicji ArrayType. Kolejnym krokiem jest zdefiniowanie MapType dla preferencji użytkownika. Na koniec opakowujesz wszystkie te komponenty, wraz z prostymi typami skalarnymi, takimi jak string z nazwą klienta, w jeden główny StructType, który definiuje nadrzędny wiersz DataFrame. Zamiast spłaszczać zagnieżdżone struktury do bałaganiarskich stringów JSON, jawne zdefiniowanie tych złożonych schematów pozwala optymalizatorowi Sparka na pruning danych i filtrowanie głęboko w zagnieżdżonych polach, bez deserializacji całego payloadu do pamięci. Dzięki za wysłuchanie — do usłyszenia następnym razem.
8

Rzutowanie typów i selekcja

4m 17s

Dowiedz się, jak aktywnie kształtować schematy DataFrame. Omawiamy, jak wybierać podzbiory kolumn oraz jak bezpiecznie rzutować kolumny z jednego typu danych na inny.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 8 z 21. Zwykły string ukryty w kolumnie typu integer może całkowicie zatrzymać klaster złożony z tysiąca węzłów. Potrzebujesz niezawodnego sposobu na wymuszenie poprawnych struktur danych i precyzyjne określenie, jakie dane przechodzą przez twój pipeline. Dlatego dziś przyjrzymy się Type Castingowi i selekcji. Aby manipulować danymi w PySparku, musisz najpierw zrozumieć, czym właściwie jest kolumna. Instancja kolumny nie jest fizyczną tablicą danych załadowaną do pamięci. Jest to leniwie ewaluowana reprezentacja wyrażenia. Kiedy odwołujesz się do kolumny w swoim kodzie, nie dotykasz danych bazowych. Po prostu dodajesz krok do logicznego planu Sparka. Dane ruszają z miejsca dopiero, gdy później wyzwolisz akcję. Aby pobrać i ukształtować te dane, używasz metody select na swoim DataFrame. Masz dwa główne sposoby, aby powiedzieć metodzie select, których kolumn potrzebujesz. Najprostszy sposób to przekazanie nazw kolumn jako standardowych stringów. Jeśli przekażesz string do metody select, Spark zwróci nowy DataFrame zawierający dokładnie tę kolumnę, całkowicie niezmienioną. Działa to dobrze do podstawowej ekstrakcji, ale nie daje żadnego pola do modyfikacji. Aby zmodyfikować dane podczas selekcji, musisz użyć obiektów Column zamiast stringów. Dostęp do obiektu Column uzyskujesz, odwołując się do niego bezpośrednio z DataFrame. Możesz to zrobić za pomocą dot notation, na przykład dataframe dot age, albo używając bracket notation z nazwą kolumny jako stringiem wewnątrz nawiasów. Bracket notation jest szczególnie przydatna, gdy nazwy twoich kolumn zawierają spacje lub znaki specjalne, które zepsułyby standardową dot notation. I to jest najważniejsza część. Kiedy przekazujesz obiekt Column do metody select, możesz podpiąć do niego metody, aby transformować dane w locie. Jedną z najważniejszych transformacji jest konwersja typów. Dane często docierają w niewłaściwym formacie. Na przykład, możesz otrzymać metryki numeryczne sformatowane jako stringi. Aby to naprawić, używasz metody cast. PySpark udostępnia również alias o nazwie astype, który wykonuje dokładnie tę samą logikę. Metodę cast wywołujesz bezpośrednio na swoim obiekcie Column wewnątrz instrukcji select. Metoda cast wymaga jednego argumentu, którym jest docelowy typ danych. Możesz zdefiniować ten cel, przekazując reprezentację typu jako string, na przykład słowo int, albo przekazując konkretny obiekt typu danych Sparka, taki jak IntegerType. Oto jak to wygląda w prawdziwym skrypcie. Wywołujesz metodę select na swoim DataFrame. W nawiasach tej metody odwołujesz się do docelowej kolumny, używając bracket notation. Tuż obok odwołania do tej kolumny wywołujesz dot cast i podajesz swój nowy typ. Po ewaluacji zwraca to zupełnie nowy DataFrame, w którym twoja wybrana kolumna jest już bezpiecznie przekonwertowana na określony typ. Oryginalny DataFrame pozostaje całkowicie nienaruszony, ponieważ DataFrame'y są niemutowalne. Kluczowy wniosek jest taki, że type casting w PySparku nie jest samodzielnym procesem aplikowanym w miejscu na istniejącym zbiorze danych. Jest to leniwie ewaluowane wyrażenie kolumnowe, nierozerwalnie związane z procesem selekcji danych w celu zbudowania nowego, silnie typowanego DataFrame'u. Jeśli podoba ci się ten podcast i chcesz wesprzeć nasz program, możesz wyszukać DevStoriesEU na Patreonie. To wszystko w tym odcinku. Dzięki za słuchanie i twórz dalej!
9

Skrzyżowanie funkcji: Czyszczenie brudnych danych

3m 54s

Śmieci na wejściu, śmieci na wyjściu. Poznaj niezbędne transformacje DataFrame do usuwania wartości null, wypełniania brakujących danych i natywnej obsługi rekordów NaN w systemach rozproszonych.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 9 z 21. Śmieci na wejściu, śmieci na wyjściu. Ale co robisz, gdy twój śmieciowy dataset ma setki terabajtów i nie możesz ręcznie sprawdzić pojedynczego wiersza? Potrzebujesz systematycznego sposobu, żeby oczyścić go w dużej skali. Właśnie to omówimy dzisiaj w odcinku Function Junction: Czyszczenie brudnych danych. Pierwszym krokiem w czyszczeniu jest zazwyczaj standaryzacja schematu. Często dostajesz surowe pliki ze spacjami, znakami specjalnymi lub literówkami w nagłówkach. Użyj metody o nazwie withColumnRenamed. Po prostu przekazujesz jej stary string z nazwą i nowy, pożądany string. Jeśli masz do poprawienia kilka kolumn, chainujesz tę metodę sekwencyjnie dla każdej z nich, zanim zastosujesz jakiekolwiek złożone transformacje downstream. Zanim usuniesz złe dane, musimy wyjaśnić częste nieporozumienie dotyczące null i NaN w PySpark. Null oznacza, że danego punktu danych całkowicie brakuje. NaN to skrót od Not a Number, co reprezentuje niezdefiniowany wynik matematyczny, taki jak dzielenie zera przez zero. W czystym Pythonie wymagają one oddzielnej obsługi. Jednak PySpark grupuje je razem dla wygody. Kiedy używasz funkcji NA na dataframe, Spark traktuje wartości NaN jako nulle do celów dropowania lub wypełniania. Aby wyeliminować wiersze z brakującymi wartościami, używasz metody na.drop. Wywołanie tej funkcji całkowicie bez argumentów dropuje każdy wiersz zawierający null lub NaN w jakiejkolwiek pojedynczej kolumnie. To podejście jest bardzo destrukcyjne w przypadku szerokich datasetów. Pojedyncza brakująca wartość w opcjonalnej kolumnie z metadanymi usunie wiersz skądinąd idealnych danych transakcyjnych. Aby temu zapobiec, przekaż listę nazw kolumn do parametru subset. PySpark weźmie wtedy pod uwagę tylko te konkretne, krytyczne kolumny przy decydowaniu, czy zdropować wiersz. Dropowanie wierszy nie zawsze jest dozwolone przez reguły biznesowe. Często musisz zastąpić brakujące wartości bezpiecznymi defaultami. Osiągniesz to używając na.fill. Chociaż możesz przekazać pojedynczą wartość, żeby wypełnić wszystkie kolumny, lepszym podejściem jest przekazanie słownika. Klucze słownika reprezentują konkretne nazwy kolumn, a wartości twoje wybrane zamienniki. Ten wzorzec pozwala ci wypełnić brakującą metrykę numeryczną zerem, jednocześnie zastępując brakującą kategorię stringiem takim jak unknown. Zrobienie tego przez słownik wykonuje się w jednym przebiegu, co jest bardzo wydajne. Na koniec, twoje dane mogą być w pełni wypełnione, ale nadal nieprawidłowe. Outliery i fizycznie niemożliwe wartości wymagają logicznego filtrowania. Izolujesz dobre dane używając metody where, żeby zachować tylko te wiersze, które spełniają określony warunek. Dla granic liczbowych lub dat, metoda between to twoje najlepsze narzędzie. Wybierasz swoją kolumnę, wywołujesz between i podajesz dolny oraz górny limit. Zastępuje to rozwlekłą logikę greater-than i less-than, dzięki czemu twój kod jest łatwiejszy do czytania. Każdy wiersz wypadający poza te limity jest odfiltrowany z wynikowego dataframe'u. Oto kluczowy wniosek. Kolejność ma ogromne znaczenie przy czyszczeniu w dużej skali. Zawsze najpierw zmieniaj nazwy kolumn, żeby zablokować swój schemat, następnie dropuj lub wypełniaj brakujące wartości, żeby ustabilizować typy danych, a outliery filtruj na samym końcu, tylko wtedy, gdy wiesz, że dane bazowe są poprawne strukturalnie. To wszystko w tym odcinku. Dzięki za wysłuchanie i buduj dalej!
10

Transformacja i zmiana kształtu danych

3m 54s

Przejmij kontrolę nad kształtem swoich danych. Odkrywamy, jak generować nowe kolumny za pomocą funkcji matematycznych, wykonywać operacje na ciągach znaków i spłaszczać zagnieżdżone tablice przy użyciu explode().

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 10 z 21. Czasami pojedynczy wiersz danych zawiera array ukrytych rekordów — i musisz go zdetonować, żeby poprawnie go przeanalizować. Transformacja i reshaping danych to sposób, w jaki rozpakowujesz, formatujesz i strukturyzujesz te informacje do dalszego przetwarzania. Kiedy musisz zmodyfikować dataframe w PySpark, nie zmieniasz danych in place. Dataframe'y są niemutowalne. Zamiast tego tworzysz nowe wersje za pomocą metody o nazwie withColumn. Ta metoda przyjmuje dwa argumenty. Pierwszy to string reprezentujący nazwę kolumny, którą chcesz utworzyć lub zastąpić. Drugi to wyrażenie kolumnowe definiujące właściwe dane. Jeśli podasz nazwę, która już istnieje w dataframe'ie, PySpark nadpisze oryginalną kolumnę. Jeśli nazwa jest całkowicie nowa, PySpark doda nową kolumnę po prawej stronie twojego datasetu. Żeby zdefiniować, co ma się znaleźć w tej nowej kolumnie, zazwyczaj używasz wbudowanych funkcji PySpark. Są one importowane z modułu funkcji SQL i zapewniają wysoce zoptymalizowane operacje, które wykonują się na całym twoim klastrze. Weźmy na przykład manipulację stringami. Dane tekstowe ze źródeł zewnętrznych rzadko są idealnie sformatowane. Możesz mieć kolumnę zawierającą nazwy użytkowników zapisane nieprzewidywalną mieszanką wielkich i małych liter. Możesz to naprawić, przekazując swoją istniejącą kolumnę do wbudowanej funkcji, takiej jak lower, która zamienia cały tekst na małe litery. Alternatywnie, możesz użyć funkcji do kapitalizacji, żeby upewnić się, że pierwsza litera jest wielka, a pozostałe małe. W praktyce wbudowujesz te operacje bezpośrednio w transformacje swojego dataframe'u. Wywołujesz withColumn, nazywasz swoją kolumnę docelową i przypisujesz jej wynik funkcji lower zastosowanej na kolumnie wejściowej. PySpark ewaluuje to wyrażenie dla każdego pojedynczego wiersza. Możesz połączyć wiele wywołań withColumn, żeby zastosować kilka transformacji sekwencyjnie, za każdym razem przekazując progresywnie zaktualizowany dataframe do następnego kroku. Teraz druga część to reshaping. Czyszczenie stringów zmienia wartości, ale co się dzieje, gdy podstawowy kształt twoich danych uniemożliwia analizę? Tutaj robi się ciekawie. Możesz otrzymać dataset, w którym identyfikator osoby znajduje się w jednej kolumnie, a jej miesięczne dochody za cały rok są spakowane w jeden array w sąsiedniej kolumnie. Nie możesz uruchomić standardowych agregacji relacyjnych na zagnieżdżonym arrayu. Potrzebujesz każdej pojedynczej wartości dochodu w osobnym wierszu, żeby obliczyć średnie lub znaleźć minima. Rozwiązujesz ten problem strukturalny za pomocą wbudowanej funkcji o nazwie explode. Funkcja explode obsługuje konkretnie arraye i mapy. Wywołujesz withColumn, określasz nazwę kolumny, którą chcesz mieć na wyjściu, i przekazujesz funkcję explode opakowującą twoją kolumnę z arrayem. PySpark wykonuje to, biorąc oryginalny pojedynczy wiersz i rozrywając go. Jeśli array dochodów zawiera dwanaście odrębnych wartości, explode generuje dwanaście całkowicie oddzielnych wierszy. W nowym dataframe'ie kolumna docelowa zawiera teraz pojedynczą, płaską wartość dochodu na wiersz zamiast listy. Co kluczowe, PySpark duplikuje wszystkie pozostałe kolumny z oryginalnego wiersza. Identyfikator użytkownika jest dokładnie kopiowany do wszystkich dwunastu nowych wierszy. Logiczna relacja między użytkownikiem a jego dochodami pozostaje nienaruszona, ale dane są teraz płaskie. Zrobiłeś reshaping zagnieżdżonej struktury w długą tabelę gotową do standardowych operacji grupowania i filtrowania. Prawdziwa moc transformacji w PySpark polega na tym, że funkcje takie jak explode i lower nie tylko manipulują pojedynczymi wartościami; definiują logiczny plan obliczeń, który skaluje się natychmiastowo, niezależnie od tego, czy masz sto, czy sto miliardów wierszy, nie wymagając od ciebie napisania ani jednej ręcznej pętli. To tyle w tym odcinku. Do usłyszenia następnym razem!
11

Mechanika grupowania i agregacji

3m 37s

Opanuj strategię split-apply-combine. Zagłębiamy się w grupowanie danych według kluczy i stosowanie potężnych funkcji agregujących do podsumowywania ogromnych zbiorów danych.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 11 z 21. Kiedy patrzysz na miliardy pojedynczych rekordów, czytanie ich wiersz po wierszu jest niemożliwe. Aby wyciągnąć z nich jakikolwiek sens, musisz je podsumować. Dzisiaj omówimy dokładnie, jak to się robi: mechanika grupowania i agregacji. Pod spodem PySpark przetwarza agregacje przy użyciu klasycznej strategii przetwarzania danych, zwanej split-apply-combine. Ten wzorzec działa dokładnie tak, jak brzmi. Najpierw PySpark dzieli ogromny zbiór danych na odrębne, logiczne buckety na podstawie wybranego przez ciebie klucza. Następnie aplikuje konkretne obliczenia do każdego bucketa niezależnie, w całym klastrze. Na koniec łączy te niezależne wyniki z powrotem w jeden, podsumowany rezultat. W swoim kodzie odpalasz fazę split, wywołując metodę group by na swoim DataFrame. Po prostu podajesz nazwę kolumny, której chcesz użyć jako klucza grupowania. Na przykład, jeśli masz ogromną tabelę historycznych transakcji, możesz zrobić group by po kolumnie z nazwą użytkownika. I tu jest kluczowa sprawa. Wywołanie group by nie zwraca nowego DataFrame. Zamiast tego zwraca konstrukcję przejściową, czyli obiekt GroupedData. Ponieważ PySpark stosuje lazy evaluation, zbudował tylko execution plan do zorganizowania tych bucketów. Tak naprawdę nie przeniesie żadnych danych, dopóki nie powiesz mu, jaką operację matematyczną ma wykonać na tych bucketach. Aby dostarczyć tę operację matematyczną, chainujesz metodę aggregate, zazwyczaj zapisywaną jako agg, bezpośrednio na swoich pogrupowanych danych. To załatwia fazy apply i combine. Wewnątrz metody aggregate mówisz PySparkowi, co ma policzyć, używając narzędzi z modułu PySpark SQL functions. Ten moduł zawiera dziesiątki zoptymalizowanych operacji agregacji. Powiedzmy, że chcesz policzyć średni dochód dla każdego z tych użytkowników. Importujesz funkcję average, zazwyczaj nazywaną avg. Przekazujesz nazwę swojej kolumny z dochodem do funkcji average i umieszczasz to wewnątrz metody aggregate. Kiedy to się wykona, PySpark policzy średni dochód dla każdego unikalnego bucketa użytkownika jednocześnie. Wtedy wkracza faza combine, zwracając standardowy, czytelny DataFrame. Ten nowy DataFrame zawiera tylko jeden wiersz na użytkownika, sparowany z jego nowo policzonym średnim dochodem. W tym momencie masz idealnie podsumowaną tabelę. Jednak ponieważ obliczenia odbywały się równolegle w rozproszonym klastrze, końcowe wiersze są zwracane w losowej kolejności, w zależności od tego, jak nody przetwarzające kończyły swoją pracę. Jeśli musisz zobaczyć tych, którzy zarabiają najwięcej, losowa kolejność jest bezużyteczna. Aby to naprawić, chainujesz metodę order by na końcu swojego kroku agregacji. Przekazujesz do metody order by kolumnę zawierającą twoje nowe średnie i każesz jej posortować wyniki malejąco. PySpark weźmie połączone wyniki, uszereguje je i dostarczy czystą, posortowaną tabelę. Wzorzec split-apply-combine jest potężny właśnie dlatego, że idealnie mapuje się na rozproszony sprzęt, pozwalając na podsumowanie ogromnych zbiorów danych w kilka sekund. Ale pamiętaj, że grupowanie danych to tylko połowa operacji. Grupowanie wymaga agregacji, żeby dokończyć robotę, w przeciwnym razie masz po prostu klaster pełen pustych bucketów czekających na instrukcje. Dzięki za spędzenie ze mną tych kilku minut. Do usłyszenia następnym razem, trzymaj się.
12

Kiedy DataFrames się zderzają: Sztuka łączenia

3m 37s

Poruszanie się po niuansach łączenia zbiorów danych. Rozkładamy na czynniki pierwsze siedem różnych typów join w PySpark i wyjaśniamy, jak bezpiecznie scalać DataFrames.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 12 z 21. Scalanie dwóch ogromnych tabel to najkosztowniejsza operacja w obliczeniach rozproszonych. Zastosuj złą logikę dopasowywania, a stanie się to najprostszym sposobem na wywalenie klastra przez out of memory. Dokładna wiedza o tym, jak bezpiecznie łączyć zbiory danych, to główny temat When DataFrames Collide: The Art of Joining. Podstawowym mechanizmem łączenia danych w PySpark jest metoda join. Wywołujesz ją na swoim bazowym DataFrame, przekazując DataFrame, który chcesz dołączyć, konkretną kolumnę lub kolumny, po których chcesz połączyć, oraz metodę join. Jeśli nie podasz żadnej metody join, PySpark domyślnie użyje inner join. Weźmy konkretny scenariusz. Masz jeden DataFrame z danymi o wzroście ludzi, a drugi DataFrame z ich dochodami. Oba datasety współdzielą kolumnę o nazwie name. Przy inner join, PySpark patrzy na kolumnę name w obu datasetach i zostawia tylko te wiersze, gdzie name istnieje w obu miejscach. Jeśli ktoś pojawia się w danych o wzroście, ale brakuje go w danych o dochodach, jego rekord jest całkowicie odrzucany z wyniku. Aby zachować niedopasowane rekordy, zmieniasz typ join. Left join zachowuje każdy wiersz z twojego początkowego DataFrame, którym w tym przypadku są dane o wzroście. Jeśli PySpark znajdzie pasującą wartość name w danych o dochodach, dołącza ten dochód. Jeśli nie znajdzie dopasowania, zachowuje wiersz ze wzrostem, ale wstawia wartość null w kolumnie dochodów. Right join wykonuje dokładnie odwrotną operację, zachowując wszystkie dochody i uzupełniając brakujący wzrost wartościami null. Kiedy potrzebujesz absolutnie wszystkiego, używasz full join. PySpark zachowuje każdy rekord z obu DataFrame'ów. Pasujące wartości name są scalane w jeden wiersz, a wszystkie wartości name istniejące tylko w jednym datasecie są zachowywane, przy czym wartości null wypełniają brakujące dane z drugiej strony. Oto kluczowa kwestia. Cross join działa inaczej, ponieważ całkowicie ignoruje warunek join. Paruje każdy pojedynczy wiersz w DataFrame heights z każdym pojedynczym wierszem w DataFrame incomes, tworząc iloczyn kartezjański. Jeśli obie tabele mają tylko tysiąc wierszy, cross join zwraca milion wierszy. Ten gwałtowny przyrost danych to powód, dla którego cross joiny są domyślnie mocno ograniczone i często wymagają jawnej konfiguracji, aby wykonać się bez wyrzucenia błędu. Dwa ostatnie typy join to w rzeczywistości operacje filtrujące, a nie prawdziwe scalanie danych. Left semi join szuka dopasowań, zwracając wiersze z DataFrame heights tylko wtedy, gdy name występuje również w DataFrame incomes. Kluczową różnicą w stosunku do inner join jest to, że left semi join nie zaciąga żadnych kolumn z prawej strony. Zostajesz z dokładnie tymi samymi kolumnami, od których zacząłeś, po prostu odfiltrowanymi do rekordów, które mają odpowiednie dopasowanie. Left anti join działa dokładnie odwrotnie. Zwraca wiersze z DataFrame heights tylko wtedy, gdy name nie istnieje w danych incomes. Całkowicie odrzuca kolumny z prawej strony. Dzięki temu left anti join jest najwydajniejszym sposobem na identyfikację brakujących danych lub znalezienie rekordów, których nie udało się przetworzyć downstream. Wybór typu join decyduje nie tylko o tym, jakie dane dostajesz z powrotem, ale też o tym, ile danych musi fizycznie przejść przez twoją sieć, aby wygenerować wynik. Dzięki za uwagę. Do usłyszenia następnym razem!
13

Stary SQL, nowe sztuczki

3m 33s

Po co uczyć się nowego API, skoro można użyć surowego SQL? Dowiedz się, jak wykonywać standardowe zapytania SQL bezpośrednio na rozproszonych DataFrames w PySpark.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 13 z 21. Masz zespół analityków, którzy piszą świetny SQL, ale twoje dane leżą na ogromnym, rozproszonym klastrze. Możesz zmusić ich do nauki zupełnie nowej składni Pythona, albo pozwolić im używać języka, który już znają. I tu do gry wchodzi uruchamianie surowych stringów SQL bezpośrednio w PySparku, czyli stary SQL, nowe sztuczki. PySpark daje ci bezpośrednie połączenie ze standardowym SQL-em przez jedną metodę w twojej sesji Spark, zwaną po prostu sql. Do tej metody przekazujesz surowy string SQL. Wynikiem nie jest zwykły tekst. To standardowy DataFrame w PySparku. To oznacza, że możesz odpalić standardowe zapytanie do bazy, dostać z powrotem DataFrame i od razu przekazać go do innej funkcji w Pythonie. To jest w pełni interoperacyjne. Zanim zaczniesz odpytywać dane za pomocą SQL-a, PySpark musi wiedzieć, jakie tabele w ogóle istnieją. Masz dwa główne sposoby na wystawienie swoich danych do silnika SQL. Po pierwsze, jeśli masz już DataFrame w Pythonie, możesz wywołać metodę, żeby zarejestrować go jako temporary view. Nadajesz mu nazwę jako string, i nagle zachowuje się jak tabela w twoich zapytaniach SQL. Po drugie, możesz tworzyć tabele w całości wewnątrz swojego stringa SQL. Przekazujesz instrukcję create table do metody sql. Wewnątrz tego stringa definiujesz schemat i mówisz PySparkowi dokładnie, gdzie leżą pliki z danymi, na przykład podając ścieżkę w cloud storage zawierającą pliki Parquet. PySpark rejestruje to w swoim wewnętrznym katalogu. Od tego momentu odpytujesz ją po nazwie, dokładnie tak samo jak tradycyjną tabelę w bazie danych. Porównaj, jak ta sama logika wygląda w obu podejściach. Załóżmy, że musisz wyciągnąć nazwy klientów, odrzucić tych z zerowym saldem i połączyć wynik z tabelą orders. W API DataFrame budujesz chain metod w Pythonie. Wywołujesz select na swoim datasecie klientów, żeby wybrać kolumnę name. Następnie chainujesz metodę filter, sprawdzając, czy balance jest większy od zera. Na koniec doklejasz metodę join, odwołującą się do datasetu orders po pasującym kluczu. To podejście jest bardzo programistyczne. W podejściu SQL piszesz standardowy select wyciągający kolumnę name, dodajesz klauzulę where dla kolumny balance i piszesz inner join dla tabeli orders. Siedzi to w twoim skrypcie jako pojedynczy, czytelny blok stringa. A oto kluczowa sprawa. Istnieje powszechne, błędne przekonanie, że pisanie SQL-a wewnątrz stringów w Pythonie musi być wolniejsze albo mniej natywne niż używanie ustrukturyzowanych metod DataFrame. To nieprawda. Niezależnie od tego, czy chainujesz metody w Pythonie, czy przekazujesz surowy string SQL, PySpark traktuje je identycznie. Oba wejścia są natychmiast parsowane, tłumaczone na dokładnie ten sam logical plan i przekazywane do optymalizatora Catalyst. Execution engine nie wie i nie dba o to, jakiego API użyłeś do wyrażenia swoich intencji. Wydajność jest dokładnie taka sama. Wybór między API DataFrame a surowym SQL-em nigdy nie jest kwestią wydajności klastra. Chodzi wyłącznie o to, co sprawia, że twój zespół pracuje szybciej, a twój codebase jest łatwiejszy w utrzymaniu. Dzięki, że wpadłeś. Mam nadzieję, że dowiedziałeś się czegoś nowego.
14

Wymienność DataFrames i SQL

3m 49s

Płynnie łącz SQL z językiem Python. Odkryj, jak tworzyć tymczasowe widoki z DataFrames, używać selectExpr i łączyć programistyczne operacje z wynikami zapytań SQL.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 14 z 21. Możesz utknąć w debacie, czy pisać transformacje danych w Pythonie, czy w SQL-u. Wymuszanie ścisłego wyboru między jednym a drugim sprawia, że tracisz mnóstwo możliwości. Prawdziwa przewaga polega na płynnym przeplataniu DataFrame'ów i SQL-a w ramach dokładnie tego samego pipeline'u. Czasami skomplikowany zestaw zagnieżdżonych joinów jest dla twojego zespołu znacznie łatwiejszy do czytania i utrzymania w czystym SQL-u. Innym razem musisz dynamicznie iterować po nazwach kolumn, co jest niemożliwe w czystym SQL-u, ale trywialne w Pythonie. PySpark pozwala ci łączyć oba te podejścia bez przerywania twojego data flow. Aby zacząć pisać SQL na istniejącym Pythonowym DataFrame'ie, musisz najpierw wystawić ten DataFrame do silnika Spark SQL. Osiągniesz to, wywołując metodę create or replace temp view bezpośrednio na swoim DataFrame'ie. Przekazujesz pojedynczy argument typu string, który staje się nazwą tabeli. Ta operacja nie przenosi żadnych danych. Nie zapisuje niczego na dysku. Po prostu rejestruje tymczasowy wskaźnik w twojej obecnej sesji Sparka. Silnik SQL wie teraz, jak rozwiązać tę nazwę tabeli z powrotem do twojego Pythonowego DataFrame'u. Teraz możesz ją odpytać. Wywołujesz spark dot sql i przekazujesz swój standardowy select jako string, odwołując się do nazwy tabeli, którą właśnie utworzyłeś. Oto kluczowa sprawa. Wynikiem tego wywołania spark dot sql nie jest statyczny tekst, ani żaden inny typ obiektu. Zwraca on standardowy PySpark DataFrame. Oznacza to, że możesz natychmiast chainować normalne metody Pythonowego DataFrame'u bezpośrednio na końcu twojego wywołania SQL. Możesz napisać pięćdziesięciolinijkowy string SQL do obsługi skomplikowanej funkcji okna, zamknąć nawias spark dot sql i natychmiast dokleić metodę dot filter lub dot group by. Przechodzisz z Pythona do SQL-a i z powrotem do Pythona w jednym bloku kodu. Jeśli potrzebujesz SQL-a tylko do kalkulacji konkretnej kolumny, rejestrowanie pełnego temporary view jest niepotrzebne. Zamiast tego używasz metody select expression. Ta metoda działa jak pomost. Działa dokładnie tak samo jak standardowa metoda select w DataFrame'ie, ale przyjmuje surowe wyrażenia w postaci stringów SQL zamiast Pythonowych obiektów kolumn. Jeśli musisz wykonać instrukcję case-when, użyć funkcji matematycznych lub zrzutować typ danych używając natywnej składni SQL, przekazujesz dokładnie te stringi SQL do select expression. Spark bierze te stringi, parsuje je i wykonuje dokładnie tak, jak zrobiłby to w pełnym zapytaniu SQL. Pozwala ci to pozostać całkowicie w chainowalnym API DataFrame'ów, jednocześnie polegając na składni SQL dla skomplikowanej logiki na poziomie wiersza. Granica między tymi dwoma paradygmatami jest całkowicie sztuczna. Niezależnie od tego, czy chainujesz metody Pythona, piszesz czyste zapytania SQL, czy używasz stringów w select expression, Spark kompiluje to wszystko do dokładnie tego samego, zoptymalizowanego execution planu. Jeśli chcesz pomóc nam w dalszym tworzeniu tych odcinków, możesz wyszukać DevStoriesEU na Patreonie, aby wesprzeć nasz program. To wszystko w tym odcinku. Dzięki za słuchanie i twórz dalej!
15

Rozszerzanie Spark za pomocą Python UDFs

4m 17s

Gdy wbudowane funkcje nie wystarczają, do akcji wkraczają User-Defined Functions. Odkrywamy, jak pisać niestandardową logikę Python dla DataFrames i dlaczego standardowe skalarne UDFs ukrywają spadek wydajności.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 15 z 21. Piszesz customową funkcję w Pythonie, wpinasz ją w swój data pipeline i działa bezbłędnie na małej próbce. Ale kiedy odpalasz ją na pełnym datasecie, job zwalnia do ślimaczego tempa, a zużycie CPU skacze w górę. Sam kod jest w porządku, ale płacisz ukryty podatek od wykonania. Dzisiaj porozmawiamy o rozszerzaniu Sparka za pomocą Python UDFs. User Defined Function, czyli UDF, pozwala ci wykonywać customową logikę w Pythonie bezpośrednio na Spark DataFrame. Używasz tego, gdy wbudowane funkcje Spark SQL nie pokrywają twojej specyficznej logiki biznesowej. Proces jest prosty. Zaczynasz od napisania standardowej funkcji w Pythonie. Na przykład piszesz funkcję, która przyjmuje stringa, aplikuje złożoną, customową regułę formatowania i zwraca zmodyfikowanego stringa. Żeby Spark rozpoznał tę funkcję, importujesz funkcję udf z modułu PySpark SQL functions i nakładasz ją jako decorator bezpośrednio nad definicją twojej funkcji w Pythonie. Przekazujesz też return type do decoratora, na przykład typ string albo integer. Jeśli nie podasz return type, Spark domyślnie użyje typu string, co może powodować ciche błędy w danych, jeśli twoja funkcja tak naprawdę zwraca liczbę. Po dodaniu decoratora, twoja customowa funkcja w Pythonie działa dokładnie tak samo jak natywna funkcja Sparka. Możesz przekazać ją do operacji na DataFrame, takich jak select, podając jej nazwy kolumn jako argumenty. A oto kluczowa sprawa. Standardowy, skalarny Python UDF operuje ściśle na jednym wierszu naraz. Pobiera jedną lub więcej wartości kolumn z pojedynczego wiersza jako input, wykonuje twoją customową logikę w Pythonie i zwraca dokładnie jedną wartość wyjściową dla tego konkretnego wiersza. Jeśli twój DataFrame zawiera dziesięć milionów wierszy, twoja funkcja w Pythonie jest wywoływana dziesięć milionów osobnych razy. Ta operacja wiersz po wierszu jest łatwa do zrozumienia, ale tworzy ogromny performance bottleneck, o którym wspomnieliśmy na początku. Żeby zrozumieć, dlaczego to jest tak wolne, musisz spojrzeć na to, jak Spark wykonuje kod pod spodem. Spark jest napisany w Scali, co oznacza, że jego główny silnik działa wewnątrz Java Virtual Machine, czyli JVM. Twój customowy UDF jest napisany w Pythonie. JVM nie potrafi natywnie wykonywać kodu w Pythonie. Żeby zastosować twój UDF, Spark jest zmuszony odpalić osobne procesy Python worker obok swoich własnych executorów. Następnie musi fizycznie przenieść dane z przestrzeni pamięci JVM do procesu Pythona. Spark polega na pythonowej bibliotece do serializacji o nazwie cloudpickle, żeby obsłużyć ten złożony transfer. To właśnie tutaj pobierany jest ten podatek wydajnościowy. Dla każdego pojedynczego wiersza w twoim datasecie, Spark serializuje inputy w JVM, wysyła te dane binarne przez lokalny socket do Python workera i deserializuje je do standardowych obiektów w Pythonie. Twoja customowa funkcja w końcu wykonuje się na tych obiektach. Następnie cały ten cykl powtarza się w drugą stronę. Python serializuje wartość wyjściową przy użyciu cloudpickle, odsyła ją z powrotem przez socket, a JVM deserializuje ją z powrotem do wewnętrznego formatu pamięci Sparka. Ta ciągła serializacja i deserializacja między Javą a Pythonem jest niesamowicie kosztowna. Prawdziwym kosztem standardowego Python UDF rzadko jest sama logika, którą piszesz; to cichy overhead związany z tłumaczeniem danych w tę i z powrotem między dwoma zupełnie różnymi środowiskami runtime dla każdego pojedynczego wiersza. Dzięki za spędzenie ze mną tych kilku minut. Do usłyszenia następnym razem, trzymaj się.
16

Turbodoładowanie UDFs z Apache Arrow

3m 38s

Wyeliminuj wąskie gardło serializacji między JVM a językiem Python. Odkrywamy, jak Vectorized Pandas UDFs i formaty pamięci Apache Arrow niesamowicie przyspieszają Twoje niestandardowe transformacje.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 16 z 21. A co, gdybyś mógł przyspieszyć swoje customowe funkcje w Pythonie w Sparku dziesięciokrotnie, zmieniając tylko jeden dekorator? Standardowe UDF-y w Pythonie słyną z tego, że są strasznie wolne, ale rozwiązanie nie wymaga przepisywania logiki w Scali. Dzisiaj omówimy turbodoładowanie UDF-ów za pomocą Apache Arrow. Uruchamiając standardowego UDF-a w Pythonie, zderzasz się z ogromną barierą wydajnościową na granicy języków. Spark działa wewnątrz Java Virtual Machine, ale twoja customowa logika działa w osobnym workerze Pythona. Aby przesyłać między nimi dane, Spark wyciąga wiersze ze swojej pamięci wewnętrznej, serializuje je za pomocą biblioteki o nazwie cloudpickle i wysyła do Pythona. Python przetwarza dane wiersz po wierszu, serializuje wynik i odsyła go z powrotem. Robienie tego dla milionów pojedynczych wierszy tworzy nieznośny bottleneck serializacji. Apache Arrow zmienia zasady tej wymiany danych. Arrow to cross-language'owy, kolumnowy format danych in-memory. Standaryzuje to, jak dane wyglądają w pamięci, dzięki czemu zarówno JVM, jak i Python rozumieją je natywnie, bez skomplikowanej translacji. Zamiast serializować dane wiersz po wierszu, Spark pakuje dane w duże, kolumnowe batche. Wszystkie wartości dla konkretnej kolumny znajdują się tuż obok siebie w ciągłym obszarze pamięci. Spark wysyła te duże bloki do Pythona w jednym, wydajnym kroku. Możesz to wykorzystać na dwa sposoby. Po pierwsze, możesz włączyć optymalizację Arrow dla standardowych UDF-ów. Robisz to, ustawiając właściwość konfiguracyjną Sparka dla Arrow execution na true, albo podając parametr useArrow equals true podczas rejestrowania twojego UDF-a. Spark użyje Arrow do przesyłania danych w batchach, drastycznie zmniejszając narzut na serializację, mimo że twoja funkcja w Pythonie technicznie nadal wykonuje logikę wiersz po wierszu. Oto kluczowa informacja. Aby uzyskać maksymalny wzrost prędkości, chcesz, aby twój kod w Pythonie przetwarzał te batche Arrow jednocześnie. I tutaj wkraczają Pandas UDF-y. Opakowując twoją customową funkcję dekoratorem pandas UDF, zmieniasz sposób, w jaki funkcja odbiera dane. Zamiast dostawać pojedynczą wartość dla jednego wiersza, twoja funkcja otrzymuje Pandas Series zawierające cały batch wartości. Twoja funkcja aplikuje zwektoryzowaną operację na całym tym batchu i zwraca nowe Pandas Series o dokładnie tej samej długości. Pomyśl o funkcji o nazwie calculate tax. Dodajesz dekorator pandas UDF i deklarujesz, że zwraca ona typ double. Funkcja przyjmuje Pandas Series zawierające ceny produktów. Wewnątrz funkcji nie piszesz for-loopa. Po prostu piszesz instrukcję return, która mnoży wejściowe Series przez jeden przecinek dwa. Ponieważ Pandas pod spodem jest oparty na wysoce zoptymalizowanym kodzie w C, mnoży cały blok cen natychmiast. Spark następnie bierze to zwrócone Series i płynnie scala je z powrotem z DataFrame za pomocą Arrow. Prawdziwa siła Pandas UDF polega nie tylko na tym, że omija bottleneck serializacji w cloudpickle, ale też na tym, że przenosi twoje właściwe obliczenia z wolnych pętli w Pythonie do zwektoryzowanej, natywnej egzekucji. Dzięki za wysłuchanie. Trzymajcie się wszyscy.
17

Eksplozja wierszy z Python UDTFs

4m 04s

Standardowe UDFs zwracają jedną wartość na wiersz, ale co, jeśli potrzebujesz wielu wierszy? Dowiedz się, jak Python User-Defined Table Functions (UDTFs) rozwiązują złożone problemy generowania relacji jeden-do-wielu.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 17 z 21. Standardowe User-Defined Functions są ściśle ograniczone do mapowania jeden do jednego. Przekazujesz jedną wartość i otrzymujesz dokładnie jedną wartość. Ale co, jeśli pojedynczy, gęsty wpis logu musi zostać rozwinięty do stu oddzielnych wierszy? Aby to rozwiązać, używasz pythonowych User-Defined Table Functions, czyli UDTF. UDTF robi dokładnie to, co sugeruje nazwa. Zwraca całą tabelę na podstawie jednego wejścia. Podczas gdy standardowy UDF oblicza pojedynczą wartość skalarną, UDTF może wygenerować wiele wierszy i kolumn. To narzędzie, po które sięgasz, gdy musisz zrobić explode na zagnieżdżonym stringu JSON, sparsować plik tekstowy z delimiterami linijka po linijce, albo wygenerować sekwencję dat z pojedynczego timestampa. Aby utworzyć UDTF w PySpark, nie piszesz zwykłej, samodzielnej funkcji. Zamiast tego definiujesz klasę w Pythonie. Ta klasa wymaga konkretnej metody o nazwie eval. To właśnie w metodzie eval dzieje się właściwa transformacja. Kiedy uruchamiasz UDTF, Spark wywołuje tę metodę dla każdej wartości wejściowej. I tu jest kluczowa sprawa. Wewnątrz metody eval nie używasz standardowego return. Zamiast tego używasz pythonowego słowa kluczowego yield. Za każdym razem, gdy metoda robi yield wartości, Spark zamienia to na nowy wiersz w tabeli wyjściowej. Jeśli przekażesz pojedynczy wejściowy string, metoda eval może przeiterować po nim i zrobić yield dziesięć razy. Spark bierze te dziesięć yieldów i tworzy dziesięć osobnych wierszy. Przejdźmy przez konkretny przykład. Tworzysz klasę o nazwie ProcessWords. Twoim celem jest przekazanie całego zdania i otrzymanie z powrotem tabeli, w której każde słowo ma swój własny wiersz. Piszesz metodę eval tak, aby przyjmowała string tekstowy. Wewnątrz metody robisz split zdania po spacjach. Następnie iterujesz po powstałych słowach. Dla każdego słowa robisz yield tuple'a zawierającego samo słowo. Zanim Spark będzie mógł użyć tej klasy, nakładasz na nią dekorator UDTF z PySpark. Dekorator jest obowiązkowy, ponieważ definiuje twój output schema. Jawnie deklarujesz nazwy kolumn i typy danych, które generuje twoja funkcja. Jeśli robisz yield stringa, mówisz dekoratorowi, że output to kolumna typu string. Jeśli chcesz zrobić yield słowa i liczby jego znaków, robisz yield dwuelementowego tuple'a, a twój dekorator określa schemat z kolumną typu string i kolumną typu integer. Poza metodą eval, klasa UDTF może również zawierać opcjonalną metodę terminate. Spark wywołuje metodę terminate dokładnie raz dla każdej partycji danych, po tym jak wszystkie wiersze wejściowe zostaną przetworzone przez metodę eval. Jest to bardzo przydatne do agregacji. Jeśli twoja metoda eval śledzi wewnętrzny licznik przez wiele wierszy wejściowych, metoda terminate może zrobić yield jednego końcowego wiersza zawierającego tę całkowitą liczbę przed zamknięciem partycji. Kiedy wywołujesz UDTF w operacji na DataFrame, zachowuje się on jak inline table. Jeśli przekażesz istniejącą kolumnę z DataFrame do UDTF, Spark zastosuje tę funkcję tabelaryczną wiersz po wierszu. Ponieważ funkcja tabelaryczna zwraca wiele wierszy dla każdego pojedynczego wiersza wejściowego, połączenie tego outputu z twoim oryginalnym datasetem wymaga niejawnego lateral join. Spark ogarnia to pod spodem, duplikując dane z oryginalnego wiersza, aby dopasować je do nowych wierszy po explode, wygenerowanych przez twoją klasę w Pythonie. Prawdziwą siłą pythonowego UDTF jest całkowite odpięcie wolumenu wejściowego od wyjściowego, co pozwala pojedynczemu punktowi danych rozkwitnąć w pełny, wielokolumnowy dataset. To wszystko w tym odcinku. Dzięki za wysłuchanie i twórz dalej!
18

Pandas API w Spark

3m 53s

Skaluj swoje istniejące skrypty Pandas w nieskończoność. Odkryj, jak pyspark.pandas API pozwala na natywne wykonywanie standardowej składni Pandas w rozproszonym klastrze Spark.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 18 z 21. Masz lokalny skrypt danych, który działa idealnie, ale nagle rozmiar twojego datasetu rośnie czterokrotnie i twojej maszynie brakuje pamięci. Znasz składnię bezbłędnie, ale przepisanie wszystkiego pod rozproszony framework zajmie ci całe dni. Pandas API w Sparku wypełnia dokładnie tę lukę. Pandas API w Sparku pozwala ci uruchamiać standardowe workloady pandasa na rozproszonym klastrze. Nie polega to tylko na ślepej emulacji pandasa. Przechwytuje twój kod w pandasie i pod spodem tłumaczy go na zoptymalizowane execution plany Sparka. Aby z niego skorzystać, importujesz moduł o nazwie pyspark dot pandas. Standardową konwencją jest nadanie mu aliasu ps, co bezpośrednio odzwierciedla znajomy alias pd, używany w lokalnych workloadach data science. Jeśli masz już w pamięci standardowy, lokalny DataFrame pandasa, przejście jest proste. Wywołujesz funkcję from pandas na module ps i przekazujesz swój lokalny DataFrame. To konwertuje obiekt typu single-node na rozproszony DataFrame pandas-on-Spark. Od tego momentu składnia, której używasz do interakcji z tym nowym obiektem, pozostaje identyczna z tą, którą już znasz. Ta spójność dotyczy również tego, jak dane są przetwarzane pod spodem. Rozproszone API natywnie obsługuje brakujące dane dokładnie tak samo, jak lokalny pandas. Jeśli twój dataset zawiera wartości NumPy Not-a-Number, pandas API w Sparku zarządza nimi poprawnie podczas operacji matematycznych czy transformacji strukturalnych. Nie musisz wymyślać nowej logiki czyszczenia danych dla swoich jobów w Sparku. Standardowe operacje przekładają się bezpośrednio. Jeśli chcesz pogrupować dane po konkretnej kolumnie, wywołujesz standardową funkcję grupowania. Jeśli chcesz obliczyć średnią lub sumę, po prostu chainujesz funkcję agregującą zaraz po tym. Możesz nawet wywoływać funkcje do plotowania bezpośrednio na rozproszonym DataFrame'ie. Spark przetwarza ciężkie obliczenia na klastrze, agreguje niezbędne data pointy i zwraca wizualizację tak, jakbyś pracował na pojedynczej maszynie. Oto kluczowa kwestia. Architektura pod spodem jest fundamentalnie inna, a to wprowadza krytyczny edge case dotyczący generowania indeksów. Lokalny pandas mocno polega na sekwencyjnym, ściśle uporządkowanym indeksie dla każdego pojedynczego wiersza. Spark jednak partycjonuje dane i dystrybuuje je na wiele niezależnych maszyn. Wymuszenie ścisłego, globalnie uporządkowanego, sekwencyjnego indeksu w systemie rozproszonym wymaga stałej komunikacji między worker node'ami. Kiedy tworzysz DataFrame pandas-on-Spark bez jawnego definiowania kolumny indeksu, API automatycznie generuje domyślny indeks, aby idealnie naśladować standardowe zachowanie pandasa. Tworzenie i utrzymywanie tego domyślnego indeksu wymaga synchronizacji stanu na całym klastrze. Jeśli operujesz na ogromnym datasecie, ta synchronizacja wprowadza poważny performance overhead. API często wyrzuci warning dotyczący tego wewnętrznego overheadu podczas wykonywania. Aby uniknąć tego bottlenecku, zdecydowanie zaleca się natychmiastowe przypisanie istniejącej kolumny jako indeksu lub skonfigurowanie API tak, aby używało typu indeksu przyjaznego dla środowisk rozproszonych. Pandas API w Sparku daje ci dokładną składnię pandasa napędzaną przez rozproszony execution engine Sparka, ale pamiętanie o tym, że ścisłe, sekwencyjne indeksy niosą ze sobą duży koszt synchronizacji, uchroni twój klaster przed niepotrzebnymi spowolnieniami. To wszystko na dziś. Dzięki za wysłuchanie — idź zbudować coś fajnego.
19

Wczytaj i podziwiaj: Formaty przechowywania

4m 01s

Nie wszystkie formaty plików są sobie równe. Zestawiamy oparte na wierszach pliki CSVs z formatami kolumnowymi, takimi jak Parquet i ORC, badając opcje odczytu/zapisu oraz optymalne techniki przechowywania.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySpark, odcinek 19 z 21. Zapisanie ogromnego datasetu jako CSV to najprostsza rzecz na świecie, a jednocześnie jedna z najbardziej destrukcyjnych rzeczy, jakie możesz zrobić dla wydajności swojego data lake'a. Płacisz za więcej storage'u, płacisz za więcej compute'u, a każde downstreamowe query wlecze się w nieskończoność. Rozwiązanie kryje się w tym, jak podchodzisz do tematu Load and Behold: Storage Formats, i dlaczego to, jak zapisujesz dane, ma takie samo znaczenie, jak to, jak je transformujesz. PySpark używa zunifikowanego interfejsu do odczytu i zapisu danych w dziesiątkach systemów storage'owych. Wywołujesz atrybut read lub write na swojej sesji Sparka lub DataFrame'ie, określasz format, podajesz chain opcji i wskazujesz ścieżkę do pliku. To przewidywalny wzorzec, ale opcje, które wybierzesz, dyktują, ile pracy twój cluster będzie musiał wykonać później. Zacznijmy od formatów czytelnych dla człowieka, czyli CSV i JSON. Są to formaty row-based. Kiedy czytasz CSV, Spark parsuje dane linijka po linijce. Często musisz zchainować konkretne opcje, żeby ten tekst miał sens. Na przykład, możesz zchainować opcję, która powie Sparkowi, że plik ma header, kolejną opcję, żeby ustawić customowy delimiter, taki jak pipe czy tabulator, i trzecią opcję, żeby dokładnie zdefiniować, jak wygląda wartość null, na przykład przekazując konkretny string, żeby Spark poprawnie zmapował go na pustą wartość, zamiast traktować jako dosłowny tekst. JSON jest nieco lepszy, ponieważ natywnie obsługuje zagnieżdżone struktury, ale powtarza klucze schemy dla każdego pojedynczego rekordu, co potężnie pompuje rozmiar pliku. Oba formaty zmuszają Sparka do odczytania całego wiersza z dysku, nawet jeśli twoje query prosi tylko o jedną kolumnę. I tutaj wkraczają formaty columnar, takie jak Parquet i ORC. Zwróć na to uwagę. Analityczne query rzadko potrzebują każdej kolumny w szerokiej tabeli. Zazwyczaj potrzebują konkretnych kolumn z milionów wierszy, żeby odpalić agregacje. Parquet i ORC przechowują dane zorganizowane według kolumn, a nie wierszy. Jeśli zrobisz query na trzy kolumny ze stu, Spark odczyta tylko te chunki pliku, które zawierają te trzy kolumny. Całkowicie pomija resztę, ucinając dyskowe I/O do ułamka tego, czego wymaga CSV. Ponieważ dane tego samego typu są przechowywane razem, formaty columnar również pięknie się kompresują. Katalog plików JSON może skurczyć się o siedemdziesiąt procent lub więcej po konwersji do Parquet. Osadzają one również dokładną schemę i typy danych w metadanych pliku, co oznacza, że Spark nie musi zgadywać ani inferować typów podczas ładowania. Kiedy będziesz gotowy, żeby zapisać te dane z powrotem, musisz zarządzać stanem w miejscu docelowym. Domyślnie, jeśli spróbujesz zrobić write do ścieżki, gdzie dane już istnieją, Spark rzuci błędem, żeby zapobiec przypadkowej utracie danych. Kontrolujesz to używając metody mode przed wyzwoleniem zapisu. Jeśli przekażesz string overwrite, Spark usunie istniejące dane w docelowej ścieżce i zastąpi je twoim obecnym DataFrame'em. Jeśli przekażesz append, Spark po prostu doda twoje nowe part files do istniejącego katalogu. Jest też mode ignore, który po cichu nie robi niczego, jeśli katalog jest już zapełniony. Zapisywanie czystych, otypowanych danych columnar dzisiaj, oszczędza twojemu clusterowi godziny zmarnowanego czasu przetwarzania jutro. Jeśli chcesz pomóc w tworzeniu kolejnych odcinków, możesz wesprzeć program, wyszukując DevStoriesEU na Patreonie. Dzięki za spędzenie ze mną tych kilku minut. Do usłyszenia następnym razem, trzymaj się.
20

Pogromcy błędów: Plany fizyczne i złączenia

3m 18s

Zajrzyj pod maskę silnika wykonawczego Spark. Dowiedz się, jak debugować zapytania za pomocą DataFrame.explain() i jak wyeliminować kosztowne przetasowania używając Broadcast joins.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySparka, odcinek 20 z 21. Twój job w PySparku nie jest wolny dlatego, że przetwarza dane. Jest wolny, bo traci cały czas na przerzucanie danych przez sieć. Kiedy zwykły join sprawia, że twój cluster ledwo dycha, rozwiązanie znajdziesz w Bug Busting: Physical Plans and Joins. Pisząc skrypt w PySparku, definiujesz operacje logiczne. Mówisz Sparkowi, co ma zrobić, a nie jak ma to zrobić. Ale kiedy job działa za wolno, musisz dokładnie wiedzieć, jak Spark wykonał twoje żądanie. Robisz to, wywołując metodę explain na swoim DataFrame. Wywołanie explain wypisuje physical plan. To schemat rzeczywistych tasków, które Spark odpala na twoim clusterze. Czytasz ten plan z dołu do góry, śledząc dane od plików źródłowych aż do finalnego outputu. Jeśli spojrzysz na physical plan dla standardowego joina między dwoma DataFrame'ami, pewnie zobaczysz krok o nazwie SortMergeJoin. Żeby wykonać SortMergeJoin, Spark musi upewnić się, że wiersze z tymi samymi join keys znajdują się fizycznie na tym samym executorze. Żeby to osiągnąć, Spark wykonuje Exchange. Exchange to termin z physical planu oznaczający network shuffle. Oznacza to, że Spark wyciąga dane z partycji, przepycha je przez sieć i zapisuje na dysku, żeby inne executory mogły je odczytać. Shuffle to zdecydowanie najdroższa operacja w distributed computing. I tu pojawia się kluczowa sprawa. Jeśli robisz joina ogromnej fact table z małą lookup table, robienie shuffle na dużej tabeli to ogromne marnotrawstwo zasobów. Zamiast robić shuffle obu tabel, żeby dopasować klucze, możesz po prostu wysłać całą małą tabelę do każdego executora. Robi się to za pomocą funkcji broadcast z modułu PySpark SQL functions. Wywołując metodę join, po prostu opakowujesz mniejszy DataFrame w funkcję broadcast. Opakowując małą tabelę, dajesz Sparkowi jasną dyrektywę. Spark zrobi collect małego DataFrame'a do driver node'a, a następnie prześle jego pełną kopię do pamięci każdego pojedynczego executora. Teraz, kiedy duży DataFrame jest przetwarzany, executory mają już wszystkie potrzebne dane z lookup table prosto w RAM-ie. Po prostu streamują przez swoje istniejące partycje i lokalnie dopasowują wiersze. Żadne sortowanie nie jest potrzebne, a żadne dane z dużej tabeli nie latają po sieci. Jeśli wywołasz explain na tym nowym broadcast joinie, physical plan wygląda zupełnie inaczej. SortMergeJoin znika. Kosztowny krok Exchange całkowicie znika. Na ich miejscu zobaczysz BroadcastExchange i BroadcastHashJoin. BroadcastExchange przenosi małą tabelę tylko raz, a sam join odbywa się całkowicie w miejscu. Najprostszy sposób na podwojenie prędkości joba w Sparku to przestać przenosić dane, które nie muszą być przenoszone. Czytaj swoje physical plany, wyłapuj network exchanges i rób broadcast swoich małych tabel. To wszystko na dziś. Dzięki za wysłuchanie — idź zbudować coś fajnego.
21

Profilowanie pamięci i wydajności w PySpark

4m 00s

Kończymy naszą podróż z PySpark, wprowadzając natywne narzędzia do profilowania. Dowiedz się, jak śledzić zużycie pamięci linijka po linijce i ujawniać ukryte wewnętrzne ślady błędów w języku Python.

Pobierz
Cześć, tu Alex z DEV STORIES DOT EU. Podstawy PySparka, odcinek 21 z 21. Debugowanie rozproszonego kodu w Pythonie zazwyczaj oznacza przekopywanie się przez tysiące linijek bezsensownych błędów Javy w próbie odgadnięcia, dlaczego twoja funkcja wywaliła błąd albo dlaczego zjadła całą pamięć na klastrze. Już nie musisz zgadywać. Dzisiaj przyjrzymy się profilowaniu pamięci i wydajności w PySparku, a także upraszczaniu stack trace'ów. Kiedy piszesz User Defined Function, czyli UDF w PySparku, twój kod w Pythonie działa na infrastrukturze Java Virtual Machine. Jeśli twój kod w Pythonie dzieli przez zero albo odwołuje się do brakującego klucza w dictionary, ten prosty pythonowy exception zostaje połknięty. Jest przekazywany z powrotem przez daemona PySparka, przez sieć i opakowany w ogromne javowe exceptiony. Znalezienie właściwego błędu Pythona w logach jest żmudne. Możesz to naprawić, włączając uproszczone tracebacki. Kiedy ustawisz konfigurację Sparka dla uproszczonych tracebacków na true, PySpark zmienia sposób raportowania błędów. Usuwa wszystkie logi z interoperacyjności Javy i szum z procesów workerów. Następnym razem, gdy twój UDF wywali błąd, twoja konsola wyrzuci standardowy, czysty stack trace z Pythona, pokazujący dokładny numer linijki w twoim pliku w Pythonie, w której wystąpił exception. Naprawianie crashy to tylko połowa sukcesu. Naprawa powolnego lub pożerającego pamięć kodu jest znacznie trudniejsza. Jeśli napiszesz Pandas UDF, który przetwarza miliony wierszy, może on wykonać się poprawnie, ale trwać o wiele za długo albo wywołać błędy out-of-memory na twoich node'ach executorów. Historycznie, znalezienie wąskiego gardła wymagało dodawania ręcznego logowania albo zgadywania, która operacja w Pandas była nieefektywna. Spark 4.0 zmienia to, wprowadzając wbudowane profilery Python UDF. Oto kluczowa sprawa. Możesz teraz profilować swój rozproszony kod w Pythonie linijka po linijce, bezpośrednio w PySparku. Żeby tego użyć, ustawiasz konfigurację profilera UDF na jeden z dwóch trybów: performance albo memory. Jeśli ustawisz konfigurację profilera na słowo "perf", Spark aktywuje profiler performance. Następnie odpalasz swojego joba w Sparku tak jak zwykle. Kiedy node'y workerów wykonują twój Pandas UDF, Spark śledzi czas wykonania każdej pojedynczej linijki twojej funkcji w Pythonie. Kiedy twój job się skończy, wywołujesz metodę show na obiekcie profile Sparka. Spark wypisze szczegółowy raport w twojej konsoli. Dla każdej linijki twojego kodu zobaczysz dokładnie, ile razy została wywołana i jaki był całkowity czas jej wykonywania. Możesz natychmiast zobaczyć, czy konkretna manipulacja na stringach albo operacja matematyczna spowalnia twój cały pipeline. Jeśli mierzysz się z limitami pamięci, zamiast tego ustawiasz konfigurację profilera UDF na słowo "memory". Workflow jest dokładnie taki sam, ale output się zmienia. Kiedy przeglądasz raport z profilowania, Spark pokazuje ci dokładny przyrost w megabajtach spowodowany przez każdą linijkę twojego kodu w Pythonie. Możesz dokładnie zobaczyć, gdzie alokowane są duże arraye i gdzie pamięć nie jest zwalniana. Taka widoczność linijka po linijce eliminuje zgadywanie przy optymalizacji złożonych transformacji danych. Możesz precyzyjnie namierzyć dokładną przyczynę twoich problemów z wydajnością bez opuszczania środowiska PySparka. Ponieważ to ostatni odcinek naszej serii o PySparku, zachęcam cię do sprawdzenia oficjalnej dokumentacji Sparka i wypróbowania tych narzędzi do debugowania w praktyce. Jeśli masz pomysły na technologie, które powinniśmy omówić w kolejnej serii, wpadnij na devstories.eu i daj nam znać. Dzięki za spędzenie ze mną tych kilku minut. Do usłyszenia następnym razem, trzymaj się.