Voltar ao catálogo
Season 14 21 Episódios 1h 20m 2026

PySpark Fundamentals

v4.1 — Edição de 2026. Um guia abrangente sobre o PySpark 4.1, que aborda o Spark Connect, DataFrames, tipos de dados complexos, transformações de dados, SQL, UDFs e profiling.

Big Data Computação Distribuída Ciência de Dados
PySpark Fundamentals
A Reproduzir
Click play to start
0:00
0:00
1
O Problema do Big Data e a Promessa do PySpark
Estabelecemos a necessidade fundamental do PySpark. Descubra por que razão bibliotecas padrão de Python como o Pandas falham em grande escala, e como o PySpark fornece um motor de execução distribuída para processar conjuntos de dados massivos de forma contínua.
3m 54s
2
A Revolução do Spark Connect
Explore a arquitetura do Spark Connect. Explicamos como o PySpark separou o cliente e o servidor, permitindo-lhe executar aplicações Spark em qualquer lugar sem as pesadas dependências da JVM.
3m 16s
3
DataFrames e Lazy Evaluation
Mergulhe na abstração fundamental do PySpark: o DataFrame. Discutimos o conceito de lazy evaluation, a diferença entre transformações e ações, e por que razão o Spark planeia antes de executar.
4m 23s
4
Criar e Visualizar DataFrames
Aprenda a instanciar DataFrames a partir de objetos Python em bruto, dicionários e ficheiros, e como inspecionar com segurança os seus dados distribuídos sem bloquear o seu nó driver.
3m 36s
5
Dominar os Tipos de Dados Básicos
Uma visita guiada aos tipos numéricos e de strings fundamentais do PySpark. Exploramos como definir esquemas explicitamente usando StructType e StructField para pipelines de dados robustos.
4m 19s
6
Os Perigos da Precisão
Descubra as diferenças críticas entre FloatType, DoubleType e DecimalType. Aprenda por que razão escolher o tipo numérico errado pode introduzir erros de arredondamento desastrosos em dados financeiros.
4m 16s
7
Dominar Dados Complexos e Aninhados
O Big Data nem sempre é plano. Exploramos os tipos de dados complexos do PySpark, incluindo ArrayType, StructType e MapType, permitindo-lhe analisar nativamente JSON profundamente aninhado.
4m 20s
8
Type Casting e Seleção
Aprenda a moldar ativamente os esquemas do seu DataFrame. Abordamos como selecionar subconjuntos de colunas e como fazer o cast seguro de colunas de um tipo de dados para outro.
3m 28s
9
Junção de Funções: Limpeza de Dados Sujos
Lixo entra, lixo sai. Aprenda as transformações essenciais de DataFrames para remover nulos, preencher valores em falta e lidar com registos NaN nativamente em sistemas distribuídos.
3m 51s
10
Transformar e Remodelar Dados
Assuma o controlo da forma dos seus dados. Exploramos como gerar novas colunas com funções matemáticas, realizar manipulações de strings e achatar arrays aninhados usando explode.
3m 51s
11
A Mecânica do Agrupamento e Agregação
Domine a estratégia split-apply-combine. Mergulhamos no agrupamento de dados por chaves e na aplicação de poderosas funções de agregação para resumir conjuntos de dados massivos.
3m 41s
12
Quando os DataFrames Colidem: A Arte das Joins
Navegar pelas nuances da combinação de conjuntos de dados. Detalhamos os sete tipos diferentes de joins no PySpark e explicamos como fundir DataFrames com segurança.
3m 26s
13
Velho SQL, Novos Truques
Porquê aprender uma nova API quando pode usar SQL em bruto? Aprenda a executar queries SQL padrão diretamente contra DataFrames PySpark distribuídos.
3m 53s
14
Intercâmbio entre DataFrames e SQL
Misture e combine SQL com Python de forma contínua. Descubra como criar temporary views a partir de DataFrames, usar selectExpr e encadear operações programáticas nos resultados de queries SQL.
3m 39s
15
Estender o Spark com Python UDFs
Quando as funções integradas não são suficientes, as User-Defined Functions entram em ação. Exploramos como escrever lógica Python personalizada para DataFrames, e por que razão as UDFs escalares padrão escondem uma penalização de desempenho.
4m 04s
16
Turbinar UDFs com o Apache Arrow
Elimine o estrangulamento da serialização de JVM para Python. Revelamos como as Vectorized Pandas UDFs e os formatos de memória do Apache Arrow turbinam as suas transformações personalizadas.
3m 36s
17
Expandir Linhas com Python UDTFs
As UDFs padrão devolvem um valor por linha, mas e se precisar de várias linhas? Aprenda como as Python User-Defined Table Functions (UDTFs) resolvem problemas complexos de geração de um-para-muitos.
4m 00s
18
A Pandas API no Spark
Escale os seus scripts Pandas existentes até ao infinito. Descubra como a API pyspark.pandas lhe permite executar a sintaxe padrão do Pandas nativamente num cluster Spark distribuído.
3m 59s
19
Carregar e Contemplar: Formatos de Armazenamento
Nem todos os formatos de ficheiro são criados da mesma forma. Contrastamos CSVs baseados em linhas com formatos colunares como Parquet e ORC, explorando opções de leitura/escrita e técnicas de armazenamento ideais.
3m 33s
20
Caça aos Bugs: Planos Físicos e Joins
Espreite debaixo do capô do motor de execução do Spark. Aprenda a depurar queries usando DataFrame.explain() e como eliminar shuffles dispendiosos usando Broadcast joins.
3m 20s
21
Profiling de Memória e Desempenho no PySpark
Concluímos a nossa jornada no PySpark introduzindo ferramentas nativas de profiling. Aprenda a acompanhar o consumo de memória linha a linha e a expor tracebacks internos ocultos do Python.
4m 03s

Episódios

1

O Problema do Big Data e a Promessa do PySpark

3m 54s

Estabelecemos a necessidade fundamental do PySpark. Descubra por que razão bibliotecas padrão de Python como o Pandas falham em grande escala, e como o PySpark fornece um motor de execução distribuída para processar conjuntos de dados massivos de forma contínua.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 1 de 21. O teu script Python padrão funciona perfeitamente em testes, mas no momento em que o teu dataset atinge cinquenta gigabytes, vai abaixo com um erro OutOfMemory. Atingiste os limites físicos de uma única máquina. A solução para este bottleneck é o foco deste episódio: o problema de big data e a promessa do PySpark. As ferramentas de dados padrão de Python são construídas para execução num único node. Bibliotecas como o pandas são incrivelmente eficientes, mas exigem que todo o dataset resida na memória local. Se o teu servidor tiver dezasseis gigabytes de RAM e tentares carregar cinquenta gigabytes de logs da aplicação, o sistema operativo intervém e mata o processo. Escalar verticalmente alugando um servidor maior e mais caro apenas adia o inevitável. Os dados crescem mais rápido do que os upgrades de hardware. Mais cedo ou mais tarde, os dados ultrapassam a capacidade da máquina. O PySpark resolve esta limitação. É a API de Python para o Apache Spark. O próprio Apache Spark é um motor de computação distribuída que corre na Java Virtual Machine. O PySpark atua como uma ponte, permitindo-te escrever a tua lógica puramente em Python, enquanto tiras partido do motor distribuído altamente otimizado do Spark. Isto muda a tua arquitetura de scaling vertical para scaling horizontal. Em vez de dependeres de uma única máquina enorme, o PySpark particiona os teus dados e distribui os teus cálculos por um cluster de várias máquinas mais pequenas, conhecidas como nodes. Tu escreves o teu código Python, e o PySpark traduz isso num plano de execução paralela. Se o teu volume de dados duplicar no próximo mês, não tens de reescrever uma única linha de código. Simplesmente adicionas mais nodes ao cluster. O ecossistema PySpark está organizado em alguns módulos principais, desenhados para diferentes workloads. O primeiro é o Spark SQL. Esta é a base para a maioria das aplicações PySpark modernas. Fornece uma estrutura de DataFrame para lidar com dados tabulares espalhados por várias máquinas. Também te permite correr queries SQL standard diretamente contra estes datasets distribuídos. A seguir, temos o Structured Streaming. Este módulo lida com pipelines de dados em tempo real. Em vez de processar um batch massivo de dados durante a noite, o Structured Streaming processa continuamente fluxos de registos, como leituras de sensores em tempo real ou eventos de tráfego web. Usa exatamente o mesmo modelo de programação do Spark SQL, o que significa que a tua lógica de processamento em batch e a tua lógica de streaming parecem quase idênticas. Depois temos o MLlib, a Machine Learning Library. Treinar modelos em datasets massivos numa única máquina é um bottleneck notório. O MLlib fornece algoritmos de machine learning distribuídos para tarefas como classificação, regressão e clustering. Espalha as operações matemáticas pesadas por todo o cluster, reduzindo drasticamente o tempo de treino. Aqui está o ponto chave. O verdadeiro poder do PySpark é a abstração. Tu nunca divides manualmente os teus ficheiros massivos em chunks. Nunca escreves código de rede para coordenar os servidores. Simplesmente defines uma sequência lógica de transformações, e o motor subjacente lida com a distribuição de dados, a execução paralela, e até mesmo o processo de recuperação caso um node perca energia a meio do cálculo. O PySpark não é meramente um utilitário para abrir ficheiros maiores. É uma mudança fundamental de uma computação limitada por uma única motherboard para uma computação limitada apenas pelo tamanho do teu cluster. Se achas estes episódios úteis e queres apoiar o programa, podes procurar por DevStoriesEU no Patreon. É tudo por este episódio. Obrigado por ouvires, e continua a desenvolver!
2

A Revolução do Spark Connect

3m 16s

Explore a arquitetura do Spark Connect. Explicamos como o PySpark separou o cliente e o servidor, permitindo-lhe executar aplicações Spark em qualquer lugar sem as pesadas dependências da JVM.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 2 de 21. Durante anos, escrever código PySpark localmente significava arrastar uma Java Virtual Machine enorme e pesada só para testar um script simples. Tinhas de sincronizar perfeitamente as versões de Python, as configurações de Java e as dependências do cluster antes de escreveres uma única linha de lógica. A revolução do Spark Connect torna isso completamente obsoleto. Tradicionalmente, o PySpark dependia de uma arquitetura fortemente acoplada. O teu script Python e o execution engine do Spark tinham de coexistir exatamente na mesma máquina física ou virtual. Iniciar uma sessão PySpark significava arrancar uma Java Virtual Machine em background usando uma bridge library. Esta arquitetura sobrecarregava o teu ambiente de desenvolvimento local com todo o peso do execution engine do Spark. Isto tornava a integração do PySpark em aplicações web, editores de código modernos ou edge devices altamente impraticável. O Spark Connect resolve isto introduzindo uma arquitetura client-server desacoplada. O teu ambiente Python está agora estritamente separado do servidor Spark. O cliente PySpark local torna-se uma biblioteca leve. Já não requer uma instalação local de Java e não executa tarefas de processamento de dados por si só. Atua puramente como uma interface remota para o cluster Spark real. Aqui está o ponto chave. Quando escreves operações de DataFrame com o Spark Connect, o cliente leve regista as tuas method calls e traduz essas chamadas para um unresolved logical plan. Podes imaginar este plano como um blueprint abstrato da tua query, que descreve estritamente que dados processar, sem te preocupares com a forma como são processados. O cliente empacota este blueprint usando Protocol Buffers e transmite-o através de uma ligação de rede gRPC para o servidor Spark remoto. O servidor desempacota o plano, lida com toda a otimização complexa da query, executa o job em todo o cluster e, finalmente, faz o stream dos resultados computados de volta para o teu script Python. Configurar isto requer uma pequena alteração na forma como inicias a tua aplicação. Continuas a usar o builder da SparkSession, mas, em vez de dependeres de configurações locais, chamas o método remote. Forneces uma connection string que detalha onde o servidor Spark está localizado. Esta string usa um connection scheme dedicado que começa com as letras s c. Portanto, se te estiveres a ligar a um servidor de teste local na porta default, forneces a string s c dois pontos barra barra localhost dois pontos um cinco zero zero dois. Após esse único passo de ligação, escreves o teu código de DataFrame da mesma forma que sempre fizeste. Como a execução é totalmente remota, podes ligar vários clientes Python diferentes, de aplicações distintas, exatamente ao mesmo servidor Spark em simultâneo. O código da tua aplicação simplesmente pede transformações de dados, e o heavy lifting fica inteiramente do lado do servidor. Ao isolar completamente o cliente Python do runtime, o Spark Connect elimina os notórios conflitos de dependências que costumavam quebrar os deployments, permitindo-te atualizar os teus ambientes de aplicação de forma totalmente independente do próprio cluster Spark. Obrigado por passares uns minutos comigo. Até à próxima, fica bem.
3

DataFrames e Lazy Evaluation

4m 23s

Mergulhe na abstração fundamental do PySpark: o DataFrame. Discutimos o conceito de lazy evaluation, a diferença entre transformações e ações, e por que razão o Spark planeia antes de executar.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos do PySpark, episódio 3 de 21. E se o teu código não corresse logo quando o escreves, mas em vez disso esperasse, analisasse o teu objetivo final e traçasse a rota mais rápida possível? Encadeias filtros, agregações e joins, e a tua máquina mal se esforça. Isso acontece porque ela não faz nada até a forçares. Este mecanismo chama-se lazy evaluation, e é o motor principal por trás dos DataFrames do PySpark. Um DataFrame do PySpark é uma coleção distribuída de dados organizada em colunas nomeadas. Se estás familiarizado com pandas, o conceito é idêntico. A diferença é que um DataFrame do PySpark divide os seus dados por vários compute nodes num cluster. Historicamente, a estrutura base no Spark era o Resilient Distributed Dataset, geralmente conhecido como RDD. O ecossistema afastou-se bastante da manipulação direta de RDDs. De facto, a partir da versão 4.0 do Spark, a utilização direta de RDDs já não é suportada no Spark Connect. Os DataFrames são agora o standard definitivo, fornecendo uma API estrita que permite ao Spark otimizar automaticamente as tuas queries. Essa otimização depende inteiramente de lazy evaluation. Cada operação que fazes num DataFrame enquadra-se numa de duas categorias estritas: uma transformation ou uma action. As transformations são comandos que devolvem um novo DataFrame. Exemplos incluem selecionar colunas específicas, filtrar linhas com base numa condição, agrupar registos, ou fazer join de duas tabelas separadas. Quando aplicas uma transformation, o PySpark não executa o processamento de dados. Ele simplesmente regista a operação. Atualiza um blueprint interno chamado logical execution plan. Podes escrever cinquenta transformations consecutivas, e o Spark vai apenas validar rapidamente a sintaxe e atualizar o seu graph. Aqui está o ponto chave. Ao adiar a execução real, o PySpark dá ao seu query engine subjacente, o Catalyst Optimizer, a imagem completa do teu data pipeline. O optimizer inspeciona toda a tua chain de transformations, reorganiza-as para máxima eficiência, e elimina totalmente os passos desnecessários antes que um único byte de dados seja lido do disco. Este blueprint permanece completamente adormecido até invocares uma action. Uma action é um comando que exige um resultado concreto. Ou retorna dados para o teu driver program ou escreve dados para o storage. Actions comuns incluem contar o número total de linhas, fazer collect dos dados de volta para uma lista local de Python, ou comandar o sistema para mostrar os primeiros vinte registos no teu ecrã. No momento em que disparas uma action, o motor arranca. Ele traduz o teu logical plan otimizado num physical plan, distribui as tasks pelos workers do cluster, e corre a computação. Considera um data workflow standard. Primeiro, crias um DataFrame a apontar para um ficheiro massivo. Depois, fazes join com uma tabela separada de detalhes de utilizadores. Após o join, filtras os resultados para incluir apenas utilizadores de uma cidade específica. Finalmente, pedes ao Spark para mostrar o output. Devido à lazy evaluation, o Spark não carrega realmente o ficheiro inteiro, não faz um join distribuído massivo, e depois filtra os resultados no fim. Em vez disso, o optimizer olha para o teu pedido final, repara no filter, e empurra essa operação de filter para cima na chain, muito antes de o join acontecer. Ele lê seletivamente apenas os registos relevantes, reduzindo drasticamente o uso de memória e o tráfego de rede em todo o cluster. O teu script PySpark nunca é uma sequência de comandos imediatos. É um conjunto de instruções a desenhar um blueprint arquitetónico, e o sistema só começa a construção quando finalmente exiges o resultado final. É tudo por hoje. Obrigado por ouvires — vai construir algo fixe.
4

Criar e Visualizar DataFrames

3m 36s

Aprenda a instanciar DataFrames a partir de objetos Python em bruto, dicionários e ficheiros, e como inspecionar com segurança os seus dados distribuídos sem bloquear o seu nó driver.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos do PySpark, episódio 4 de 21. Chamar um método específico num dataset enorme é a forma garantida de crashar instantaneamente toda a tua aplicação com um erro de out-of-memory. Saber como mover dados para dentro e para fora do Spark em segurança, sem rebentar com o teu driver node, é crucial. É exatamente isso que este episódio aborda: criar e visualizar DataFrames. Qualquer aplicação PySpark precisa de dados para funcionar. Geralmente, crias DataFrames de três maneiras. Primeiro, podes criá-los diretamente a partir de estruturas Python em memória. Basta definires uma lista de dicionários, onde cada dicionário representa uma row e as keys são os nomes das colunas, e passá-la para o método create DataFrame na tua SparkSession. Segundo, se já tiveres um DataFrame de pandas em memória, podes passar exatamente esse objeto pandas para o mesmo método create DataFrame. O PySpark trata da conversão automaticamente. A terceira e mais comum maneira é ler de ficheiros externos. Usas o atributo read na tua SparkSession, seguido pelo formato que queres, como csv ou json, e forneces o file path. Assim que os teus dados estiverem carregados, precisas de os verificar. Os DataFrames de PySpark são distribuídos, o que significa que não podes simplesmente fazer print da variável e ver os dados como farias num script Python normal. Para veres a estrutura dos teus dados, chamas o método print schema. Isto gera uma árvore em texto, mostrando o nome de cada coluna e o seu data type correspondente. É a maneira mais rápida de verificares se o teu ficheiro foi carregado corretamente. Para veres o conteúdo em si, usas o método show. Por default, chamar o show mostra as primeiras vinte rows num formato tabular. Presta atenção a esta parte. Se as tuas colunas contiverem strings longas, o método show vai truncá-las. Podes desativar isto passando um argumento truncate definido como false, ou defini-lo para um número específico de caracteres. Se o teu DataFrame tiver dezenas de colunas, a view de tabela standard faz wrap no ecrã e torna-se ilegível. Nesse caso, podes passar o argumento vertical definido como true. Isto faz print de cada row como um bloco vertical de pares key-value, tornando datasets largos muito mais fáceis de ler num terminal. Agora, chegamos ao crash por out-of-memory mencionado há pouco. Às vezes, precisas de trazer os dados distribuídos de volta para objetos Python normais. O método para fazer isto chama-se collect. Aqui está o ponto chave. O método collect pega em cada row de cada executor em todo o teu cluster e força-a para a memória do teu único driver node. Se o teu DataFrame contiver mil milhões de rows, o teu driver vai ficar sem memória e crashar instantaneamente. Só deves chamar o collect quando tiveres agregado ou filtrado os teus dados para um tamanho pequeno. Ao lidar com grandes datasets, extrai sempre amostras mais pequenas. Em vez do collect, usa o método take, passando o número de rows que queres. Isto devolve uma lista Python standard contendo apenas essas primeiras rows. Se precisares de verificar o final do teu dataset, usa o método tail para ir buscar as últimas rows. Ambos os métodos limitam em segurança a quantidade de dados transferida para o teu driver. A regra para dados distribuídos é simples: envia as computações para o cluster, mas limita rigorosamente o número de rows que trazes de volta para o driver. É tudo por este episódio. Obrigado por ouvires, e continua a desenvolver!
5

Dominar os Tipos de Dados Básicos

4m 19s

Uma visita guiada aos tipos numéricos e de strings fundamentais do PySpark. Exploramos como definir esquemas explicitamente usando StructType e StructField para pipelines de dados robustos.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 5 de 21. Confiar na inferência automática de schema pode poupar-te algumas linhas de código, mas vai custar-te caro na performance em produção. O cluster muitas vezes tem de ler todo o teu dataset só para adivinhar o que lá está dentro antes de fazer qualquer trabalho real. Tu resolves isto dominando os data types básicos e schemas explícitos. É comum confundir os types standard de Python com os data types de PySpark. Quando declaras um integer ou uma string em Python standard, esse objeto vive na memória da tua máquina local. Os types de PySpark operam num nível completamente diferente. Eles são instruções de mapping para o optimizer Catalyst e para a Java Virtual Machine subjacente. Quando usas data types de PySpark, estás a definir uma estrutura estrita e cluster-aware. Isto garante a consistência dos dados em centenas de worker nodes distribuídos e dita exatamente como os dados são serializados pela rede. O PySpark fornece um type específico para cada formato de dados standard, e selecionar o correto é crucial para a performance. Para números, tens o ByteType para integers muito pequenos, IntegerType para números standard, e LongType para valores grandes. Selecionar ByteType em vez de LongType para um simples status code poupa uma memória significativa quando essa escolha é multiplicada por milhares de milhões de rows. Para texto e lógica, usas StringType e BooleanType. Lidar com o tempo corretamente é outra área onde o typing exato importa. O PySpark divide os dados temporais em DateType e TimestampType. Usas DateType quando só te interessa a data do calendário, como o aniversário de um utilizador. Usas TimestampType quando precisas de pontos exatos no tempo, registando tanto a data como a hora, minuto e segundo exatos em que um evento ocorreu. Conhecer estes types é apenas a base. Tens de os aplicar diretamente ao teu processo de ingestão de dados usando um schema explícito. Tu constróis este schema usando dois objetos específicos: StructType e StructField. Podes pensar num StructType como o blueprint para uma row inteira no teu dataframe. Um StructField é o blueprint para uma única coluna dentro dessa row. Para construíres um schema explícito, instancias um StructType e passas-lhe uma coleção de StructFields. Cada StructField requer três argumentos específicos. Primeiro, forneces o nome da coluna como uma string standard. Segundo, passas o data type de PySpark específico que queres forçar, como IntegerType ou StringType. Terceiro, forneces uma flag boolean a indicar se esta coluna tem permissão para conter valores null. Por exemplo, constróis um schema a começar com um StructField chamado user identifier, atribuído a um StringType, e defines a flag null como false. A seguir, crias um StructField chamado account age, atribuído a um IntegerType, definindo a flag null como true. Assim que este objeto StructType estiver totalmente montado, passas o mesmo diretamente para o teu dataframe reader usando o método schema antes de chamares o comando load para leres os teus ficheiros. Esta é a parte que importa. Quando forneces este schema explícito à partida, o PySpark salta completamente a fase de data scanning. Ele aplica o teu blueprint diretamente à stream de dados de entrada. Isto reduz drasticamente o tempo que demora a ler um ficheiro. Também atua como um quality gate imediato. Se um ficheiro malformado chegar com texto na tua coluna de integers, o pipeline lida com isso com base na tua estrutura definida, em vez de passar silenciosamente o schema inferido para downstream e partir as tuas transformações. Definir o teu schema explicitamente transforma uma operação de leitura frágil e cara num step de pipeline previsível e altamente otimizado. Obrigado por ouvirem, happy coding a todos!
6

Os Perigos da Precisão

4m 16s

Descubra as diferenças críticas entre FloatType, DoubleType e DecimalType. Aprenda por que razão escolher o tipo numérico errado pode introduzir erros de arredondamento desastrosos em dados financeiros.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 6 de 21. Usar um float standard pode parecer inofensivo, até a tua query de agregação calcular mal, e de forma silenciosa, milhões em transações financeiras. Um código que corre perfeitamente pode produzir números ligeira e perigosamente errados. É exatamente por isso que precisamos de falar sobre os perigos da precisão. No PySpark, tens três maneiras principais de armazenar números com partes fracionárias. Tens o FloatType, o DoubleType e o DecimalType. Eles não são intercambiáveis. Um erro comum é deixar o PySpark inferir um schema a partir dos teus dados brutos. A inferência geralmente atribui o DoubleType a qualquer número com um ponto decimal. Se estiveres a calcular receitas financeiras, confiar neste comportamento default é um sério risco operacional. Para perceber porquê, precisamos de analisar como o FloatType e o DoubleType funcionam internamente. O FloatType usa matemática de floating-point IEEE 754 de 32 bits. O DoubleType usa a versão de 64 bits do mesmo standard. Ambos representam números como frações binárias. Pensa em como a fração um terço não pode ser escrita na perfeição usando decimais de base dez. Torna-se numa string interminável de três. Exatamente a mesma limitação existe em binário. Números decimais comuns, como zero ponto um ou zero ponto dois, não podem ser representados na perfeição em base dois. O computador armazena uma minúscula aproximação. Com o DoubleType, tens 64 bits de espaço, o que significa que a aproximação é incrivelmente próxima do número real. Se fizeres uma query a uma única linha de dados, raramente vais notar a diferença. Aqui está o ponto chave. O erro acumula-se durante as agregações. Quando calculas as receitas financeiras totais somando milhares de milhões de linhas individuais, essas imprecisões microscópicas acumulam-se. Uma fração de cêntimo perdida ou ganha em cada transação acaba por distorcer o total agregado final em milhares ou até milhões de dólares. A tua lógica de agregação está matematicamente correta, mas o data type subjacente corrompe o resultado. Se o teu sistema estiver a calcular simulações de física ou a treinar modelos de machine learning, o FloatType e o DoubleType são exatamente o que queres. Eles trocam a exatidão por processamento de hardware a alta velocidade. Mas no momento em que lidas com dinheiro, precisas de uma precisão rigorosa e inflexível. Isto leva-nos ao DecimalType. O DecimalType não usa aproximações de floating-point. Armazena os números exatamente como os defines, usando uma scale fixa. Quando configuras um DecimalType, defines dois parâmetros distintos. Primeiro, especificas a precision, que é o número máximo total de dígitos que o valor pode conter. Segundo, especificas a scale, que dita o número exato de dígitos permitidos à direita do ponto decimal. Se configurares um DecimalType com uma precision de dez e uma scale de dois, o PySpark aloca o espaço exato necessário para armazenar esse valor ao cêntimo. Não há frações binárias nem suposições de arredondamento. Na prática, implementas isto assumindo um controlo rigoroso dos teus schemas. Ao ler registos financeiros de um ficheiro de origem, não deixes o PySpark adivinhar os tipos. Primeiro, crias um objeto de schema rigoroso. A seguir, defines os teus campos financeiros, como receita ou imposto. Por fim, atribuis-lhes explicitamente um DecimalType com a tua precision e scale escolhidas. Assim que o teu dataframe carregar com este schema, as tuas agregações standard de soma ou média vão executar perfeitamente da primeira à bilionésima linha. Sacrificas uma pequena quantidade de performance de compute em comparação com um DoubleType standard, mas garantes que os teus relatórios financeiros são absolutamente impecáveis. A regra é simples: usa tipos de floating-point para velocidade e aproximações científicas, mas no momento em que um número representar moeda, fixa-o com um DecimalType. Obrigado por ouvires. Até à próxima!
7

Dominar Dados Complexos e Aninhados

4m 20s

O Big Data nem sempre é plano. Exploramos os tipos de dados complexos do PySpark, incluindo ArrayType, StructType e MapType, permitindo-lhe analisar nativamente JSON profundamente aninhado.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 7 de 21. O big data do mundo real raramente é uma folha de cálculo plana. Às vezes, precisas de um array de dicionários aninhados só para fazer o parse de um único evento JSON. Para lidar com isso, temos de falar sobre como domar dados complexos e aninhados. Os workflows relacionais preferem tabelas planas, mas os dados de eventos modernos chegam fortemente aninhados. O PySpark lida com isto fornecendo três tipos de dados complexos. São eles o ArrayType, o StructType e o MapType. Estes permitem-te modelar explicitamente estruturas hierárquicas de forma nativa no engine. Pega num perfil de cliente padrão para veres como estes tipos funcionam. O primeiro conceito é o ArrayType. Este representa uma coleção de elementos. A regra estrita é que cada item dentro de um ArrayType tem de partilhar exatamente o mesmo tipo de dados subjacente. Não podes misturar strings e integers no mesmo array. Se o teu perfil de cliente incluir uma lista de IDs de encomendas recentes, defines essa coluna como um ArrayType contendo integers. A seguir, temos o StructType. Um StructType modela um record hierárquico aninhado, funcionando essencialmente como uma row embutida dentro de outra row. Contém campos específicos e nomeados. Ao contrário de um array, cada campo dentro de um StructType pode ter um tipo de dados completamente diferente. Supõe que o teu cliente tem uma morada. Essa morada contém o nome da rua como uma string, o código postal como um integer, e uma flag boolean a indicar se é uma propriedade comercial. Agrupas estes campos distintos num único StructType. Aqui está o ponto chave. Podes aninhar estes tipos complexos com uma profundidade arbitrária. Se um cliente tiver várias moradas, não crias colunas planas e numeradas. Em vez disso, crias um ArrayType onde o tipo de elemento interno é exatamente esse StructType de morada. Agora tens um array de structs, que mapeia perfeitamente para um array JSON standard de objetos. A terceira estrutura é o MapType, desenhado especificamente para pares key-value. Difere de um StructType na forma como lida com a estrutura versus o schema. Um StructType exige que faças hardcode dos nomes exatos dos campos à partida. Um MapType é flexível quanto ao conteúdo dos dados, mas estrito quanto aos tipos de dados. Cada key no map tem de ser de um tipo específico, e cada value tem de ser de outro tipo específico. Podes usar um MapType para guardar as preferências de aplicação do cliente. As keys podem ser strings, como tema ou idioma, e os values também podem ser strings, como escuro ou inglês. Por ser um MapType, a aplicação upstream pode injetar keys de preferência totalmente novas mais tarde, sem te forçar a alterar o schema central do DataFrame. Simplesmente fazes a query dos values dinamicamente pelas suas keys. Quando constróis este schema complexo no teu código, constrói-lo de dentro para fora. Primeiro, defines os campos internos do StructType de morada. Depois, passas essa struct completa para uma definição de ArrayType. A seguir, defines o MapType para as preferências do utilizador. Finalmente, envolves todos estes componentes, juntamente com tipos scalar simples, como a string do nome do cliente, num StructType mestre que define a row global do DataFrame. Em vez de achatar estruturas aninhadas em strings JSON confusas, definir explicitamente estes schemas complexos permite que o optimizer do Spark faça o prune dos dados e filtre profundamente dentro dos campos aninhados sem fazer o deserialize de todo o payload para a memória. Obrigado por ouvires — até à próxima.
8

Type Casting e Seleção

3m 28s

Aprenda a moldar ativamente os esquemas do seu DataFrame. Abordamos como selecionar subconjuntos de colunas e como fazer o cast seguro de colunas de um tipo de dados para outro.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 8 de 21. Um simples valor de string escondido numa coluna de inteiros pode paralisar completamente um cluster de mil nós. Precisas de uma forma fiável de impor estruturas de dados corretas e escolher exatamente que dados passam pelo teu pipeline, e é por isso que hoje vamos olhar para Type Casting e Selection. Para manipulares dados no PySpark, primeiro tens de perceber o que uma coluna realmente é. Uma instância de coluna não é um array físico de dados carregado em memória. É uma representação de uma expressão avaliada de forma lazy. Quando referencias uma coluna no teu código, não estás a tocar nos dados subjacentes. Estás simplesmente a adicionar um passo ao logical plan do Spark. Os dados só se movem quando uma action é desencadeada mais tarde. Para recuperar e moldar estes dados, usas o método select no teu DataFrame. Tens duas formas principais de dizer ao método select que colunas queres. A forma mais simples é passar os nomes das colunas como strings de texto normais. Se passares uma string ao select, o Spark devolve um novo DataFrame que contém exatamente essa coluna, completamente inalterada. Isto funciona bem para uma extração básica, mas não oferece espaço para modificações. Para modificar os dados durante a seleção, tens de usar objetos Column em vez de strings. Acedes a um objeto Column referenciando-o diretamente a partir do DataFrame. Podes fazer isto usando dot notation, como dataframe ponto age, ou usando bracket notation com o nome da coluna como uma string dentro de parênteses retos. A bracket notation é especialmente útil quando os nomes das tuas colunas contêm espaços ou caracteres especiais que iriam quebrar a dot notation normal. Esta é a parte que interessa. Quando passas um objeto Column para o método select, podes anexar-lhe métodos para transformar os dados on the fly. Uma das transformações mais críticas é a conversão de tipos. Os dados chegam muitas vezes no formato errado. Por exemplo, podes receber métricas numéricas formatadas como strings de texto. Para corrigir isto, usas o método cast. O PySpark também fornece um alias chamado astype, que executa exatamente a mesma lógica. Chamas o método cast diretamente no teu objeto Column dentro do statement select. O método cast requer um argumento, que é o data type de destino. Podes definir este destino passando uma representação em string do tipo, como a palavra int, ou passando um objeto de data type específico do Spark, como IntegerType. Eis como isto flui num script real. Chamas o método select no teu DataFrame. Dentro dos parênteses desse método, referencias a tua coluna de destino usando bracket notation. Logo a seguir a essa referência de coluna, chamas ponto cast e forneces o teu novo tipo. Quando avaliado, isto devolve um DataFrame completamente novo, onde a tua coluna selecionada está agora convertida com segurança para o tipo especificado. O DataFrame original permanece totalmente intacto, porque os DataFrames são imutáveis. A principal conclusão é que o type casting no PySpark não é um processo isolado aplicado in place a um dataset existente. É uma column expression avaliada de forma lazy, inerentemente ligada ao ato de selecionar dados para construir um novo DataFrame strongly typed. Se gostas do podcast e queres ajudar a apoiar o programa, podes procurar por DevStoriesEU no Patreon. Ficamos por aqui neste episódio. Obrigado por ouvires, e continua a construir!
9

Junção de Funções: Limpeza de Dados Sujos

3m 51s

Lixo entra, lixo sai. Aprenda as transformações essenciais de DataFrames para remover nulos, preencher valores em falta e lidar com registos NaN nativamente em sistemas distribuídos.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 9 de 21. Garbage in, garbage out. Mas o que fazes quando o teu dataset cheio de lixo tem centenas de terabytes de tamanho e não consegues inspecionar manualmente uma única row? Precisas de uma forma sistemática de o sanitizar à escala. É exatamente isso que abordamos hoje no Function Junction: Cleaning Dirty Data. O primeiro passo na limpeza é, geralmente, padronizar o teu schema. Vais receber frequentemente ficheiros raw com espaços, caracteres especiais ou erros de digitação nos cabeçalhos. Usa o método chamado with column renamed. Simplesmente passas-lhe o nome antigo da string e o novo nome da string desejado. Se tiveres várias colunas para corrigir, fazes chain deste método sequencialmente para cada coluna antes de aplicares qualquer transformação complexa downstream. Antes de remover maus dados, temos de esclarecer uma confusão frequente em relação a null e NaN em PySpark. Null significa que um data point está completamente ausente. NaN significa Not a Number, que representa um resultado matemático indefinido, como dividir zero por zero. Em Python puro, estes exigem tratamento separado. No entanto, o PySpark agrupa-os por conveniência. Quando usas as funções N A do dataframe, o Spark avalia os valores NaN como nulls para efeitos de drop ou fill. Para eliminar rows com valores ausentes, usas o método N A dot drop. Chamar esta função completamente vazia faz drop de qualquer row que contenha um null ou NaN numa única coluna. Esta abordagem é altamente destrutiva em wide datasets. Um único valor ausente numa coluna de metadados opcional vai apagar uma row de dados de transação que, de resto, estariam perfeitos. Para evitar isto, passa uma lista de nomes de colunas ao parâmetro subset. O PySpark vai então avaliar apenas essas colunas críticas e específicas ao decidir se faz drop da row. Fazer drop de rows nem sempre é permitido pelas regras de negócio. Muitas vezes, tens de substituir valores ausentes por defaults seguros. Consegues fazer isto usando o N A dot fill. Embora possas passar um único valor para fazer fill a todas as colunas, a melhor abordagem é passar um dicionário. As keys do dicionário representam os nomes das colunas específicas, e os values representam as tuas substituições escolhidas. Este padrão permite-te fazer fill a uma métrica numérica ausente com um zero, enquanto substituis simultaneamente uma categoria ausente por uma text string como unknown. Fazer isto através de um dicionário é executado numa única pass, o que é altamente eficiente. Por fim, os teus dados podem estar totalmente preenchidos, mas ainda assim inválidos. Outliers e valores fisicamente impossíveis exigem filtragem lógica. Isolas os bons dados usando o método where para manter apenas as rows que satisfazem uma condição específica. Para limites numéricos ou de data, o método between é a tua melhor ferramenta. Selecionas a tua coluna, chamas o between e forneces os limites inferior e superior. Isto substitui a lógica verbosa de greater-than e less-than, tornando o teu código mais fácil de ler. Qualquer row que caia fora desses limites é filtrada do dataframe resultante. Aqui está o key insight. A ordem importa muito ao limpar à escala. Renomeia sempre as colunas primeiro para fixar o teu schema, faz drop ou fill aos valores ausentes a seguir para estabilizar os teus data types, e filtra os outliers por último, apenas quando souberes que os dados subjacentes são estruturalmente sólidos. É tudo por este episódio. Obrigado por ouvires, e continua a construir!
10

Transformar e Remodelar Dados

3m 51s

Assuma o controlo da forma dos seus dados. Exploramos como gerar novas colunas com funções matemáticas, realizar manipulações de strings e achatar arrays aninhados usando explode.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 10 de 21. Às vezes, uma única linha de dados contém um array de registos ocultos — e tu precisas de detonar esse array para o analisares corretamente. Transformar e remodelar dados é a forma como descompactas, formatas e estruturas essa informação para processamento downstream. Quando precisas de modificar um dataframe no PySpark, não alteras os dados in place. Os dataframes são imutáveis. Em vez disso, crias novas versões usando um método chamado withColumn. Este método recebe dois argumentos. O primeiro é uma string que representa o nome da coluna que queres criar ou substituir. O segundo é uma expressão de coluna que define os dados reais. Se forneceres um nome que já existe no dataframe, o PySpark sobrepõe a coluna original. Se o nome for completamente novo, o PySpark faz append da nova coluna ao lado direito do teu dataset. Para definir o que vai para essa nova coluna, normalmente usas as built-in functions do PySpark. Estas são importadas do módulo de funções SQL e fornecem operações altamente otimizadas que são executadas em todo o teu cluster. Considera a manipulação de strings. Dados de texto de fontes externas raramente estão perfeitamente formatados. Podes ter uma coluna com nomes de utilizador escritos numa mistura imprevisível de letras maiúsculas e minúsculas. Podes corrigir isto passando a tua coluna existente para uma built-in function como lower, que força todo o texto para minúsculas. Em alternativa, podes usar uma função de capitalização para garantir que a primeira letra é maiúscula e as restantes são minúsculas. Na prática, constróis estas operações diretamente nas tuas transformações de dataframe. Chamas withColumn, dás um nome à tua coluna de destino e atribuis-lhe o resultado da função lower aplicada à tua coluna de input. O PySpark avalia esta expressão para cada linha. Podes encadear várias chamadas withColumn para aplicar diversas transformações sequencialmente, passando o dataframe progressivamente atualizado para o passo seguinte de cada vez. Agora, a segunda parte disto é o reshaping. Limpar strings altera os valores, mas o que acontece quando a forma fundamental dos teus dados está a impedir a análise? É aqui que a coisa fica interessante. Podes receber um dataset onde o identificador de uma pessoa está numa coluna, e os seus rendimentos mensais de todo o ano estão compactados num único array na coluna adjacente. Não podes executar agregações relacionais standard num array aninhado. Precisas de cada valor de rendimento individual na sua própria linha para calcular médias ou encontrar mínimos. Resolves este problema estrutural usando uma built-in function chamada explode. A função explode lida especificamente com arrays e maps. Chamas withColumn, especificas o nome da coluna que queres para o output, e passas a função explode a encapsular a tua coluna de array. O PySpark executa isto pegando na linha única original e abrindo-a. Se o array de rendimentos contiver doze valores distintos, o explode gera doze linhas completamente separadas. No novo dataframe, a coluna de destino agora contém um único valor de rendimento flat por linha, em vez de uma lista. Crucialmente, o PySpark duplica todas as outras colunas da linha original. O identificador do utilizador é copiado exatamente para todas as doze novas linhas. A relação lógica entre o utilizador e os seus rendimentos permanece perfeitamente intacta, mas os dados agora estão flat. Fizeste o reshape de uma estrutura aninhada para uma tabela longa, pronta para operações standard de agrupamento e filtragem. O verdadeiro poder das transformações do PySpark é que funções como explode e lower não manipulam apenas valores individuais; elas definem um plano de computação lógico que escala instantaneamente, quer tenhas cem linhas ou cem mil milhões de linhas, sem nunca exigir que escrevas um único loop manual. É tudo por este episódio. Até à próxima!
11

A Mecânica do Agrupamento e Agregação

3m 41s

Domine a estratégia split-apply-combine. Mergulhamos no agrupamento de dados por chaves e na aplicação de poderosas funções de agregação para resumir conjuntos de dados massivos.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 11 de 21. Quando estás a olhar para milhares de milhões de registos individuais, lê-los linha a linha é impossível. Para extraíres algum significado real, tens de os resumir. Hoje vamos cobrir exatamente como isso acontece: A Mecânica de Agrupamento e Agregação. Nos bastidores, o PySpark processa agregações usando uma estratégia de dados clássica chamada split-apply-combine. Este pattern é exatamente o que parece. Primeiro, o PySpark divide o enorme dataset em buckets lógicos distintos com base numa key que tu escolhes. A seguir, aplica um cálculo específico a cada bucket individualmente em todo o cluster. Finalmente, combina essas respostas independentes num único resultado resumido. No teu código, disparas a fase de split chamando o método group by no teu DataFrame. Basta dares o nome da coluna que queres usar como a tua grouping key. Por exemplo, se tiveres uma tabela enorme de transações históricas, podes fazer group by pela coluna do nome de utilizador. Aqui está o ponto chave. Chamar o group by não devolve um novo DataFrame. Em vez disso, devolve uma construção de transição chamada objeto GroupedData. Como o PySpark avalia o teu código de forma lazy, apenas construiu o execution plan para organizar estes buckets. Não vai mover quaisquer dados até lhe dizeres que operação matemática executar nesses buckets. Para fornecer essa operação matemática, fazes chain do método aggregate, normalmente escrito como agg, diretamente nos teus grouped data. Isto trata das fases de apply e combine. Dentro do método aggregate, dizes ao PySpark o que calcular usando ferramentas do módulo PySpark SQL functions. Este módulo contém dezenas de operações de aggregation otimizadas. Digamos que queres calcular o rendimento médio para cada um desses utilizadores. Importarias a função average, normalmente referida como avg. Passas o nome da tua coluna de rendimento para a função average, e colocas isso dentro do método aggregate. Quando isto executa, o PySpark calcula o rendimento médio para cada bucket de utilizador distinto em simultâneo. A fase de combine entra então em ação, devolvendo um DataFrame standard e legível. Este novo DataFrame contém apenas uma linha por utilizador, emparelhada com o seu rendimento médio recém-calculado. Nesta fase, tens uma tabela perfeitamente resumida. No entanto, como o cálculo aconteceu em paralelo num cluster distribuído, as linhas finais são devolvidas na ordem aleatória em que os processing nodes terminaram o seu trabalho. Se precisas de ver os que ganham mais, a ordem aleatória é inútil. Para corrigir isto, fazes chain do método order by no final do teu passo de aggregation. Passas ao método order by a coluna que contém as tuas novas médias, e dizes-lhe para fazer sort por ordem decrescente. O PySpark vai pegar nos resultados combinados, ordená-los e entregar uma tabela limpa e ordenada. O pattern split-apply-combine é poderoso precisamente porque mapeia na perfeição para hardware distribuído, permitindo que datasets massivos sejam resumidos em segundos. Mas lembra-te que fazer o grouping de dados é apenas metade da operação. O grouping requer uma aggregation para terminar o trabalho, caso contrário ficas apenas com um cluster cheio de buckets vazios à espera de instruções. Obrigado por passares uns minutos comigo. Até à próxima, fica bem.
12

Quando os DataFrames Colidem: A Arte das Joins

3m 26s

Navegar pelas nuances da combinação de conjuntos de dados. Detalhamos os sete tipos diferentes de joins no PySpark e explicamos como fundir DataFrames com segurança.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 12 de 21. Fazer o merge de duas tabelas enormes é a operação mais dispendiosa em computação distribuída. Aplica a lógica de matching errada, e torna-se na maneira mais fácil de crashar o teu cluster por falta de memória. Saber exatamente como combinar datasets em segurança é o tema do When DataFrames Collide: The Art of Joining. O mecanismo principal para combinar dados em PySpark é o método join. Tu chamas isto no teu DataFrame base, passando o DataFrame que queres anexar, a coluna ou colunas específicas para fazer o match, e o método join. Se não forneceres nenhum método join, o PySpark faz um inner join por defeito. Considera um cenário concreto. Tens um DataFrame a registar as alturas das pessoas, e um segundo DataFrame a registar os seus rendimentos. Ambos os datasets partilham uma coluna chamada name. Com um inner join, o PySpark olha para a coluna name em ambos os datasets e apenas mantém as linhas onde o name existe nos dois lados. Se uma pessoa aparece nos dados de alturas, mas está ausente nos dados de rendimentos, o seu registo é completamente descartado do resultado. Para reteres registos sem match, mudas o tipo de join. Um left join mantém todas as linhas do teu DataFrame inicial, que neste caso são os dados de alturas. Se o PySpark encontrar um match para o name nos dados de rendimentos, ele anexa esse rendimento. Se não encontrar um match, mantém a linha da altura, mas coloca um valor null na coluna do rendimento. Um right join faz exatamente o inverso, mantendo todos os rendimentos e preenchendo as alturas em falta com nulls. Quando precisas de absolutamente tudo, usas um full join. O PySpark retém todos os registos de ambos os DataFrames. Os names com match fazem merge para uma única linha, e quaisquer names que existam em apenas um dataset são mantidos, com valores null a preencher os dados em falta do outro lado. Aqui está a ideia principal. Um cross join opera de forma diferente porque ignora a condição de join por completo. Ele emparelha cada linha do DataFrame heights com cada linha do DataFrame incomes, criando um produto cartesiano. Se ambas as tabelas tiverem apenas mil linhas, um cross join devolve um milhão de linhas. Este crescimento explosivo é o motivo pelo qual os cross joins são fortemente restritos por defeito, e muitas vezes exigem uma configuração explícita para serem executados sem lançar um erro. Os dois últimos tipos de join são, na verdade, operações de filtragem em vez de verdadeiros merges de dados. Um left semi join procura por matches, devolvendo linhas do DataFrame heights apenas se o name também aparecer no DataFrame incomes. A diferença crucial para um inner join é que um left semi join não puxa nenhuma coluna do lado direito. Ficas exatamente com as mesmas colunas com que começaste, apenas filtradas para os registos que têm um match correspondente. Um left anti join faz precisamente o oposto. Devolve linhas do DataFrame heights apenas se o name não existir nos dados de incomes. Descarta as colunas do lado direito por completo. Isto torna o left anti join na maneira mais eficiente de identificar dados em falta ou encontrar registos que falharam o processamento downstream. A escolha do join determina não apenas os dados que recebes de volta, mas também a quantidade de dados que tem de se mover fisicamente pela tua rede para gerar o resultado. Obrigado por ouvires. Até à próxima!
13

Velho SQL, Novos Truques

3m 53s

Porquê aprender uma nova API quando pode usar SQL em bruto? Aprenda a executar queries SQL padrão diretamente contra DataFrames PySpark distribuídos.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 13 de 21. Tens uma equipa de analistas que escreve excelente SQL, mas os teus dados estão num cluster distribuído enorme. Poderias obrigá-los a aprender uma sintaxe de Python completamente nova, ou poderias deixá-los usar a linguagem que já conhecem. É aí que entra a execução de strings de raw SQL diretamente no PySpark, ensinando novos truques ao velho SQL. O PySpark dá-te uma ponte direta para o standard SQL através de um único método na tua Spark session, chamado simplesmente sql. Passas uma string de raw SQL para este método. O output não é plain text. É um DataFrame standard de PySpark. Isto significa que podes correr uma query de base de dados standard, receber um DataFrame de volta, e passá-lo imediatamente para outra função de Python. É totalmente interoperável. Antes de poderes fazer queries aos dados com SQL, o PySpark precisa de saber que tabelas existem. Tens duas formas principais de expor os teus dados ao SQL engine. Primeiro, se já tens um DataFrame em Python, podes chamar um método para o registar como uma temporary view. Dás-lhe um nome em formato string, e de repente age como uma tabela nas tuas queries de SQL. Segundo, podes criar tabelas inteiramente dentro da tua string de SQL. Passas um statement create table para o método sql. Dentro dessa string, defines o schema e dizes ao PySpark exatamente onde vivem os ficheiros de dados subjacentes, como uma path de cloud storage que contém ficheiros Parquet. O PySpark regista isto no seu catalog interno. A partir daí, fazes a query pelo nome, tal como numa tabela de base de dados tradicional. Compara como a mesma lógica fica em ambas as abordagens. Imagina que precisas de ir buscar nomes de clientes, fazer drop de quem tenha um balance de zero, e fazer merge do resultado com uma tabela de orders. Na DataFrame API, constróis uma chain de métodos de Python. Chamas o select no teu dataset de clientes para escolher a coluna name. Depois fazes chain de um método filter, para verificar se o balance é maior que zero. Finalmente, fazes append de um método join a referenciar o dataset de orders numa key correspondente. É altamente programático. Na abordagem de SQL, escreves um statement select standard a puxar a coluna name, adicionas uma cláusula where para o balance, e escreves um inner join para a tabela de orders. Isto fica no teu script como um único bloco de string legível. Aqui está o insight principal. Existe um equívoco comum de que escrever SQL dentro de strings de Python tem de ser mais lento ou menos nativo do que usar os métodos estruturados de DataFrame. Isso é falso. Quer faças chain de métodos de Python ou passes uma string de raw SQL, o PySpark trata-os de forma idêntica. Ambos os inputs sofrem parse imediatamente, são traduzidos para o mesmíssimo logical plan, e entregues ao Catalyst optimizer. O execution engine não sabe nem quer saber que API usaste para expressar a tua intenção. A performance é exatamente a mesma. A escolha entre a DataFrame API e o raw SQL nunca é sobre a performance do cluster. É puramente sobre o que torna a tua equipa mais rápida e a tua codebase mais fácil de manter. Obrigado por estares aí. Espero que tenhas aprendido algo novo.
14

Intercâmbio entre DataFrames e SQL

3m 39s

Misture e combine SQL com Python de forma contínua. Descubra como criar temporary views a partir de DataFrames, usar selectExpr e encadear operações programáticas nos resultados de queries SQL.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 14 de 21. Podes dar por ti no meio de um debate sobre se deves escrever as tuas transformações de dados em Python ou SQL. Forçar uma escolha estrita entre os dois deixa uma enorme quantidade de utilidade de lado. A verdadeira vantagem está em alternar entre DataFrames e SQL de forma fluida dentro do mesmo pipeline. Às vezes, um conjunto complexo de nested joins é muito mais fácil para a tua equipa ler e manter em raw SQL. Outras vezes, precisas de iterar pelos nomes das colunas de forma dinâmica, o que é impossível em pure SQL, mas trivial em Python. O PySpark permite-te combinar ambas as abordagens sem quebrar o teu data flow. Para começares a escrever SQL contra um DataFrame Python existente, tens primeiro de expor esse DataFrame ao engine de Spark SQL. Consegues fazer isto chamando o método create or replace temp view diretamente no teu DataFrame. Passas um único argumento de string, que se torna o nome da tabela. Esta operação não move quaisquer dados. Não escreve no disco. Simplesmente regista um pointer temporário na tua sessão Spark atual. O engine de SQL agora sabe como resolver esse nome de tabela de volta para o teu DataFrame Python. Agora podes fazer queries. Chamas spark dot sql e passas o teu statement select standard como uma string, referenciando o nome da tabela que acabaste de criar. Aqui está o detalhe crucial. O output dessa chamada spark dot sql não é um resultado de texto estático, nem é um tipo diferente de objeto. Retorna um DataFrame PySpark standard. Isto significa que podes fazer chain imediatamente de métodos normais de DataFrame Python diretamente no fim da tua chamada SQL. Podes escrever uma string SQL de cinquenta linhas para lidar com uma window function complexa, fechar os parênteses do spark dot sql, e adicionar imediatamente um método dot filter ou dot group by. Transitas de Python para SQL e de volta para Python num único bloco de código. Se precisares de SQL apenas para o cálculo de uma coluna específica, registar uma temporary view completa é desnecessário. Em vez disso, usas o método select expression. Este método atua como uma ponte. Funciona exatamente como um método select standard de um DataFrame, mas aceita expressões de string em raw SQL em vez de objetos column de Python. Se precisares de executar um statement case-when, realizar funções matemáticas, ou fazer cast de um data type usando sintaxe SQL nativa, passas essas strings SQL exatas para o select expression. O Spark pega nessas strings, faz o parse delas, e executa-as exatamente como faria dentro de uma query SQL completa. Isto permite-te permanecer totalmente dentro da API chainable de DataFrames, enquanto dependes da sintaxe SQL para lógica complexa ao nível da row. A fronteira entre estes dois paradigmas é completamente artificial. Quer faças chain de métodos Python, escrevas queries em raw SQL, ou uses strings de select expression, o Spark compila tudo para exatamente o mesmo execution plan otimizado. Se nos quiseres ajudar a continuar a fazer estes episódios, podes procurar por DevStoriesEU no Patreon para apoiares o programa. É tudo por este episódio. Obrigado por ouvires, e continua a construir!
15

Estender o Spark com Python UDFs

4m 04s

Quando as funções integradas não são suficientes, as User-Defined Functions entram em ação. Exploramos como escrever lógica Python personalizada para DataFrames, e por que razão as UDFs escalares padrão escondem uma penalização de desempenho.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 15 de 21. Escreves uma função personalizada em Python, integras no teu data pipeline, e funciona na perfeição numa pequena amostra. Mas quando a corres no dataset completo, o job fica extremamente lento, enquanto o uso da CPU dispara. O código em si está correto, mas estás a pagar uma taxa de execução oculta. Hoje vamos falar sobre como estender o Spark com Python UDFs. Uma User Defined Function, ou UDF, permite-te executar lógica Python personalizada diretamente num DataFrame do Spark. Usas isto quando as funções built-in do Spark SQL não cobrem a tua lógica de negócio específica. O processo é simples. Começas por escrever uma função Python standard. Por exemplo, escreves uma função que recebe uma string de texto, aplica uma regra de formatação personalizada complexa e retorna a string modificada. Para que o Spark reconheça esta função, importas a função udf do módulo de funções do PySpark SQL e aplicas como um decorator diretamente acima da definição da tua função Python. Também passas um return type para o decorator, como um tipo string ou um tipo integer. Se não forneceres um return type, o Spark assume um tipo string por default, o que pode causar problemas de dados silenciosos se a tua função na verdade retornar um número. Depois de decorada, a tua função Python personalizada age exatamente como uma função nativa do Spark. Podes passá-la para operações de DataFrame, como um statement select, passando-lhe os nomes das colunas como argumentos. Aqui está o ponto chave. Uma Python UDF escalar standard opera estritamente uma linha de cada vez. Recebe um ou mais valores de coluna de uma única linha como input, avalia a tua lógica Python personalizada, e retorna exatamente um valor de output para essa linha específica. Se o teu DataFrame contiver dez milhões de linhas, a tua função Python é invocada dez milhões de vezes separadas. Esta operação linha a linha é fácil de entender, mas cria o enorme bottleneck de performance que mencionámos no início. Para entenderes por que é tão lenta, tens de olhar para como o Spark executa o código under the hood. O Spark é construído em Scala, o que significa que o seu core engine corre dentro de uma Java Virtual Machine, ou JVM. A tua UDF personalizada é escrita em Python. A JVM não consegue executar código Python nativamente. Para aplicar a tua UDF, o Spark é forçado a fazer spin up de processos worker de Python separados, ao lado dos seus próprios executors. Depois, tem de mover fisicamente os dados para fora do espaço de memória da JVM e para dentro do processo Python. O Spark depende de uma library de serialização de Python chamada cloudpickle para lidar com esta transferência complexa. É aqui que a taxa de performance é cobrada. Para cada linha no teu dataset, o Spark serializa os inputs na JVM, envia esses dados binários através de um socket local para o worker de Python, e desserializa-os em objetos Python standard. A tua função personalizada finalmente corre nesses objetos. Depois, todo o ciclo acontece na ordem inversa. O Python serializa o valor de output usando o cloudpickle, envia-o de volta pelo socket, e a JVM desserializa-o de volta para o formato de memória interna do Spark. Esta serialização e desserialização constantes entre Java e Python são incrivelmente caras. O custo real de uma Python UDF standard raramente é a lógica que escreves; é o overhead silencioso de traduzir dados de um lado para o outro entre dois ambientes de runtime completamente diferentes em cada linha. Obrigado por passares uns minutos comigo. Até à próxima, fica bem.
16

Turbinar UDFs com o Apache Arrow

3m 36s

Elimine o estrangulamento da serialização de JVM para Python. Revelamos como as Vectorized Pandas UDFs e os formatos de memória do Apache Arrow turbinam as suas transformações personalizadas.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. PySpark Fundamentals, episódio 16 de 21. E se pudesses acelerar as tuas funções Python personalizadas no Spark num fator de dez, apenas mudando um único decorator? As Python UDFs standard são notoriamente lentas, mas a solução não exige que reescrevas a tua lógica em Scala. Hoje, vamos falar sobre como turbinar UDFs com o Apache Arrow. Quando corres uma Python UDF standard, bates numa enorme barreira de performance na fronteira entre as linguagens. O Spark opera dentro da Java Virtual Machine, mas a tua lógica personalizada corre num processo worker de Python separado. Para passar dados entre eles, o Spark extrai rows da sua memória interna, serializa-as usando uma library chamada cloudpickle, e envia-as para o Python. O Python processa os dados uma row de cada vez, serializa o resultado e envia-o de volta. Fazer isto para milhões de rows individuais cria um bottleneck de serialização insuportável. O Apache Arrow muda as regras desta troca de dados. O Arrow é um formato de dados in-memory, colunar e cross-language. Ele padroniza a forma como os dados aparecem em memória, para que tanto a JVM como o Python os entendam nativamente, sem traduções complexas. Em vez de serializar os dados row a row, o Spark agrupa os dados em grandes batches colunares. Todos os valores de uma coluna específica ficam lado a lado em memória contígua. O Spark envia estes grandes blocos para o Python num único passo eficiente. Podes tirar partido disto de duas maneiras. Primeiro, podes ativar a otimização Arrow para UDFs standard. Fazes isto definindo a propriedade de configuração do Spark para a execução do Arrow como true, ou especificando o parâmetro useArrow igual a true ao registares a tua UDF. O Spark vai usar o Arrow para transferir os dados em batches, reduzindo drasticamente o overhead de serialização, mesmo que a tua função Python ainda execute tecnicamente a lógica uma row de cada vez. Aqui está o ponto chave. Para obteres o máximo speed boost, queres que o teu código Python processe essas batches do Arrow simultaneamente. É aqui que entram as Pandas UDFs. Ao envolveres a tua função personalizada com o decorator pandas UDF, mudas a forma como a função recebe os dados. Em vez de receber um único valor para uma row, a tua função recebe uma Pandas Series contendo uma batch inteira de valores. A tua função aplica uma operação vetorizada a essa batch inteira e retorna uma nova Pandas Series com exatamente o mesmo comprimento. Pensa numa função chamada calculate tax. Aplicas o decorator pandas UDF e declaras que ela retorna um tipo double. A função aceita uma Pandas Series contendo preços de produtos. Dentro da função, não escreves um for-loop. Escreves simplesmente um return statement que multiplica a Series de input por um ponto dois. Como o Pandas é suportado por código C altamente otimizado internamente, ele multiplica todo o bloco de preços instantaneamente. O Spark depois pega nessa Series retornada e faz o merge perfeitamente de volta para o DataFrame usando o Arrow. O verdadeiro poder de uma Pandas UDF não é apenas contornar o bottleneck de serialização do cloudpickle, mas sim transferir a tua computação real de loops lentos de Python para uma execução nativa e vetorizada. Obrigado por ouvirem. Fiquem bem, pessoal.
17

Expandir Linhas com Python UDTFs

4m 00s

As UDFs padrão devolvem um valor por linha, mas e se precisar de várias linhas? Aprenda como as Python User-Defined Table Functions (UDTFs) resolvem problemas complexos de geração de um-para-muitos.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 17 de 21. As User-Defined Functions standard estão estritamente limitadas a um mapeamento de um-para-um. Passas um valor e recebes exatamente um valor de volta. Mas e se uma única entrada de log densa precisar de ser expandida em cem linhas separadas? Para resolver isto, usas User-Defined Table Functions em Python, ou UDTFs. Uma UDTF faz exatamente o que o nome indica. Devolve uma tabela inteira a partir de um único input. Enquanto uma UDF standard calcula um único valor escalar, uma UDTF pode emitir múltiplas linhas e múltiplas colunas. Esta é a ferramenta a que recorres quando precisas de fazer explode de uma string JSON nested, fazer o parse de um ficheiro de texto delimitado linha a linha, ou gerar uma sequência de datas a partir de um único timestamp. Para criar uma UDTF em PySpark, não escreves uma função standalone básica. Em vez disso, defines uma classe Python. Esta classe requer um método específico chamado eval. O método eval é onde a transformação propriamente dita acontece. Quando executas a UDTF, o Spark chama este método para cada valor de input. Aqui está o ponto chave. Dentro desse método eval, não usas um return statement standard. Em vez disso, usas a keyword yield do Python. Cada vez que o método faz yield de um valor, o Spark traduz isso numa nova linha na tua tabela de output. Se passares uma única string de input, o método eval pode fazer um loop por ela e fazer yield dez vezes. O Spark pega nesses dez yields e produz dez linhas distintas. Vamos ver um exemplo concreto. Constróis uma classe chamada ProcessWords. O teu objetivo é passar uma frase completa e receber de volta uma tabela onde cada palavra tem a sua própria linha. Escreves o método eval para aceitar uma string de texto. Dentro do método, fazes o split da frase por espaços. A seguir, fazes um loop pelas palavras resultantes. Para cada palavra, fazes yield de um tuple que contém a própria palavra. Antes que o Spark possa usar esta classe, aplicas-lhe o decorator PySpark UDTF. O decorator é obrigatório porque define o teu schema de output. Declaras explicitamente os nomes das colunas e os data types que a tua função gera. Se fizeres yield de uma string, dizes ao decorator que o output é uma coluna de string. Se quiseres fazer yield da palavra e da sua contagem de caracteres, fazes yield de um tuple de dois elementos, e o teu decorator especifica um schema com uma coluna de string e uma coluna de integer. Para além do método eval, uma classe UDTF também pode incluir um método terminate opcional. O Spark chama o método terminate exatamente uma vez para cada partição de dados, depois de todas as linhas de input terem sido processadas pelo método eval. Isto é altamente útil para agregação. Se o teu método eval mantiver um contador interno ao longo de múltiplas linhas de input, o método terminate pode fazer yield de uma linha final que contém essa contagem total antes da partição fechar. Quando chamas uma UDTF numa operação de DataFrame, ela comporta-se como uma inline table. Se passares uma coluna de DataFrame existente para a UDTF, o Spark aplica a table function linha a linha. Como uma table function faz output de múltiplas linhas para cada linha de input única, combinar este output com o teu dataset original requer um lateral join implícito. O Spark trata disto nos bastidores, duplicando os dados da linha original para corresponderem às novas linhas exploded geradas pela tua classe Python. O poder definidor de uma UDTF de Python é desvincular completamente o teu volume de input do teu volume de output, permitindo que um único data point floresça num dataset completo de múltiplas colunas. É tudo por este episódio. Obrigado por ouvires, e continua a desenvolver!
18

A Pandas API no Spark

3m 59s

Escale os seus scripts Pandas existentes até ao infinito. Descubra como a API pyspark.pandas lhe permite executar a sintaxe padrão do Pandas nativamente num cluster Spark distribuído.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 18 de 21. Tens um script de dados local que funciona perfeitamente, mas de repente o tamanho do teu dataset quadruplica e a tua máquina fica sem memória. Conheces a sintaxe impecavelmente, mas reescrever tudo para uma framework distribuída demora dias. A API de pandas no Spark resolve exatamente esta lacuna. A API de pandas no Spark permite-te executar workloads padrão de pandas num cluster distribuído. Ela não emula apenas o pandas cegamente. Interceta o teu código pandas e traduz tudo em planos de execução otimizados de Spark nos bastidores. Para a usares, importas o módulo chamado pyspark dot pandas. A convenção padrão é atribuir-lhe o alias ps, espelhando diretamente o familiar alias pd usado em workloads locais de data science. Se já tiveres um DataFrame pandas local padrão em memória, a transição é simples. Invocas uma função chamada from pandas no teu módulo ps e passas o teu DataFrame local. Isto converte o objeto single-node num DataFrame pandas-on-Spark distribuído. A partir desse ponto, a sintaxe que usas para interagir com este novo objeto permanece idêntica à que já conheces. Esta consistência estende-se à forma como os dados são processados internamente. A API distribuída lida nativamente com missing data exatamente como o pandas local. Se o teu dataset contém valores NumPy Not-a-Number, a API de pandas no Spark gere-os corretamente durante operações matemáticas ou transformações estruturais. Não precisas de inventar nova lógica de data cleaning para os teus jobs de Spark. As operações padrão traduzem-se diretamente. Se quiseres agrupar os teus dados por uma coluna específica, chamas a função de grouping padrão. Se quiseres calcular a média ou a soma, fazes chain da função aggregate logo a seguir. Podes até chamar funções de plotting diretamente no DataFrame distribuído. O Spark processa as computações pesadas em todo o cluster, agrega os data points necessários e devolve a visualização como se estivesses a trabalhar numa única máquina. Aqui está o ponto chave. A arquitetura subjacente é fundamentalmente diferente, e isso introduz um edge case crítico em relação à geração de indexes. O pandas local depende fortemente de um index sequencial e estritamente ordenado para cada linha. O Spark, no entanto, particiona os dados e distribui-os por várias máquinas independentes. Impor um index sequencial estrito e globalmente ordenado num sistema distribuído exige comunicação constante entre os worker nodes. Quando crias um DataFrame pandas-on-Spark sem definires explicitamente uma coluna de index, a API gera automaticamente um index default para imitar perfeitamente o comportamento padrão do pandas. Criar e manter este index default requer a sincronização de state em todo o cluster. Se estiveres a operar num dataset massivo, esta sincronização introduz um overhead de performance severo. A API emite frequentemente um warning sobre este overhead interno quando executa. Para evitar este bottleneck, é altamente recomendado atribuir uma coluna existente como index imediatamente ou configurar a API para usar um tipo de index distributed-friendly. A API de pandas no Spark dá-te a sintaxe exata de pandas, alimentada pelo motor de execução distribuída de Spark, mas lembrares-te de que indexes sequenciais estritos acarretam um custo de sincronização pesado vai salvar o teu cluster de slowdowns desnecessários. É tudo por hoje. Obrigado por ouvires — vai construir algo fixe.
19

Carregar e Contemplar: Formatos de Armazenamento

3m 33s

Nem todos os formatos de ficheiro são criados da mesma forma. Contrastamos CSVs baseados em linhas com formatos colunares como Parquet e ORC, explorando opções de leitura/escrita e técnicas de armazenamento ideais.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 19 de 21. Guardar um dataset massivo como CSV é a coisa mais fácil do mundo, e é também uma das coisas mais destrutivas que podes fazer à performance do teu data lake. Pagas por mais storage, pagas por mais compute, e cada query downstream arrasta-se. A solução está na forma como lidas com Load and Behold: Storage Formats, e por que motivo a forma como guardas os teus dados importa tanto como a forma como os transformas. O PySpark usa uma interface unificada para ler e escrever dados em dezenas de sistemas de storage. Chamas o atributo read ou write na tua sessão Spark ou DataFrame, especificas um formato, forneces uma chain de options, e apontas para um file path. É um padrão previsível, mas as options que escolhes ditam a quantidade de trabalho que o teu cluster vai ter de fazer mais tarde. Vamos começar com os formatos human-readable, CSV e JSON. Estes são formatos row-based. Quando lês um CSV, o Spark faz o parse dos dados linha a linha. Muitas vezes precisas de fazer chain de options específicas para dar sentido ao texto. Por exemplo, podes fazer chain de uma option para dizer ao Spark que o ficheiro tem um header, outra option para definir um delimiter customizado como um pipe ou um tab, e uma terceira option para definir exatamente o aspeto de um valor null, talvez passando uma string específica para que o Spark a mapeie corretamente para um valor vazio em vez de a tratar como texto literal. O JSON é ligeiramente melhor porque lida com estruturas nested nativamente, mas repete as keys do schema para cada registo, inflacionando massivamente o tamanho do ficheiro. Ambos os formatos forçam o Spark a ler a row inteira do disco, mesmo que a tua query peça apenas uma única coluna. É aqui que entram os formatos colunares como Parquet e ORC. Presta atenção a esta parte. Queries analíticas raramente precisam de todas as colunas numa tabela wide. Normalmente precisam de colunas específicas ao longo de milhões de rows para correr agregações. Parquet e ORC guardam os dados organizados por coluna, não por row. Se fizeres uma query a três colunas num total de cem, o Spark só lê os chunks do ficheiro que contêm essas três colunas. Ele faz skip do resto por completo, cortando o input e output de disco para uma fração do que um CSV exige. Como os dados do mesmo tipo são guardados juntos, os formatos colunares também comprimem lindamente. Um diretório de ficheiros JSON pode encolher setenta por cento ou mais quando convertido para Parquet. Eles também embutem o schema exato e os data types nos metadados do ficheiro, o que significa que o Spark não tem de adivinhar ou inferir tipos no load. Quando estiveres pronto para escrever estes dados novamente, tens de gerir o state no destino. Por default, se tentares escrever para um path onde já existem dados, o Spark lança um erro para evitar a perda acidental de dados. Controlas isto usando o método mode antes de disparar o save. Se passares a string overwrite, o Spark apaga os dados existentes no target path e substitui-os pelo teu DataFrame atual. Se passares append, o Spark simplesmente adiciona os teus novos part files ao diretório existente. Existe também um mode ignore, que silenciosamente não faz nada se o diretório já estiver populado. Escrever dados colunares limpos e tipados hoje poupa ao teu cluster horas de tempo de processamento desperdiçado amanhã. Se quiseres ajudar a manter estes episódios a sair, podes apoiar o programa procurando por DevStoriesEU no Patreon. Obrigado por passares uns minutos comigo. Até à próxima, fica bem.
20

Caça aos Bugs: Planos Físicos e Joins

3m 20s

Espreite debaixo do capô do motor de execução do Spark. Aprenda a depurar queries usando DataFrame.explain() e como eliminar shuffles dispendiosos usando Broadcast joins.

Download
Olá, daqui é o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 20 de 21. O teu job PySpark não está lento por estar a processar dados. Está lento porque passa o tempo todo a mover dados pela rede. Quando um simples join deixa o teu cluster a arrastar-se, a solução está no Bug Busting: Physical Plans and Joins. Quando escreves um script PySpark, defines operações lógicas. Dizes ao Spark o que queres, não como o fazer. Mas quando um job tem mau desempenho, precisas de saber exatamente como o Spark executou o teu pedido. Fazes isso ao chamar o método explain no teu DataFrame. Chamar o explain imprime o physical plan. Este é o esquema das tasks reais que o Spark executa no teu cluster. Lês este plano de baixo para cima, seguindo os dados desde os ficheiros de origem até ao output final. Se olhares para o physical plan de um join standard entre dois DataFrames, provavelmente vais ver um passo chamado SortMergeJoin. Para fazer um SortMergeJoin, o Spark tem de garantir que as linhas com as mesmas join keys estão fisicamente localizadas no mesmo executor. Para conseguir isto, o Spark faz um Exchange. Exchange é o termo do physical plan para um network shuffle. Significa que o Spark está a arrancar dados das partições, a enviá-los pela rede e a gravá-los no disco para que os outros executors os possam ler. O shuffle é a operação mais pesada na computação distribuída. Aqui está o ponto chave. Se estiveres a fazer join de uma fact table enorme com uma pequena lookup table, fazer shuffle da tabela grande é um enorme desperdício de recursos. Em vez de fazeres shuffle de ambas as tabelas para alinhar as keys, podes simplesmente enviar a tabela pequena inteira para cada executor. Isto faz-se com a função broadcast do módulo de funções SQL do PySpark. Quando chamas o teu método join, simplesmente envolves o DataFrame mais pequeno na função broadcast. Ao envolveres a tabela pequena, dás ao Spark uma diretiva estrita. O Spark vai fazer collect do DataFrame pequeno para o driver node, e depois transmitir uma cópia completa do mesmo para a memória de cada executor. Agora, quando o DataFrame grande for processado, os executors já têm toda a lookup data de que precisam ali mesmo na RAM. Eles simplesmente percorrem as suas partições existentes e fazem o match das linhas localmente. Não é preciso fazer sort, e nenhuns dados da tabela grande se movem pela rede. Se chamares o explain neste novo broadcasted join, o physical plan fica completamente diferente. O SortMergeJoin desaparece. O passo pesado de Exchange está completamente ausente. No seu lugar, vais ver um BroadcastExchange e um BroadcastHashJoin. O BroadcastExchange move a tabela pequena apenas uma vez, e o join em si acontece inteiramente no local. A maneira mais fácil de duplicar a velocidade de um job de Spark é parar de mover dados que não precisam de se mover. Lê os teus physical plans, identifica os network exchanges, e faz broadcast das tuas tabelas pequenas. É tudo por hoje. Obrigado por ouvires — vai construir algo fixe.
21

Profiling de Memória e Desempenho no PySpark

4m 03s

Concluímos a nossa jornada no PySpark introduzindo ferramentas nativas de profiling. Aprenda a acompanhar o consumo de memória linha a linha e a expor tracebacks internos ocultos do Python.

Download
Olá, daqui fala o Alex da DEV STORIES DOT EU. Fundamentos de PySpark, episódio 21 de 21. Fazer debugging a código Python distribuído geralmente significa vasculhar milhares de linhas de erros Java sem sentido, a tentar adivinhar por que motivo a tua função falhou ou por que razão consumiu toda a memória do teu cluster. Já não precisas de adivinhar. Hoje, vamos analisar o profiling de memória e performance do PySpark, além de simplificar as stack traces. Quando escreves uma User Defined Function, ou UDF, em PySpark, o teu código Python corre sobre uma infraestrutura de Java Virtual Machine. Se o teu código Python dividir por zero ou referenciar uma dictionary key inexistente, essa simples exception de Python é engolida. É passada de volta pelo daemon do PySpark, através da rede, e encapsulada em enormes exceptions de Java. Encontrar o verdadeiro erro de Python nos teus logs é aborrecido. Podes corrigir isto ativando as tracebacks simplificadas. Quando defines a configuração do Spark para simplified traceback como true, o PySpark altera a forma como reporta os erros. Ele remove todos os logs de interoperabilidade de Java e o ruído dos processos worker. Da próxima vez que uma UDF falhar, a tua consola vai mostrar uma stack trace de Python standard e limpa, a mostrar o número exato da linha no teu ficheiro Python onde a exception ocorreu. Corrigir crashes é apenas metade da batalha. Corrigir código lento ou que consome muita memória é muito mais difícil. Se escreveres uma Pandas UDF que processa milhões de linhas, ela pode correr com sucesso, mas demorar demasiado tempo ou causar erros de out-of-memory nos teus executor nodes. Historicamente, encontrar o bottleneck exigia adicionar logging manual ou adivinhar qual a operação de Pandas que era ineficiente. O Spark 4.0 muda isto ao introduzir profilers de Python UDF integrados. Aqui está o ponto chave. Agora podes fazer o profiling do teu código Python distribuído linha a linha, diretamente no PySpark. Para usares isto, defines a configuração do UDF profiler para um de dois modos: performance ou memory. Se definires a configuração do profiler para a palavra perf, o Spark ativa o performance profiler. A seguir, corres o teu Spark job normalmente. À medida que os worker nodes executam a tua Pandas UDF, o Spark regista o tempo de execução de cada linha da tua função Python. Assim que o teu job terminar, chamas o método show no objeto de profile do Spark. O Spark vai imprimir um relatório detalhado na tua consola. Para cada linha do teu código, vais ver exatamente quantas vezes foi chamada e o tempo total gasto a executá-la. Podes ver instantaneamente se uma manipulação de strings ou operação matemática específica está a atrasar todo o teu pipeline. Se estiveres a lidar com limites de memória, defines a configuração do UDF profiler para a palavra memory. O workflow é exatamente o mesmo, mas o output muda. Quando vês o relatório de profile, o Spark mostra-te o incremento exato em megabytes causado por cada linha do teu código Python. Podes ver exatamente onde grandes arrays estão a ser alocados e onde a memória não está a ser libertada. Esta visibilidade linha a linha elimina as suposições na otimização de transformações de dados complexas. Podes identificar a causa exata dos teus problemas de performance sem saíres do teu ambiente PySpark. Como este é o episódio final da nossa série de PySpark, encorajo-te a consultar a documentação oficial do Spark e a experimentar estas ferramentas de debugging na prática. Se tiveres ideias sobre que tecnologias devemos abordar na nossa próxima série, passa por devstories.eu e diz-nos. Obrigado por passares uns minutos comigo. Até à próxima, e fica bem.