Volver al catálogo
Season 14 21 Episodios 1h 27m 2026

PySpark Fundamentals

v4.1 — Edición 2026. Una guía exhaustiva de PySpark 4.1, que cubre Spark Connect, DataFrames, tipos de datos complejos, transformaciones de datos, SQL, UDFs y profiling.

Big Data Computación distribuida Ciencia de datos
PySpark Fundamentals
Reproduciendo ahora
Click play to start
0:00
0:00
1
El problema del Big Data y la promesa de PySpark
Establecemos la necesidad fundamental de usar PySpark. Descubre por qué las bibliotecas estándar de Python como Pandas fallan a gran escala, y cómo PySpark proporciona un motor de ejecución distribuida para procesar datasets masivos sin problemas.
4m 10s
2
La revolución de Spark Connect
Explora la arquitectura de Spark Connect. Explicamos cómo PySpark ha desacoplado el cliente y el servidor, permitiéndote ejecutar aplicaciones Spark en cualquier lugar sin las pesadas dependencias de la JVM.
4m 09s
3
DataFrames y Lazy Evaluation
Sumérgete en la abstracción fundamental de PySpark: el DataFrame. Hablamos sobre el concepto de lazy evaluation, la diferencia entre transformaciones y acciones, y por qué Spark planifica antes de ejecutar.
4m 37s
4
Creación y visualización de DataFrames
Aprende a instanciar DataFrames a partir de objetos en bruto de Python, diccionarios y archivos, y cómo inspeccionar de forma segura tus datos distribuidos sin bloquear tu nodo driver.
4m 07s
5
Dominando los tipos de datos básicos
Un recorrido por los tipos numéricos y de cadena fundamentales de PySpark. Exploramos cómo definir esquemas explícitamente usando StructType y StructField para crear pipelines de datos robustos.
4m 20s
6
Los peligros de la precisión
Descubre las diferencias críticas entre FloatType, DoubleType y DecimalType. Aprende por qué elegir el tipo numérico equivocado puede introducir errores de redondeo desastrosos en los datos financieros.
5m 01s
7
Domando datos complejos y anidados
El Big Data no siempre es plano. Exploramos los tipos de datos complejos de PySpark, incluyendo ArrayType, StructType y MapType, que te permiten analizar de forma nativa JSON profundamente anidados.
4m 40s
8
Type Casting y selección
Aprende a moldear activamente los esquemas de tus DataFrames. Cubrimos cómo seleccionar subconjuntos de columnas y cómo hacer cast de columnas de un tipo de datos a otro de forma segura.
4m 06s
9
Function Junction: Limpieza de datos sucios
Si entra basura, sale basura. Aprende las transformaciones esenciales de DataFrames para eliminar nulos, rellenar valores faltantes y manejar registros NaN de forma nativa en sistemas distribuidos.
3m 48s
10
Transformación y remodelación de datos
Toma el control de la forma de tus datos. Exploramos cómo generar nuevas columnas con funciones matemáticas, realizar manipulaciones de cadenas y aplanar arrays anidados usando explode.
4m 33s
11
La mecánica de la agrupación y agregación
Domina la estrategia split-apply-combine. Nos sumergimos en la agrupación de datos por claves y en la aplicación de potentes funciones de agregación para resumir datasets masivos.
3m 54s
12
Cuando los DataFrames colisionan: El arte de hacer Joins
Navegando por los matices de combinar datasets. Desglosamos los siete tipos diferentes de join en PySpark y explicamos cómo fusionar DataFrames de forma segura.
3m 29s
13
Viejo SQL, nuevos trucos
¿Por qué aprender una nueva API cuando puedes usar SQL en bruto? Aprende a ejecutar consultas SQL estándar directamente contra DataFrames distribuidos de PySpark.
3m 45s
14
Intercambiando DataFrames y SQL
Mezcla y combina SQL con Python sin problemas. Descubre cómo crear vistas temporales a partir de DataFrames, usar selectExpr y encadenar operaciones programáticas en los resultados de consultas SQL.
3m 53s
15
Extendiendo Spark con UDFs de Python
Cuando las funciones integradas no son suficientes, entran en juego las User-Defined Functions. Exploramos cómo escribir lógica personalizada en Python para DataFrames, y por qué las UDFs escalares estándar esconden una penalización de rendimiento.
4m 04s
16
Acelerando las UDFs con Apache Arrow
Elimina el cuello de botella de la serialización de JVM a Python. Descubrimos cómo las Vectorized Pandas UDFs y los formatos de memoria de Apache Arrow potencian tus transformaciones personalizadas.
3m 43s
17
Explotando filas con UDTFs de Python
Las UDFs estándar devuelven un valor por fila, pero ¿qué pasa si necesitas múltiples filas? Aprende cómo las User-Defined Table Functions (UDTFs) de Python resuelven problemas complejos de generación de uno a muchos.
4m 20s
18
La API de Pandas en Spark
Escala tus scripts de Pandas existentes hasta el infinito. Descubre cómo la API pyspark.pandas te permite ejecutar sintaxis estándar de Pandas de forma nativa en un clúster distribuido de Spark.
4m 19s
19
Carga y descubre: Formatos de almacenamiento
No todos los formatos de archivo son iguales. Contrastamos los CSVs basados en filas con formatos columnares como Parquet y ORC, explorando opciones de lectura/escritura y técnicas de almacenamiento óptimas.
4m 01s
20
Cazando bugs: Planes físicos y Joins
Echa un vistazo bajo el capó del motor de ejecución de Spark. Aprende a depurar consultas usando DataFrame.explain() y cómo eliminar costosos shuffles utilizando Broadcast joins.
3m 48s
21
Profiling de memoria y rendimiento en PySpark
Concluimos nuestro viaje por PySpark presentando herramientas de profiling nativas. Aprende a rastrear el consumo de memoria línea por línea y a exponer tracebacks internos ocultos de Python.
4m 41s

Episodios

1

El problema del Big Data y la promesa de PySpark

4m 10s

Establecemos la necesidad fundamental de usar PySpark. Descubre por qué las bibliotecas estándar de Python como Pandas fallan a gran escala, y cómo PySpark proporciona un motor de ejecución distribuida para procesar datasets masivos sin problemas.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 1 de 21. Tu script estándar de Python funciona perfectamente en las pruebas, pero en cuanto tu dataset alcanza los cincuenta gigabytes, falla con un error de OutOfMemory. Has llegado a los límites físicos de una sola máquina. La solución a este cuello de botella es el tema central de este episodio: el problema del big data y la promesa de PySpark. Las herramientas de datos estándar de Python están diseñadas para ejecución en un solo nodo. Librerías como pandas son increíblemente eficientes, pero requieren que todo el dataset resida en la memoria local. Si tu servidor tiene dieciséis gigabytes de RAM e intentas cargar cincuenta gigabytes de logs de la aplicación, el sistema operativo interviene y mata el proceso. Escalar verticalmente alquilando un servidor más grande y caro solo retrasa lo inevitable. Los datos crecen más rápido que las actualizaciones de hardware. Tarde o temprano, los datos superan la capacidad de la máquina. PySpark resuelve esta limitación. Es la API de Python para Apache Spark. El propio Apache Spark es un motor de computación distribuida que se ejecuta en la Java Virtual Machine. PySpark actúa como puente, permitiéndote escribir tu lógica completamente en Python mientras aprovechas el motor distribuido altamente optimizado de Spark. Esto cambia tu arquitectura de escalado vertical a escalado horizontal. En lugar de depender de una única máquina enorme, PySpark particiona tus datos y distribuye tus cálculos a través de un cluster de muchas máquinas más pequeñas, conocidas como nodos. Escribes tu código Python y PySpark lo traduce a un plan de ejecución paralela. Si tu volumen de datos se duplica el mes que viene, no tienes que reescribir ni una sola línea de código. Simplemente añades más nodos al cluster. El ecosistema de PySpark se organiza en unos pocos módulos principales diseñados para diferentes cargas de trabajo. El primero es Spark SQL. Esta es la base de la mayoría de las aplicaciones modernas de PySpark. Proporciona una estructura DataFrame para gestionar datos tabulares distribuidos en varias máquinas. También te permite ejecutar consultas SQL estándar directamente sobre estos datasets distribuidos. El siguiente es Structured Streaming. Este módulo gestiona pipelines de datos en tiempo real. En lugar de procesar un batch masivo de datos durante la noche, Structured Streaming procesa continuamente flujos de registros, como lecturas de sensores en vivo o eventos de tráfico web. Utiliza exactamente el mismo modelo de programación que Spark SQL, lo que significa que tu lógica de procesamiento batch y tu lógica de streaming son prácticamente idénticas. Luego está MLlib, la librería de Machine Learning. Entrenar modelos con datasets masivos en una sola máquina es un cuello de botella muy conocido. MLlib proporciona algoritmos de machine learning distribuidos para tareas como clasificación, regresión y clustering. Distribuye las operaciones matemáticas pesadas por todo el cluster, reduciendo drásticamente el tiempo de entrenamiento. Aquí reside la clave. El verdadero poder de PySpark es la abstracción. Nunca divides manualmente tus archivos masivos en chunks. Nunca escribes código de red para coordinar los servidores. Simplemente defines una secuencia lógica de transformaciones, y el motor subyacente gestiona la distribución de los datos, la ejecución en paralelo e incluso el proceso de recuperación si un nodo pierde energía a mitad del cálculo. PySpark no es simplemente una utilidad para abrir archivos más grandes. Es un cambio fundamental: de una computación limitada por una sola placa base a una computación limitada únicamente por el tamaño de tu cluster. Si estos episodios te resultan útiles y quieres apoyar el programa, puedes buscar DevStoriesEU en Patreon. Eso es todo por este episodio. ¡Gracias por escuchar y sigue programando!
2

La revolución de Spark Connect

4m 09s

Explora la arquitectura de Spark Connect. Explicamos cómo PySpark ha desacoplado el cliente y el servidor, permitiéndote ejecutar aplicaciones Spark en cualquier lugar sin las pesadas dependencias de la JVM.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 2 de 21. Durante años, escribir código PySpark en local implicaba cargar con una Java Virtual Machine enorme y pesada solo para probar un simple script. Tenías que sincronizar perfectamente las versiones de Python, las configuraciones de Java y las dependencias del cluster antes de escribir una sola línea de lógica. La revolución de Spark Connect hace que eso quede totalmente obsoleto. Tradicionalmente, PySpark dependía de una arquitectura fuertemente acoplada. Tu script de Python y el motor de ejecución de Spark tenían que coexistir exactamente en la misma máquina física o virtual. Lanzar una sesión de PySpark significaba levantar una Java Virtual Machine en segundo plano usando una librería puente. Esta arquitectura sobrecargaba tu entorno de desarrollo local con todo el peso del motor de ejecución de Spark. Esto hacía que integrar PySpark en aplicaciones web, editores de código modernos o dispositivos edge fuera muy poco práctico. Spark Connect resuelve esto introduciendo una arquitectura cliente-servidor desacoplada. Tu entorno de Python ahora está estrictamente separado del servidor de Spark. El cliente local de PySpark se convierte en una librería ligera. Ya no requiere una instalación local de Java y no ejecuta las tareas de procesamiento de datos por sí mismo. Actúa puramente como una interfaz remota hacia el cluster real de Spark. Aquí está la clave. Cuando escribes operaciones de DataFrame con Spark Connect, el cliente ligero registra tus llamadas a métodos y las traduce en un plan lógico sin resolver. Puedes imaginarte este plan como un esquema abstracto de tu query, que describe estrictamente qué datos procesar sin preocuparte de cómo se procesan. El cliente empaqueta este esquema usando Protocol Buffers y lo transmite a través de una conexión de red gRPC al servidor remoto de Spark. El servidor desempaqueta el plan, gestiona toda la optimización compleja de la query, ejecuta el job en todo el cluster y, finalmente, devuelve los resultados calculados en forma de stream a tu script de Python. Configurar esto requiere un pequeño cambio en cómo inicias tu aplicación. Sigues usando el builder de SparkSession, pero en lugar de depender de configuraciones locales, llamas al método remote. Proporcionas un connection string que detalla dónde se encuentra el servidor de Spark. Este string utiliza un esquema de conexión dedicado que empieza con las letras s c. Así que, si te estás conectando a un servidor de pruebas local en el puerto por defecto, proporcionas el string s c dos puntos barra barra localhost dos puntos uno cinco cero cero dos. Después de ese único paso de conexión, escribes tu código de DataFrame de la misma forma que lo has hecho siempre. Como la ejecución es totalmente remota, puedes conectar múltiples clientes de Python diferentes, desde distintas aplicaciones, exactamente al mismo servidor de Spark de forma simultánea. El código de tu aplicación simplemente pide transformaciones de datos, y el trabajo pesado se queda completamente en el lado del servidor. Al aislar por completo el cliente de Python del runtime de ejecución, Spark Connect elimina los famosos conflictos de dependencias que solían romper los deploys, permitiéndote actualizar los entornos de tu aplicación de forma totalmente independiente del propio cluster de Spark. Gracias por pasar unos minutos conmigo. Hasta la próxima, cuídate.
3

DataFrames y Lazy Evaluation

4m 37s

Sumérgete en la abstracción fundamental de PySpark: el DataFrame. Hablamos sobre el concepto de lazy evaluation, la diferencia entre transformaciones y acciones, y por qué Spark planifica antes de ejecutar.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 3 de 21. ¿Qué pasaría si tu código no se ejecutara realmente al escribirlo, sino que esperara, analizara tu objetivo final y trazara la ruta más rápida posible? Encadenas filtros, agregaciones y joins, y tu máquina apenas se inmuta. Eso es porque no hace nada hasta que le obligas a actuar. Este mecanismo se llama lazy evaluation, y es el motor principal detrás de los DataFrames de PySpark. Un DataFrame de PySpark es una colección distribuida de datos organizada en columnas con nombre. Si estás familiarizado con pandas, el concepto es idéntico. La diferencia es que un DataFrame de PySpark divide sus datos entre múltiples compute nodes en un cluster. Históricamente, la estructura fundamental en Spark era el Resilient Distributed Dataset, comúnmente conocido como RDD. El ecosistema se ha alejado enormemente de la manipulación raw de RDDs. De hecho, a partir de la versión 4.0 de Spark, el uso directo de RDDs ya no está soportado en Spark Connect. Los DataFrames son ahora el estándar definitivo, proporcionando una API estricta que permite a Spark optimizar automáticamente tus queries. Esa optimización se basa completamente en la lazy evaluation. Cada operación que realizas en un DataFrame entra en una de dos categorías estrictas: una transformation o una action. Las transformations son comandos que devuelven un nuevo DataFrame. Algunos ejemplos incluyen seleccionar columnas específicas, filtrar filas según una condición, agrupar registros o hacer un join de dos tablas distintas. Cuando aplicas una transformation, PySpark no ejecuta el procesamiento de datos. Simplemente registra la operación. Actualiza un plano interno llamado logical execution plan. Puedes escribir cincuenta transformations consecutivas, y Spark simplemente validará rápidamente la sintaxis y actualizará su graph. Aquí está la clave. Al retrasar la ejecución real, PySpark le da a su query engine subyacente, el Catalyst Optimizer, la imagen completa de tu data pipeline. El optimizer inspecciona toda tu chain de transformations, las reorganiza para lograr la máxima eficiencia y elimina los pasos innecesarios por completo antes de que se lea un solo byte de datos del disco. Este plano permanece completamente inactivo hasta que invocas una action. Una action es un comando que exige un resultado concreto. O bien devuelve datos a tu driver program o escribe datos en el storage. Las actions comunes incluyen contar el número total de filas, hacer un collect de los datos en una lista local de Python, u ordenar al sistema que muestre los veinte primeros registros en tu pantalla. En el momento en que haces trigger de una action, el motor se pone en marcha. Traduce tu logical plan optimizado a un physical plan, distribuye las tareas a los workers del cluster y ejecuta el cálculo. Considera un data workflow estándar. Primero, creas un DataFrame apuntando a un archivo enorme. Luego, haces un join con una tabla separada de detalles de usuario. Después del join, filtras los resultados para incluir solo usuarios de una ciudad específica. Finalmente, le pides a Spark que muestre el output. Debido a la lazy evaluation, Spark no carga realmente el archivo completo, realiza un join distribuido masivo y luego filtra los resultados al final. En cambio, el optimizer examina tu petición final, detecta el filter y empuja esa operación de filter hacia arriba en la chain, mucho antes de que ocurra el join. Lee selectivamente solo los registros relevantes, reduciendo drásticamente el uso de memoria y el tráfico de red en todo el cluster. Tu script de PySpark nunca es una secuencia de comandos inmediatos. Es un conjunto de instrucciones que esbozan un plano arquitectónico, y el sistema solo comienza la construcción cuando finalmente exiges el resultado final. Eso es todo por hoy. Gracias por escuchar. Ve y construye algo genial.
4

Creación y visualización de DataFrames

4m 07s

Aprende a instanciar DataFrames a partir de objetos en bruto de Python, diccionarios y archivos, y cómo inspeccionar de forma segura tus datos distribuidos sin bloquear tu nodo driver.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 4 de 21. Llamar a un método específico en un dataset masivo es una forma garantizada de hacer crashear toda tu aplicación al instante con un error de out-of-memory. Saber cómo mover datos de forma segura hacia y desde Spark sin reventar tu driver node es fundamental. Eso es exactamente de lo que trata este episodio: crear y visualizar DataFrames. Toda aplicación de PySpark necesita datos con los que trabajar. Generalmente, creas DataFrames de tres maneras. Primero, puedes crearlos directamente a partir de estructuras de Python en memoria. Simplemente defines una lista de diccionarios, donde cada diccionario representa una fila y las keys son los nombres de las columnas, y se la pasas al método createDataFrame de tu SparkSession. Segundo, si ya tienes un DataFrame de pandas en memoria, puedes pasar ese mismo objeto de pandas al mismo método createDataFrame. PySpark gestiona la conversión automáticamente. La tercera forma, y la más común, es leer desde archivos externos. Usas el atributo read en tu SparkSession, seguido del formato que quieras, como csv o json, y proporcionas la ruta del archivo. Una vez que tus datos están cargados, necesitas verificarlos. Los DataFrames de PySpark son distribuidos, lo que significa que no puedes simplemente hacer un print de la variable y ver los datos como lo harías en un script de Python estándar. Para ver la estructura de tus datos, llamas al método printSchema. Esto genera un árbol de texto que muestra el nombre de cada columna y su tipo de dato correspondiente. Es la forma más rápida de comprobar que tu archivo se ha cargado correctamente. Para ver el contenido real, usas el método show. Por defecto, llamar a show muestra las primeras veinte filas en formato tabular. Presta atención a esta parte. Si tus columnas contienen strings largos, el método show los trunca. Puedes desactivar esto pasando un argumento truncate en false, o configurarlo con un número específico de caracteres. Si tu DataFrame tiene docenas de columnas, la vista de tabla estándar hace wrap en la pantalla y se vuelve ilegible. En ese caso, puedes pasar el argumento vertical en true. Esto imprime cada fila como un bloque vertical de pares key-value, haciendo que los datasets anchos sean mucho más fáciles de leer en una terminal. Ahora, llegamos al crash por out-of-memory que mencionamos antes. A veces necesitas traer los datos distribuidos de vuelta a objetos de Python normales. El método para hacer esto se llama collect. Aquí está la clave. El método collect coge cada fila de cada executor en todo tu clúster y las mete a la fuerza en la memoria de tu único driver node. Si tu DataFrame contiene mil millones de filas, tu driver se quedará sin memoria y crasheará al instante. Solo deberías llamar a collect cuando hayas agregado o filtrado tus datos hasta un tamaño pequeño. Cuando trabajes con datasets grandes, extrae siempre muestras más pequeñas. En lugar de collect, usa el método take, pasándole el número de filas que quieres. Esto devuelve una lista estándar de Python que contiene solo esas primeras filas. Si necesitas comprobar el final de tu dataset, usa el método tail para coger las últimas filas. Ambos métodos limitan de forma segura la cantidad de datos transferidos a tu driver. La regla para los datos distribuidos es simple: envía los cálculos al clúster, pero limita estrictamente el número de filas que traes de vuelta al driver. Eso es todo por este episodio. Gracias por escuchar, ¡y sigue programando!
5

Dominando los tipos de datos básicos

4m 20s

Un recorrido por los tipos numéricos y de cadena fundamentales de PySpark. Exploramos cómo definir esquemas explícitamente usando StructType y StructField para crear pipelines de datos robustos.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 5 de 21. Confiar en la inferencia automática de schema puede ahorrarte algunas líneas de código, pero te saldrá muy caro en el rendimiento en producción. El cluster a menudo tiene que leer todo tu dataset solo para adivinar qué hay dentro antes de hacer ningún trabajo real. Esto lo solucionas dominando los tipos de datos básicos y los schemas explícitos. Es común confundir los tipos estándar de Python con los tipos de datos de PySpark. Cuando declaras un integer o un string en Python estándar, ese objeto vive en la memoria de tu máquina local. Los tipos de PySpark operan a un nivel completamente distinto. Son instrucciones de mapeo para el optimizador Catalyst y la Java Virtual Machine subyacente. Cuando usas tipos de datos de PySpark, estás definiendo una estructura estricta y adaptada al cluster. Esto garantiza la consistencia de los datos a través de cientos de worker nodes distribuidos y dicta exactamente cómo se serializan los datos por la red. PySpark proporciona un tipo específico para cada forma de datos estándar, y elegir el correcto es crucial para el rendimiento. Para números, tienes ByteType para integers muy pequeños, IntegerType para números estándar y LongType para valores grandes. Elegir ByteType en lugar de LongType para un simple código de estado ahorra muchísima memoria cuando esa decisión se multiplica por miles de millones de filas. Para texto y lógica, usas StringType y BooleanType. Manejar el tiempo correctamente es otra área donde el tipado exacto importa. PySpark divide los datos temporales en DateType y TimestampType. Usas DateType cuando solo te importa la fecha del calendario, como el cumpleaños de un usuario. Usas TimestampType cuando necesitas puntos exactos en el tiempo, registrando tanto la fecha como la hora, el minuto y el segundo exactos en los que ocurrió un evento. Conocer estos tipos es solo la base. Tienes que aplicarlos directamente a tu proceso de ingesta de datos usando un schema explícito. Construyes este schema usando dos objetos específicos: StructType y StructField. Puedes pensar en un StructType como el plano de una fila entera de tu dataframe. Un StructField es el plano de una sola columna dentro de esa fila. Para construir un schema explícito, instancias un StructType y le pasas una colección de StructFields. Cada StructField requiere tres argumentos específicos. Primero, proporcionas el nombre de la columna como un string estándar. Segundo, pasas el tipo de datos específico de PySpark que quieres forzar, como IntegerType o StringType. Tercero, proporcionas un flag booleano que indica si a esta columna se le permite contener valores nulos. Por ejemplo, construyes un schema empezando con un StructField llamado user identifier, asignado a un StringType, y pones el flag de nulo en false. Después sigues con un StructField llamado account age, asignado a un IntegerType, poniendo el flag de nulo en true. Una vez que este objeto StructType está completamente ensamblado, se lo pasas directamente a tu lector de dataframes usando el método schema antes de llamar al comando load para leer tus archivos. Esta es la parte que importa. Cuando proporcionas este schema explícito por adelantado, PySpark se salta por completo la fase de escaneo de datos. Aplica tu plano directamente al data stream entrante. Esto reduce drásticamente el tiempo que tarda en leer un archivo. También actúa como un control de calidad inmediato. Si llega un archivo mal formado con texto en tu columna de integers, el pipeline lo maneja basándose en tu estructura definida en lugar de desplazar silenciosamente el schema inferido downstream y romper tus transformaciones. Definir tu schema explícitamente transforma una operación de lectura frágil y costosa en un paso del pipeline predecible y altamente optimizado. ¡Gracias por escuchar, feliz programación a todos!
6

Los peligros de la precisión

5m 01s

Descubre las diferencias críticas entre FloatType, DoubleType y DecimalType. Aprende por qué elegir el tipo numérico equivocado puede introducir errores de redondeo desastrosos en los datos financieros.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 6 de 21. Usar un float estándar puede parecer inofensivo, hasta que tu query de agregación calcula erróneamente, y en silencio, millones en transacciones financieras. Un código que funciona a la perfección puede producir números ligera y peligrosamente incorrectos. Por eso mismo tenemos que hablar de los peligros de la precisión. En PySpark, tienes tres formas principales de almacenar números con partes fraccionarias: FloatType, DoubleType y DecimalType. No son intercambiables. Un error común es dejar que PySpark infiera un schema a partir de tus datos en crudo. La inferencia suele asignar DoubleType a cualquier número con un punto decimal. Si estás calculando ingresos financieros, confiar en este comportamiento por defecto es un grave riesgo operativo. Para entender por qué, tenemos que ver cómo funcionan FloatType y DoubleType por debajo. FloatType utiliza matemáticas de coma flotante IEEE 754 de 32 bits. DoubleType utiliza la versión de 64 bits del mismo estándar. Ambos representan los números como fracciones binarias. Piensa en cómo la fracción un tercio no se puede escribir perfectamente usando decimales en base diez. Se convierte en una cadena interminable de treses. Exactamente la misma limitación existe en binario. Los números decimales comunes, como cero coma uno o cero coma dos, no se pueden representar perfectamente en base dos. El ordenador almacena una minúscula aproximación. Con DoubleType, tienes 64 bits de espacio, lo que significa que la aproximación es increíblemente cercana al número real. Si haces una query a una sola fila de datos, rara vez notarás la diferencia. Aquí está la clave. El error se acumula durante las agregaciones. Cuando calculas los ingresos financieros totales sumando miles de millones de filas individuales, esas imprecisiones microscópicas se van sumando. Una fracción de céntimo perdida o ganada en cada transacción acaba distorsionando el total agregado final en miles o incluso millones de dólares. Tu lógica de agregación es matemáticamente correcta, pero el tipo de dato subyacente corrompe el resultado. Si tu sistema está calculando simulaciones físicas o entrenando modelos de machine learning, FloatType y DoubleType son exactamente lo que buscas. Sacrifican la exactitud a cambio de un procesamiento de hardware a alta velocidad. Pero en el momento en que manejas dinero, necesitas una precisión estricta e inquebrantable. Esto nos lleva a DecimalType. DecimalType no utiliza aproximaciones de coma flotante. Almacena los números exactamente como los defines, usando una escala fija. Cuando configuras un DecimalType, defines dos parámetros distintos. Primero, especificas la precisión, que es el número máximo total de dígitos que puede contener el valor. Segundo, especificas la escala, que dicta el número exacto de dígitos permitidos a la derecha del punto decimal. Si configuras un DecimalType con una precisión de diez y una escala de dos, PySpark asigna el espacio exacto necesario para almacenar ese valor hasta el último céntimo. No hay fracciones binarias ni redondeos aproximados. En la práctica, implementas esto tomando un control estricto de tus schemas. Al leer registros financieros de un archivo fuente, no dejes que PySpark adivine los tipos. Primero, creas un objeto schema estricto. Luego, defines tus campos financieros, como ingresos o impuestos. Finalmente, les asignas explícitamente un DecimalType con la precisión y escala que hayas elegido. Una vez que tu dataframe se carga con este schema, tus agregaciones estándar de suma o promedio se ejecutarán a la perfección desde la primera fila hasta la milmillonésima. Sacrificas un poco de rendimiento de computación en comparación con un DoubleType estándar, pero garantizas que tus informes financieros sean absolutamente impecables. La regla es simple: usa tipos de coma flotante para mayor velocidad y aproximaciones científicas, pero en el momento en que un número represente moneda, asegúralo con un DecimalType. Gracias por escuchar. ¡Hasta la próxima!
7

Domando datos complejos y anidados

4m 40s

El Big Data no siempre es plano. Exploramos los tipos de datos complejos de PySpark, incluyendo ArrayType, StructType y MapType, que te permiten analizar de forma nativa JSON profundamente anidados.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 7 de 21. El big data del mundo real rara vez es una hoja de cálculo plana. A veces, necesitas un array de diccionarios anidados solo para parsear un único evento JSON. Para manejar esto, tenemos que hablar sobre cómo dominar los datos complejos y anidados. Los workflows relacionales prefieren tablas planas, pero los datos de eventos modernos llegan muy anidados. PySpark gestiona esto proporcionando tres tipos de datos complejos. Estos son ArrayType, StructType y MapType. Estos te permiten modelar explícitamente estructuras jerárquicas de forma nativa en el engine. Toma un perfil de cliente estándar para ver cómo funcionan estos tipos. El primer concepto es ArrayType. Este representa una colección de elementos. La regla estricta es que cada item dentro de un ArrayType debe compartir exactamente el mismo tipo de dato subyacente. No puedes mezclar strings e integers dentro del mismo array. Si tu perfil de cliente incluye una lista de IDs de pedidos recientes, defines esa columna como un ArrayType que contiene integers. El siguiente es StructType. Un StructType modela un registro jerárquico anidado, funcionando básicamente como una row incrustada dentro de otra row. Contiene fields específicos con nombre. A diferencia de un array, cada field dentro de un StructType puede tener un tipo de dato completamente distinto. Supón que tu cliente tiene una dirección. Esa dirección contiene el nombre de la calle como string, el código postal como integer, y un flag boolean que indica si es una propiedad comercial. Agrupas estos fields distintos en un único StructType. Aquí está la clave. Puedes anidar estos tipos complejos con la profundidad que quieras. Si un cliente tiene varias direcciones, no creas columnas planas numeradas. En su lugar, creas un ArrayType donde el tipo de elemento interno es exactamente ese StructType de dirección. Ahora tienes un array de structs, que mapea perfectamente con un array JSON estándar de objetos. La tercera estructura es MapType, diseñada específicamente para pares key-value. Se diferencia de un StructType en cómo maneja la estructura frente al schema. Un StructType requiere que hardcodees los nombres exactos de los fields por adelantado. Un MapType es flexible con el contenido de sus datos, pero estricto con sus tipos de datos. Cada key en el map debe ser de un tipo específico, y cada value debe ser de otro tipo específico. Podrías usar un MapType para almacenar las preferencias de la aplicación del cliente. Las keys podrían ser strings, como tema o idioma, y los values también podrían ser strings, como oscuro o inglés. Como es un MapType, la aplicación upstream puede inyectar keys de preferencia totalmente nuevas más adelante sin obligarte a alterar el schema central del DataFrame. Simplemente consultas los values dinámicamente por sus keys. Cuando construyes este schema complejo en tu código, lo montas de dentro hacia afuera. Primero, defines los fields internos del StructType de la dirección. Luego, pasas ese struct completado a una definición de ArrayType. A continuación, defines el MapType para las preferencias del usuario. Finalmente, envuelves todos estos componentes, junto con tipos escalares simples como el string del nombre del cliente, en un StructType maestro que define la row general del DataFrame. En lugar de aplanar estructuras anidadas en strings JSON desordenados, definir explícitamente estos schemas complejos permite al optimizador de Spark podar datos y filtrar en profundidad dentro de los fields anidados sin deserializar todo el payload en memoria. Gracias por escuchar. ¡Nos vemos en la próxima!
8

Type Casting y selección

4m 06s

Aprende a moldear activamente los esquemas de tus DataFrames. Cubrimos cómo seleccionar subconjuntos de columnas y cómo hacer cast de columnas de un tipo de datos a otro de forma segura.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 8 de 21. Un simple valor string oculto en una columna de enteros puede paralizar un cluster de mil nodos. Necesitas una forma fiable de garantizar estructuras de datos correctas y elegir exactamente qué datos se mueven por tu pipeline, por lo que hoy vamos a ver el Type Casting y la selección. Para manipular datos en PySpark, primero tienes que entender qué es realmente una columna. Una instancia de columna no es un array físico de datos cargado en memoria. Es una representación de una expresión evaluada de forma lazy. Cuando haces referencia a una columna en tu código, no estás tocando los datos subyacentes. Simplemente estás añadiendo un paso al logical plan de Spark. Los datos solo se mueven cuando se lanza una action más adelante. Para recuperar y dar forma a estos datos, usas el método select en tu DataFrame. Tienes dos formas principales de decirle al método select qué columnas quieres. La forma más sencilla es pasar los nombres de las columnas como strings de texto estándar. Si le pasas un string a select, Spark devuelve un nuevo DataFrame que contiene exactamente esa columna, completamente intacta. Esto funciona bien para una extracción básica, pero no deja margen para modificaciones. Para modificar los datos durante la selección, tienes que usar objetos Column en lugar de strings. Accedes a un objeto Column referenciándolo directamente desde el DataFrame. Puedes hacerlo usando dot notation, como dataframe punto age, o usando bracket notation con el nombre de la columna como un string dentro de los corchetes. La bracket notation es especialmente útil cuando los nombres de tus columnas contienen espacios o caracteres especiales que romperían la dot notation estándar. Esta es la parte que importa. Cuando pasas un objeto Column al método select, puedes encadenarle métodos para transformar los datos al vuelo. Una de las transformaciones más críticas es la conversión de tipos. Los datos a menudo llegan en el formato equivocado. Por ejemplo, podrías recibir métricas numéricas formateadas como strings de texto. Para corregir esto, usas el método cast. PySpark también ofrece un alias llamado astype, que ejecuta exactamente la misma lógica. Llamas al método cast directamente sobre tu objeto Column dentro del statement select. El método cast requiere un argumento, que es el tipo de dato de destino. Puedes definir este destino pasando una representación en string del tipo, como la palabra int, o pasando un objeto de tipo de dato específico de Spark, como IntegerType. Así es como fluye esto en un script real. Llamas al método select en tu DataFrame. Dentro de los paréntesis de ese método, referencias tu columna de destino usando bracket notation. Justo al lado de esa referencia de columna, llamas a punto cast y le pasas tu nuevo tipo. Al evaluarse, esto devuelve un DataFrame completamente nuevo donde tu columna seleccionada ahora está convertida de forma segura al tipo especificado. El DataFrame original permanece totalmente intacto porque los DataFrames son inmutables. La conclusión clave es que el Type Casting en PySpark no es un proceso independiente que se aplica a un dataset existente in place. Es una expresión de columna evaluada de forma lazy, intrínsecamente ligada al acto de seleccionar datos para construir un nuevo DataFrame fuertemente tipado. Si disfrutas del podcast y quieres ayudar a apoyar el programa, puedes buscar DevStoriesEU en Patreon. Eso es todo por este episodio. ¡Gracias por escuchar, y sigue construyendo!
9

Function Junction: Limpieza de datos sucios

3m 48s

Si entra basura, sale basura. Aprende las transformaciones esenciales de DataFrames para eliminar nulos, rellenar valores faltantes y manejar registros NaN de forma nativa en sistemas distribuidos.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 9 de 21. Si entra basura, sale basura. Pero, ¿qué haces cuando tu dataset basura tiene cientos de terabytes de tamaño y no puedes inspeccionar manualmente ni una sola fila? Necesitas una forma sistemática de sanearlo a gran escala. Eso es exactamente lo que cubrimos hoy en Function Junction: Limpiando datos sucios. El primer paso en la limpieza suele ser estandarizar tu schema. A menudo recibirás ficheros raw con espacios, caracteres especiales o errores tipográficos en las cabeceras. Usa el método llamado withColumnRenamed. Simplemente le pasas el string del nombre antiguo y el string del nuevo nombre deseado. Si tienes varias columnas que arreglar, haces chain de este método secuencialmente para cada columna antes de aplicar cualquier transformación compleja downstream. Antes de eliminar datos erróneos, debemos aclarar una confusión frecuente sobre null y NaN en PySpark. Null significa que falta un dato por completo. NaN significa Not a Number, que representa un resultado matemático indefinido, como dividir cero entre cero. En Python puro, estos requieren un manejo por separado. Sin embargo, PySpark los agrupa por comodidad. Cuando usas las funciones NA del dataframe, Spark evalúa los valores NaN como nulls con el propósito de hacer drop o fill. Para eliminar filas con valores faltantes, usas el método NA punto drop. Llamar a esta función completamente vacía hace drop de cualquier fila que contenga un null o NaN en cualquier columna. Este enfoque es muy destructivo en datasets anchos. Un solo valor faltante en una columna de metadatos opcional borrará una fila de datos de transacción por lo demás perfectos. Para evitar esto, pasa una lista de nombres de columnas al parámetro subset. PySpark entonces evaluará solo esas columnas críticas y específicas al decidir si hace drop de la fila. Hacer drop de filas no siempre está permitido por las reglas de negocio. A menudo, tienes que reemplazar los valores faltantes con valores por defecto seguros. Esto lo consigues usando NA punto fill. Aunque puedes pasar un único valor para hacer fill en todas las columnas, el mejor enfoque es pasar un diccionario. Las keys del diccionario representan los nombres de las columnas específicas, y los values representan tus reemplazos elegidos. Este patrón te permite hacer fill de una métrica numérica faltante con un cero, mientras reemplazas simultáneamente una categoría faltante con un string de texto como unknown. Hacer esto mediante un diccionario se ejecuta en una sola pasada, lo cual es muy eficiente. Finalmente, puede que tus datos estén completamente poblados pero sigan siendo inválidos. Los outliers y los valores físicamente imposibles requieren un filtrado lógico. Puedes aislar los datos buenos usando el método where para quedarte solo con las filas que cumplen una condición específica. Para límites numéricos o de fechas, el método between es tu mejor herramienta. Seleccionas tu columna, llamas a between y proporcionas los límites inferior y superior. Esto reemplaza la lógica verbosa de mayor que y menor que, haciendo que tu código sea más fácil de leer. Cualquier fila que caiga fuera de esos límites se filtra del dataframe resultante. Aquí está la clave. El orden importa muchísimo al limpiar a gran escala. Siempre renombra las columnas primero para fijar tu schema, haz drop o fill de los valores faltantes después para estabilizar tus tipos de datos, y filtra los outliers al final, solo cuando sepas que los datos subyacentes son estructuralmente sólidos. Eso es todo por este episodio. Gracias por escuchar, ¡y sigue construyendo!
10

Transformación y remodelación de datos

4m 33s

Toma el control de la forma de tus datos. Exploramos cómo generar nuevas columnas con funciones matemáticas, realizar manipulaciones de cadenas y aplanar arrays anidados usando explode.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 10 de 21. A veces, una sola fila de datos contiene un array de registros ocultos, y necesitas detonar ese array para analizarlo correctamente. Transformar y reestructurar los datos es cómo desempaquetas, formateas y estructuras esa información para su procesamiento posterior. Cuando necesitas modificar un dataframe en PySpark, no cambias los datos in place. Los dataframes son inmutables. En su lugar, creas nuevas versiones usando un método llamado withColumn. Este método recibe dos argumentos. El primero es un string que representa el nombre de la columna que quieres crear o reemplazar. El segundo es una expresión de columna que define los datos reales. Si pasas un nombre que ya existe en el dataframe, PySpark sobrescribe la columna original. Si el nombre es completamente nuevo, PySpark añade la nueva columna al lado derecho de tu dataset. Para definir qué va en esa nueva columna, normalmente usas las funciones integradas de PySpark. Estas se importan del módulo de funciones SQL y proporcionan operaciones altamente optimizadas que se ejecutan en todo tu cluster. Piensa en la manipulación de strings. Los datos de texto de fuentes externas rara vez vienen con un formato perfecto. Puede que tengas una columna con nombres de usuario escritos en una mezcla impredecible de mayúsculas y minúsculas. Puedes arreglar esto pasando tu columna existente a una función integrada como lower, que fuerza todo el texto a minúsculas. Alternativamente, puedes usar una función de capitalización para asegurarte de que la primera letra sea mayúscula y el resto minúsculas. En la práctica, integras estas operaciones directamente en las transformaciones de tu dataframe. Llamas a withColumn, nombras tu columna de destino y le asignas el resultado de la función lower aplicada a tu columna de entrada. PySpark evalúa esta expresión para cada una de las filas. Puedes encadenar varias llamadas a withColumn para aplicar varias transformaciones secuencialmente, pasando el dataframe actualizado progresivamente al siguiente paso cada vez. Ahora bien, la segunda parte de esto es el reshaping. Limpiar strings cambia los valores, pero ¿qué pasa cuando la forma fundamental de tus datos te impide analizarlos? Aquí es donde la cosa se pone interesante. Puede que recibas un dataset donde el identificador de una persona está en una columna, y sus ingresos mensuales de todo el año están empaquetados en un solo array en la columna de al lado. No puedes ejecutar agregaciones relacionales estándar en un array anidado. Necesitas cada valor de ingreso individual en su propia fila para calcular medias o encontrar mínimos. Resuelves este problema estructural usando una función integrada llamada explode. La función explode maneja específicamente arrays y maps. Llamas a withColumn, especificas el nombre de la columna que quieres para la salida, y pasas la función explode envolviendo tu columna de array. PySpark ejecuta esto cogiendo la única fila original y abriéndola. Si el array de ingresos contiene doce valores distintos, explode genera doce filas completamente separadas. En el nuevo dataframe, la columna de destino ahora contiene un único valor de ingreso plano por fila en lugar de una lista. Lo que es crucial es que PySpark duplica todas las demás columnas de la fila original. El identificador de usuario se copia exactamente en las doce nuevas filas. La relación lógica entre el usuario y sus ingresos permanece perfectamente intacta, pero los datos ahora son planos. Has reestructurado una estructura anidada en una tabla larga lista para operaciones estándar de agrupación y filtrado. El verdadero poder de las transformaciones de PySpark es que funciones como explode y lower no solo manipulan valores individuales; definen un plan de computación lógico que escala instantáneamente tanto si tienes cien filas como cien mil millones de filas, sin requerir nunca que escribas un solo bucle manual. Eso es todo por este episodio. ¡Nos vemos en la próxima!
11

La mecánica de la agrupación y agregación

3m 54s

Domina la estrategia split-apply-combine. Nos sumergimos en la agrupación de datos por claves y en la aplicación de potentes funciones de agregación para resumir datasets masivos.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 11 de 21. Cuando te enfrentas a miles de millones de registros individuales, leerlos fila por fila es imposible. Para extraer información útil, tienes que resumirlos. Hoy veremos exactamente cómo ocurre esto: la mecánica de la agrupación y la agregación. Internamente, PySpark procesa las agregaciones mediante una estrategia de datos clásica llamada split-apply-combine. Este patrón es exactamente lo que parece. Primero, PySpark divide el enorme dataset en distintos buckets lógicos basándose en una key que tú elijas. A continuación, aplica un cálculo específico a cada bucket de forma independiente en todo el cluster. Finalmente, combina esas respuestas independientes en un único resultado resumido. En tu código, activas la fase de split llamando al método group by en tu DataFrame. Simplemente proporcionas el nombre de la columna que quieres usar como tu grouping key. Por ejemplo, si tienes una tabla enorme de transacciones históricas, podrías agrupar por la columna del nombre de usuario. Aquí está la clave. Llamar a group by no devuelve un nuevo DataFrame. En su lugar, devuelve una estructura transitoria llamada objeto GroupedData. Como PySpark evalúa tu código de forma lazy, solo ha creado el execution plan para organizar estos buckets. En realidad, no moverá ningún dato hasta que le indiques qué operación matemática realizar en esos buckets. Para proporcionar esa operación matemática, encadenas el método aggregate, normalmente escrito como agg, directamente a tus datos agrupados. Esto gestiona las fases de apply y combine. Dentro del método aggregate, le indicas a PySpark qué calcular utilizando las herramientas del módulo PySpark SQL functions. Este módulo contiene docenas de operaciones de agregación optimizadas. Supongamos que quieres calcular la media de ingresos de cada uno de esos usuarios. Importarías la función average, normalmente conocida como avg. Le pasas el nombre de tu columna de ingresos a la función average y la colocas dentro del método aggregate. Cuando esto se ejecuta, PySpark calcula la media de ingresos para cada bucket de usuarios distinto simultáneamente. La fase de combine se activa entonces, devolviendo un DataFrame estándar y legible. Este nuevo DataFrame contiene una sola fila por usuario, emparejada con su media de ingresos recién calculada. En este punto, tienes una tabla perfectamente resumida. Sin embargo, dado que el cálculo ocurrió en paralelo a través de un cluster distribuido, las filas finales se devuelven en el orden aleatorio en que los nodos de procesamiento terminaron su trabajo. Si necesitas ver a los usuarios con mayores ingresos, el orden aleatorio es inútil. Para solucionar esto, encadenas el método order by al final de tu paso de agregación. Le pasas al método order by la columna que contiene tus nuevas medias y le indicas que ordene en orden descendente. PySpark tomará los resultados combinados, los clasificará y entregará una tabla limpia y ordenada. El patrón split-apply-combine es potente precisamente porque se adapta perfectamente al hardware distribuido, lo que permite resumir datasets masivos en segundos. Pero recuerda que agrupar datos es solo la mitad de la operación. La agrupación requiere una agregación para terminar el trabajo; de lo contrario, solo tendrás un cluster lleno de buckets vacíos esperando instrucciones. Gracias por pasar unos minutos conmigo. Hasta la próxima, cuídate.
12

Cuando los DataFrames colisionan: El arte de hacer Joins

3m 29s

Navegando por los matices de combinar datasets. Desglosamos los siete tipos diferentes de join en PySpark y explicamos cómo fusionar DataFrames de forma segura.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 12 de 21. Combinar dos tablas enormes es la operación más costosa en computación distribuida. Si aplicas una lógica de matching incorrecta, se convierte en la forma más fácil de tumbar tu clúster por quedarte sin memoria. Saber exactamente cómo combinar datasets de forma segura es de lo que trata Cuando los DataFrames colisionan: el arte de hacer joins. El mecanismo principal para combinar datos en PySpark es el método join. Lo llamas sobre tu DataFrame base, pasándole el DataFrame que quieres adjuntar, la columna o columnas específicas por las que quieres hacer el match, y el método join. Si no le pasas ningún método join, PySpark utiliza por defecto un inner join. Piensa en un escenario concreto. Tienes un DataFrame que registra las alturas de las personas y un segundo DataFrame que registra sus ingresos. Ambos datasets comparten una columna llamada name. Con un inner join, PySpark examina la columna name en ambos datasets y solo conserva las filas donde el nombre aparece en ambos sitios. Si una persona aparece en los datos de alturas pero falta en los de ingresos, su registro se descarta por completo del resultado. Para conservar los registros que no hacen match, cambias el tipo de join. Un left join conserva todas las filas de tu DataFrame inicial, que en este caso son los datos de alturas. Si PySpark encuentra un nombre que haga match en los datos de ingresos, añade ese ingreso. Si no encuentra un match, conserva la fila de la altura, pero coloca un valor null en la columna de ingresos. Un right join hace exactamente lo inverso, conserva todos los ingresos y rellena las alturas que faltan con nulls. Cuando necesitas absolutamente todo, usas un full join. PySpark conserva todos los registros de ambos DataFrames. Los nombres que hacen match se fusionan en una sola fila, y los nombres que existen solo en un dataset se conservan, rellenando los datos que faltan del otro lado con valores null. Aquí está la clave. Un cross join funciona de manera diferente porque ignora por completo la condición del join. Empareja cada fila del DataFrame heights con cada fila del DataFrame incomes, creando un producto cartesiano. Si ambas tablas tienen solo mil filas, un cross join genera un millón de filas. Este crecimiento explosivo es la razón por la que los cross joins están muy restringidos por defecto y a menudo requieren una configuración explícita para ejecutarse sin lanzar un error. Los dos últimos tipos de join son, en realidad, operaciones de filtrado más que verdaderas fusiones de datos. Un left semi join busca matches, devolviendo filas del DataFrame heights solo si el nombre también aparece en el DataFrame incomes. La diferencia crucial con un inner join es que un left semi join no se trae ninguna columna del lado derecho. Te quedas exactamente con las mismas columnas con las que empezaste, simplemente filtradas a los registros que tienen un match correspondiente. Un left anti join hace exactamente lo contrario. Devuelve filas del DataFrame heights solo si el nombre no existe en los datos de incomes. Descarta por completo las columnas del lado derecho. Esto convierte al left anti join en la forma más eficiente de identificar datos que faltan o encontrar registros que fallaron al procesarse downstream. La elección del join determina no solo qué datos recuperas, sino también cuántos datos tienen que moverse físicamente por tu red para generar el resultado. Gracias por escuchar. ¡Hasta la próxima!
13

Viejo SQL, nuevos trucos

3m 45s

¿Por qué aprender una nueva API cuando puedes usar SQL en bruto? Aprende a ejecutar consultas SQL estándar directamente contra DataFrames distribuidos de PySpark.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 13 de 21. Tienes un equipo de analistas que escribe un SQL excelente, pero tus datos están en un clúster distribuido masivo. Podrías obligarles a aprender una sintaxis de Python completamente nueva, o podrías dejarles usar el lenguaje que ya conocen. Aquí es donde entra en juego ejecutar strings de raw SQL directamente en PySpark, enseñando trucos nuevos al viejo SQL. PySpark te da un puente directo a SQL estándar a través de un único método en tu sesión de Spark, llamado simplemente sql. Le pasas un string de raw SQL a este método. El output no es texto plano. Es un DataFrame estándar de PySpark. Esto significa que puedes ejecutar una query de base de datos estándar, obtener un DataFrame de vuelta y pasarlo inmediatamente a otra función de Python. Es totalmente interoperable. Antes de poder hacer queries a los datos con SQL, PySpark necesita saber qué tablas existen. Tienes dos formas principales de exponer tus datos al motor SQL. Primero, si ya tienes un DataFrame en Python, puedes llamar a un método para registrarlo como una vista temporal. Le das un nombre en formato string y, de repente, actúa como una tabla en tus queries de SQL. Segundo, puedes crear tablas completamente dentro de tu string de SQL. Le pasas un statement create table al método sql. Dentro de ese string, defines el schema y le dices a PySpark exactamente dónde viven los archivos de datos subyacentes, como un path de cloud storage que contiene archivos Parquet. PySpark registra esto en su catálogo interno. A partir de ahí, le haces queries por nombre igual que a una tabla de base de datos tradicional. Compara cómo se ve la misma lógica en ambos enfoques. Imagina que necesitas sacar los nombres de los clientes, descartar a cualquiera con balance cero y hacer un merge del resultado con una tabla de orders. En la API de DataFrame, construyes un chain de métodos de Python. Llamas a select en tu dataset de clientes para elegir la columna name. Luego encadenas un método filter, comprobando si el balance es mayor que cero. Finalmente, añades un método join que hace referencia al dataset de orders mediante una key coincidente. Es muy programático. En el enfoque SQL, escribes un statement select estándar sacando la columna name, añades una cláusula where para el balance y escribes un inner join para la tabla de orders. Se queda en tu script como un único bloque de string legible. Aquí está la clave. Existe la idea equivocada de que escribir SQL dentro de strings de Python tiene que ser más lento o menos nativo que usar los métodos estructurados del DataFrame. Eso es falso. Tanto si encadenas métodos de Python como si pasas un string de raw SQL, PySpark los trata de forma idéntica. Ambos inputs se parsean inmediatamente, se traducen exactamente al mismo logical plan y se pasan al optimizador Catalyst. El motor de ejecución no sabe ni le importa qué API usaste para expresar tu intención. El rendimiento es exactamente el mismo. La elección entre la API de DataFrame y el raw SQL nunca tiene que ver con el rendimiento del clúster. Se trata puramente de lo que hace que tu equipo sea más rápido y tu codebase más fácil de mantener. Gracias por estar ahí. Espero que hayas aprendido algo nuevo.
14

Intercambiando DataFrames y SQL

3m 53s

Mezcla y combina SQL con Python sin problemas. Descubre cómo crear vistas temporales a partir de DataFrames, usar selectExpr y encadenar operaciones programáticas en los resultados de consultas SQL.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 14 de 21. Puede que te veas envuelto en un debate sobre si escribir tus transformaciones de datos en Python o en SQL. Obligarte a elegir estrictamente entre los dos te hace desaprovechar muchísima utilidad. La verdadera ventaja está en intercambiar DataFrames y SQL de forma fluida dentro del mismo pipeline. A veces, un conjunto complejo de nested joins es mucho más fácil de leer y mantener para tu equipo en raw SQL. Otras veces, necesitas iterar dinámicamente por los nombres de las columnas, algo imposible en SQL puro pero trivial en Python. PySpark te permite combinar ambos enfoques sin interrumpir tu flujo de datos. Para empezar a escribir SQL sobre un DataFrame de Python existente, primero debes exponer ese DataFrame al motor SQL de Spark. Lo consigues llamando al método create or replace temp view directamente sobre tu DataFrame. Le pasas un único argumento de tipo string, que se convierte en el nombre de la tabla. Esta operación no mueve ningún dato. No escribe en disco. Simplemente registra un pointer temporal en tu sesión actual de Spark. El motor SQL ahora sabe cómo resolver ese nombre de tabla para que apunte a tu DataFrame de Python. Ahora puedes hacerle una query. Llamas a spark punto sql y le pasas tu select statement estándar como un string, haciendo referencia al nombre de la tabla que acabas de crear. Aquí está la clave. El output de esa llamada a spark punto sql no es un resultado de texto estático, ni tampoco un tipo de objeto diferente. Devuelve un DataFrame estándar de PySpark. Esto significa que puedes hacer chain inmediatamente con métodos normales de DataFrame de Python directamente al final de tu llamada SQL. Puedes escribir un string SQL de cincuenta líneas para manejar una window function compleja, cerrar el paréntesis de spark punto sql, y añadir inmediatamente un método punto filter o punto group by. Pasas de Python a SQL y de vuelta a Python en un solo bloque de código. Si solo necesitas SQL para el cálculo de una columna específica, registrar una temp view completa es innecesario. En su lugar, usas el método select expression. Este método actúa como un puente. Funciona exactamente igual que un método select estándar de DataFrame, pero acepta expresiones de raw SQL string en lugar de column objects de Python. Si necesitas ejecutar un statement case-when, realizar funciones matemáticas, o hacer un cast de un data type usando sintaxis SQL nativa, le pasas esos strings SQL exactos a select expression. Spark coge esos strings, los parsea, y los ejecuta exactamente como lo haría dentro de una query SQL completa. Esto te permite mantenerte completamente dentro de la API chainable de DataFrame mientras te apoyas en la sintaxis SQL para la row-level logic compleja. La frontera entre estos dos paradigmas es completamente artificial. Tanto si haces chain de métodos de Python, escribes queries de raw SQL, o usas strings de select expression, Spark lo compila todo en exactamente el mismo execution plan optimizado. Si quieres ayudarnos a seguir creando estos episodios, puedes buscar DevStoriesEU en Patreon para apoyar el programa. Eso es todo por este episodio. ¡Gracias por escuchar, y a seguir programando!
15

Extendiendo Spark con UDFs de Python

4m 04s

Cuando las funciones integradas no son suficientes, entran en juego las User-Defined Functions. Exploramos cómo escribir lógica personalizada en Python para DataFrames, y por qué las UDFs escalares estándar esconden una penalización de rendimiento.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 15 de 21. Escribes una función personalizada en Python, la integras en tu data pipeline y funciona a la perfección con una pequeña muestra. Pero al ejecutarla con el dataset completo, el job se ralentiza considerablemente y el uso de la CPU se dispara. El código en sí está bien, pero estás pagando un coste de ejecución oculto. Hoy hablaremos de cómo extender Spark con UDFs de Python. Una User Defined Function, o UDF, te permite ejecutar lógica personalizada de Python directamente en un DataFrame de Spark. La usas cuando las funciones SQL integradas de Spark no cubren tu lógica de negocio específica. El proceso es sencillo. Empiezas escribiendo una función estándar de Python. Por ejemplo, escribes una función que recibe un string de texto, aplica una regla de formato personalizada compleja y devuelve el string modificado. Para que Spark reconozca esta función, importas la función udf del módulo functions de PySpark SQL y la aplicas como decorator justo encima de la definición de tu función de Python. También le pasas un return type al decorator, como un tipo string o un tipo integer. Si no proporcionas un return type, Spark usa por defecto un tipo string, lo que puede causar problemas de datos silenciosos si tu función realmente devuelve un número. Una vez aplicado el decorator, tu función de Python personalizada se comporta como una función nativa de Spark. Puedes pasarla a operaciones de DataFrame, como una sentencia select, pasándole los nombres de las columnas como argumentos. Aquí está la clave. Una scalar UDF estándar de Python opera estrictamente fila por fila. Recibe como entrada uno o más valores de columna de una sola fila, evalúa tu lógica personalizada de Python y devuelve exactamente un único valor de salida para esa fila específica. Si tu DataFrame contiene diez millones de filas, tu función de Python se invoca diez millones de veces distintas. Esta operación fila por fila es fácil de entender, pero crea el enorme cuello de botella de rendimiento que mencionamos al principio. Para comprender por qué es tan lenta, tienes que mirar cómo Spark ejecuta el código por debajo. Spark está construido en Scala, lo que significa que su motor principal se ejecuta dentro de una Java Virtual Machine, o JVM. Tu UDF personalizada está escrita en Python. La JVM no puede ejecutar código Python de forma nativa. Para aplicar tu UDF, Spark se ve obligado a iniciar procesos worker de Python independientes junto con sus propios executors. Luego, tiene que mover físicamente los datos fuera del espacio de memoria de la JVM hacia el proceso de Python. Spark utiliza una librería de serialización de Python llamada cloudpickle para gestionar esta compleja transferencia. Aquí es donde se cobra el impuesto de rendimiento. Para cada fila de tu dataset, Spark serializa las entradas en la JVM, envía esos datos binarios a través de un socket local al worker de Python y los deserializa en objetos estándar de Python. Finalmente, tu función personalizada se ejecuta sobre esos objetos. A continuación, todo el ciclo se repite a la inversa. Python serializa el valor de salida usando cloudpickle, lo envía de vuelta a través del socket y la JVM lo deserializa de nuevo al formato de memoria interna de Spark. Esta serialización y deserialización constante entre Java y Python es increíblemente costosa. El verdadero coste de una UDF estándar de Python rara vez es la lógica que escribes; es el overhead silencioso de traducir datos de un lado a otro entre dos entornos de runtime completamente diferentes para cada fila. Gracias por pasar unos minutos conmigo. Hasta la próxima, cuídate.
16

Acelerando las UDFs con Apache Arrow

3m 43s

Elimina el cuello de botella de la serialización de JVM a Python. Descubrimos cómo las Vectorized Pandas UDFs y los formatos de memoria de Apache Arrow potencian tus transformaciones personalizadas.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 16 de 21. ¿Qué pasaría si pudieras acelerar tus funciones personalizadas de Python en Spark por un factor de diez, con solo cambiar un único decorator? Las UDFs estándar de Python son conocidas por ser muy lentas, pero la solución no requiere reescribir tu lógica en Scala. Hoy vamos a ver cómo potenciar las UDFs con Apache Arrow. Cuando ejecutas una UDF estándar de Python, te chocas contra un muro de rendimiento enorme en la frontera entre los lenguajes. Spark opera dentro de la Java Virtual Machine, pero tu lógica personalizada se ejecuta en un worker process de Python independiente. Para pasar datos entre ellos, Spark extrae filas de su memoria interna, las serializa usando una librería llamada cloudpickle y las envía a Python. Python procesa los datos fila por fila, serializa el resultado y lo devuelve. Hacer esto para millones de filas individuales crea un cuello de botella de serialización insoportable. Apache Arrow cambia las reglas de este intercambio de datos. Arrow es un formato de datos in-memory, columnar y cross-language. Estandariza cómo se ven los datos en memoria, de modo que tanto la JVM como Python lo entienden de forma nativa sin traducciones complejas. En lugar de serializar los datos fila por fila, Spark empaqueta los datos en grandes batches columnares. Todos los valores para una columna específica se encuentran uno al lado del otro en memoria contigua. Spark envía estos grandes bloques a Python en un solo paso eficiente. Puedes aprovechar esto de dos maneras. Primero, puedes habilitar la optimización de Arrow para las UDFs estándar. Para ello, configuras la propiedad de configuración de Spark para la ejecución de Arrow a true, o especificas el parámetro useArrow igual a true al registrar tu UDF. Spark usará Arrow para transferir los datos en batches, reduciendo drásticamente el overhead de serialización, aunque técnicamente tu función de Python siga ejecutando la lógica fila por fila. Aquí está la clave. Para obtener el máximo aumento de velocidad, quieres que tu código Python procese esos batches de Arrow simultáneamente. Aquí es donde entran en juego las Pandas UDFs. Al envolver tu función personalizada con el decorator pandas UDF, cambias la forma en que la función recibe los datos. En lugar de recibir un único valor para una fila, tu función recibe una Pandas Series que contiene un batch entero de valores. Tu función aplica una operación vectorizada a todo ese batch y devuelve una nueva Pandas Series exactamente de la misma longitud. Piensa en una función llamada calculate tax. Aplicas el decorator pandas UDF y declaras que devuelve un tipo double. La función acepta una Pandas Series que contiene precios de productos. Dentro de la función, no escribes un bucle for. Simplemente escribes una sentencia return que multiplica la Series de entrada por uno punto dos. Como Pandas está respaldado por código C altamente optimizado por debajo, multiplica todo el bloque de precios al instante. Spark luego toma esa Series devuelta y la vuelve a fusionar sin problemas en el DataFrame usando Arrow. El verdadero poder de una Pandas UDF no es solo que evita el cuello de botella de serialización de cloudpickle, sino que traslada tu computación real de los lentos bucles de Python a una ejecución nativa y vectorizada. Gracias por escuchar. Cuidaos mucho.
17

Explotando filas con UDTFs de Python

4m 20s

Las UDFs estándar devuelven un valor por fila, pero ¿qué pasa si necesitas múltiples filas? Aprende cómo las User-Defined Table Functions (UDTFs) de Python resuelven problemas complejos de generación de uno a muchos.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 17 de 21. Las User-Defined Functions estándar están estrictamente limitadas a un mapeo uno a uno. Pasas un valor y obtienes exactamente un valor de salida. Pero, ¿qué pasa si una única entrada densa de log necesita expandirse en cien filas separadas? Para resolver esto, usas las User-Defined Table Functions de Python, o UDTFs. Una UDTF hace exactamente lo que su nombre indica. Devuelve una tabla entera a partir de un solo input. Mientras que una UDF estándar calcula un único valor escalar, una UDTF puede emitir múltiples filas y múltiples columnas. Esta es la herramienta a la que recurres cuando necesitas hacer un explode de un string JSON anidado, parsear un archivo de texto delimitado línea por línea, o generar una secuencia de fechas a partir de un único timestamp. Para crear una UDTF en PySpark, no escribes una función standalone básica. En su lugar, defines una clase de Python. Esta clase requiere un método específico llamado eval. El método eval es donde ocurre la transformación real. Cuando ejecutas la UDTF, Spark llama a este método para cada valor de input. Aquí está la clave. Dentro de ese método eval, no usas un return statement estándar. En su lugar, usas la keyword yield de Python. Cada vez que el método hace un yield de un valor, Spark traduce eso en una nueva fila en tu tabla de output. Si pasas un único string de input, el método eval podría iterar sobre él y hacer yield diez veces. Spark coge esos diez yields y produce diez filas distintas. Veamos un ejemplo concreto. Construyes una clase llamada ProcessWords. Tu objetivo es pasar una frase completa y obtener una tabla donde cada palabra tenga su propia fila. Escribes el método eval para que acepte un string de texto. Dentro del método, haces un split de la frase por espacios. Luego, iteras sobre las palabras resultantes. Por cada palabra, haces un yield de una tuple que contiene la propia palabra. Antes de que Spark pueda usar esta clase, le aplicas el decorator UDTF de PySpark. El decorator es obligatorio porque define tu schema de output. Declaras explícitamente los nombres de las columnas y los tipos de datos que genera tu función. Si haces un yield de un string, le dices al decorator que el output es una columna de tipo string. Si quieres hacer un yield de la palabra y su conteo de caracteres, haces un yield de una tuple de dos elementos, y tu decorator especifica un schema con una columna string y una columna integer. Más allá del método eval, una clase UDTF también puede incluir un método terminate opcional. Spark llama al método terminate exactamente una vez por cada partition de datos, después de que todas las filas de input hayan sido procesadas por el método eval. Esto es muy útil para hacer agregaciones. Si tu método eval rastrea un contador interno a través de múltiples filas de input, el método terminate puede hacer un yield de una fila final que contenga ese conteo total antes de que se cierre la partition. Cuando llamas a una UDTF en una operación de DataFrame, se comporta como una tabla inline. Si pasas una columna de DataFrame existente a la UDTF, Spark aplica la table function fila por fila. Como una table function genera múltiples filas por cada fila individual de input, combinar este output con tu dataset original requiere un lateral join implícito. Spark maneja esto por detrás, duplicando los datos de la fila original para que coincidan con las nuevas filas exploded generadas por tu clase de Python. El poder definitivo de una UDTF de Python es desvincular completamente tu volumen de input de tu volumen de output, permitiendo que un único data point florezca en un dataset completo de múltiples columnas. Eso es todo por este episodio. ¡Gracias por escuchar, y sigue construyendo!
18

La API de Pandas en Spark

4m 19s

Escala tus scripts de Pandas existentes hasta el infinito. Descubre cómo la API pyspark.pandas te permite ejecutar sintaxis estándar de Pandas de forma nativa en un clúster distribuido de Spark.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 18 de 21. Tienes un script de datos local que funciona a la perfección, pero de repente el tamaño de tu dataset se cuadruplica y tu máquina se queda sin memoria. Conoces la sintaxis a la perfección, pero reescribir todo para un framework distribuido lleva días. La API de pandas en Spark cubre exactamente este vacío. La API de pandas en Spark te permite ejecutar workloads estándar de pandas en un cluster distribuido. No se limita a emular pandas sin más. Intercepta tu código de pandas y lo traduce por debajo a planes de ejecución de Spark optimizados. Para usarla, importas el módulo llamado pyspark punto pandas. La convención estándar es asignarle el alias ps, imitando directamente el conocido alias pd que se usa en los workloads locales de data science. Si ya tienes un DataFrame local estándar de pandas en memoria, la transición es sencilla. Invocas una función llamada from pandas en tu módulo ps y le pasas tu DataFrame local. Esto convierte el objeto de un solo nodo en un DataFrame distribuido de pandas en Spark. A partir de ese momento, la sintaxis que usas para interactuar con este nuevo objeto sigue siendo idéntica a la que ya conoces. Esta consistencia se extiende a cómo se procesan los datos internamente. La API distribuida gestiona de forma nativa el missing data exactamente igual que el pandas local. Si tu dataset contiene valores Not-a-Number de NumPy, la API de pandas en Spark los gestiona correctamente durante las operaciones matemáticas o las transformaciones estructurales. No necesitas inventar una nueva lógica de limpieza de datos para tus jobs de Spark. Las operaciones estándar se traducen directamente. Si quieres agrupar tus datos por una columna específica, llamas a la función de agrupación estándar. Si quieres calcular la media o la suma, encadenas la función de agregación justo después. Incluso puedes llamar a las funciones de plotting directamente sobre el DataFrame distribuido. Spark procesa la computación pesada en todo el cluster, agrega los data points necesarios y devuelve la visualización igual que si estuvieras trabajando en una sola máquina. Aquí está la clave. La arquitectura por debajo es fundamentalmente diferente, y eso introduce un edge case crítico en cuanto a la generación de índices. Pandas local depende en gran medida de un índice secuencial y estrictamente ordenado para cada fila. Spark, en cambio, particiona los datos y los distribuye entre varias máquinas independientes. Forzar un índice secuencial estricto y globalmente ordenado en un sistema distribuido requiere comunicación constante entre los worker nodes. Cuando creas un DataFrame de pandas en Spark sin definir explícitamente una columna como índice, la API genera automáticamente un índice por defecto para imitar a la perfección el comportamiento estándar de pandas. Crear y mantener este índice por defecto requiere sincronizar el estado en todo el cluster. Si estás operando con un dataset masivo, esta sincronización introduce un overhead de rendimiento severo. La API a menudo emitirá un warning sobre este overhead interno cuando se ejecuta. Para evitar este cuello de botella, es muy recomendable asignar una columna existente como índice de inmediato o configurar la API para usar un tipo de índice amigable con sistemas distribuidos. La API de pandas en Spark te da la sintaxis exacta de pandas impulsada por el motor de ejecución distribuida de Spark, pero recordar que los índices secuenciales estrictos conllevan un alto coste de sincronización salvará a tu cluster de ralentizaciones innecesarias. Eso es todo por hoy. Gracias por escuchar, ve a construir algo genial.
19

Carga y descubre: Formatos de almacenamiento

4m 01s

No todos los formatos de archivo son iguales. Contrastamos los CSVs basados en filas con formatos columnares como Parquet y ORC, explorando opciones de lectura/escritura y técnicas de almacenamiento óptimas.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 19 de 21. Guardar un dataset masivo como CSV es lo más fácil del mundo, pero también una de las cosas más destructivas que puedes hacerle al rendimiento de tu data lake. Pagas por más storage, pagas por más compute, y cada query downstream va lentísima. La solución está en cómo gestionas Load and Behold: Storage Formats, y por qué cómo guardas tus datos importa tanto como cómo los transformas. PySpark utiliza una interfaz unificada para leer y escribir datos en docenas de sistemas de almacenamiento. Llamas al atributo read o write en tu sesión de Spark o en tu DataFrame, especificas un formato, le pasas un chain de opciones y lo apuntas a un path de archivo. Es un patrón predecible, pero las opciones que elijas dictan cuánto trabajo tendrá que hacer tu cluster después. Empecemos por los formatos legibles para humanos, CSV y JSON. Estos son formatos basados en filas. Cuando lees un CSV, Spark parsea los datos línea por línea. A menudo necesitas hacer un chain de opciones específicas para darle sentido al texto. Por ejemplo, puedes hacer un chain de una opción para decirle a Spark que el archivo tiene un header, otra opción para configurar un delimitador personalizado como un pipe o un tab, y una tercera opción para definir exactamente qué aspecto tiene un valor null, quizás pasándole un string específico para que Spark lo mapee correctamente a un valor vacío en lugar de tratarlo como texto literal. JSON es un poco mejor porque maneja estructuras anidadas de forma nativa, pero repite las keys del schema para cada record, inflando muchísimo el tamaño del archivo. Ambos formatos obligan a Spark a leer la fila entera del disco, incluso si tu query solo pide una única columna. Aquí es donde entran en juego los formatos columnares como Parquet y ORC. Presta atención a esta parte. Las queries analíticas rara vez necesitan todas las columnas de una tabla ancha. Generalmente, necesitan columnas específicas a lo largo de millones de filas para ejecutar agregaciones. Parquet y ORC almacenan los datos organizados por columna, no por fila. Si haces una query de tres columnas de un total de cien, Spark solo lee los chunks del archivo que contienen esas tres columnas. Se salta el resto por completo, reduciendo el input y output de disco a una fracción de lo que requiere un CSV. Como los datos del mismo tipo se almacenan juntos, los formatos columnares también se comprimen de maravilla. Un directorio de archivos JSON puede reducirse en un setenta por ciento o más al convertirse a Parquet. Además, incrustan el schema exacto y los tipos de datos en la metadata del archivo, lo que significa que Spark no tiene que adivinar ni inferir los tipos en el load. Cuando estés listo para escribir estos datos de vuelta, tienes que gestionar el state en el destino. Por defecto, si intentas escribir en un path donde ya existen datos, Spark lanza un error para evitar la pérdida accidental de datos. Puedes controlar esto usando el método mode antes de lanzar el save. Si pasas el string overwrite, Spark elimina los datos existentes en el path de destino y los reemplaza con tu DataFrame actual. Si pasas append, Spark simplemente añade tus nuevos part files al directorio existente. También existe un modo ignore, que silenciosamente no hace nada si el directorio ya contiene datos. Escribir datos limpios, tipados y en formato columnar hoy le ahorrará a tu cluster horas de tiempo de procesamiento desperdiciado mañana. Si quieres ayudar a que sigan saliendo estos episodios, puedes apoyar el programa buscando DevStoriesEU en Patreon. Gracias por pasar unos minutos conmigo. Hasta la próxima, cuídate.
20

Cazando bugs: Planes físicos y Joins

3m 48s

Echa un vistazo bajo el capó del motor de ejecución de Spark. Aprende a depurar consultas usando DataFrame.explain() y cómo eliminar costosos shuffles utilizando Broadcast joins.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 20 de 21. Tu job de PySpark no es lento porque esté procesando datos. Es lento porque se pasa todo el tiempo moviendo datos por la red. Cuando un simple join ralentiza tu clúster al máximo, la solución está en cazar bugs: planes físicos y joins. Cuando escribes un script de PySpark, defines operaciones lógicas. Le dices a Spark qué quieres, no cómo hacerlo. Pero cuando un job no rinde bien, necesitas saber exactamente cómo ha ejecutado Spark tu petición. Esto lo haces llamando al método explain en tu DataFrame. Llamar a explain imprime el plan físico. Este es el esquema de las tareas reales que Spark ejecuta en tu clúster. Lees este plan de abajo arriba, rastreando los datos desde los archivos de origen hasta el output final. Si miras el plan físico de un join estándar entre dos DataFrames, probablemente verás un paso llamado SortMergeJoin. Para hacer un SortMergeJoin, Spark debe asegurarse de que las filas con las mismas join keys estén físicamente en el mismo executor. Para conseguirlo, Spark hace un Exchange. Exchange es el término del plan físico para un shuffle de red. Significa que Spark está sacando datos de las particiones, enviándolos por la red y escribiéndolos en disco para que los otros executors puedan leerlos. El shuffle es la operación más costosa en la computación distribuida. Aquí está la clave. Si estás haciendo un join de una fact table enorme con una lookup table pequeña, hacer un shuffle de la tabla grande es un desperdicio masivo de recursos. En lugar de hacer shuffle de ambas tablas para alinear las keys, puedes simplemente enviar toda la tabla pequeña a cada executor. Esto lo haces usando la función broadcast del módulo de funciones SQL de PySpark. Cuando llamas a tu método join, simplemente envuelves el DataFrame más pequeño en la función broadcast. Al envolver la tabla pequeña, le das a Spark una directiva estricta. Spark hará un collect del DataFrame pequeño al driver node, y luego transmitirá una copia completa a la memoria de cada executor. Ahora, cuando se procesa el DataFrame grande, los executors ya tienen todos los datos de lookup que necesitan ahí mismo en la RAM. Simplemente recorren sus particiones existentes y hacen match de las filas localmente. No hace falta ordenar nada, y ningún dato de la tabla grande se mueve por la red. Si llamas a explain en este nuevo join con broadcast, el plan físico se ve completamente diferente. El SortMergeJoin ha desaparecido. El costoso paso de Exchange está completamente ausente. En su lugar, verás un BroadcastExchange y un BroadcastHashJoin. El BroadcastExchange solo mueve la tabla pequeña una vez, y el propio join ocurre completamente en el mismo lugar. La forma más fácil de duplicar la velocidad de un job de Spark es dejar de mover datos que no necesitan moverse. Lee tus planes físicos, detecta los Exchanges de red y haz broadcast de tus tablas pequeñas. Eso es todo por hoy. Gracias por escuchar. Ve y construye algo genial.
21

Profiling de memoria y rendimiento en PySpark

4m 41s

Concluimos nuestro viaje por PySpark presentando herramientas de profiling nativas. Aprende a rastrear el consumo de memoria línea por línea y a exponer tracebacks internos ocultos de Python.

Descargar
Hola, soy Alex de DEV STORIES DOT EU. Fundamentos de PySpark, episodio 21 de 21. Depurar código Python distribuido suele implicar revisar miles de líneas de errores de Java sin sentido, intentando adivinar por qué falló tu función o por qué consumió toda la memoria de tu clúster. Ya no tienes que adivinar. Hoy veremos cómo hacer profiling de la memoria y la performance de PySpark, además de simplificar los stack traces. Cuando escribes una User Defined Function, o UDF, en PySpark, tu código Python se ejecuta sobre una infraestructura de Java Virtual Machine. Si tu código Python divide por cero o hace referencia a una key de un diccionario que no existe, esa simple excepción de Python queda silenciada. Se devuelve a través del daemon de PySpark, por la red, y se envuelve en enormes excepciones de Java. Encontrar el error real de Python en tus logs es tedioso. Puedes solucionar esto habilitando los tracebacks simplificados. Cuando pones la configuración de Spark para el traceback simplificado en true, PySpark cambia la forma en que informa de los errores. Elimina todos los logs de interoperabilidad de Java y el ruido de los procesos worker. La próxima vez que falle una UDF, tu consola mostrará un stack trace de Python estándar y limpio, indicando el número exacto de línea en tu archivo Python donde se produjo la excepción. Corregir los crashes es solo la mitad de la batalla. Corregir código lento o que consume mucha memoria es mucho más difícil. Si escribes una UDF de Pandas que procesa millones de filas, puede que se ejecute correctamente, pero que tarde demasiado o que provoque errores out-of-memory en tus nodos executor. Históricamente, encontrar el cuello de botella requería añadir logging manual o adivinar qué operación de Pandas era ineficiente. Spark 4.0 cambia esto al introducir profilers integrados para las UDF de Python. Aquí está la clave. Ahora puedes hacer profiling de tu código Python distribuido línea por línea, directamente dentro de PySpark. Para usar esto, configuras el profiler de la UDF en uno de dos modos: performance o memory. Si configuras el profiler con la palabra perf, Spark activa el profiler de performance. Luego, ejecutas tu job de Spark normalmente. Mientras los nodos worker ejecutan tu UDF de Pandas, Spark registra el tiempo de ejecución de cada línea de tu función de Python. Una vez que termina tu job, llamas al método show en el objeto profile de Spark. Spark imprimirá un informe detallado en tu consola. Para cada línea de tu código, verás exactamente cuántas veces se llamó y el tiempo total empleado en ejecutarla. Puedes ver al instante si una manipulación de strings o una operación matemática específica está ralentizando todo tu pipeline. Si estás lidiando con límites de memoria, configuras el profiler de la UDF con la palabra memory en su lugar. El workflow es exactamente el mismo, pero el output cambia. Al ver el informe del profile, Spark te muestra el incremento exacto en megabytes causado por cada línea de tu código Python. Puedes ver exactamente dónde se están reservando arrays grandes y dónde no se está liberando la memoria. Esta visibilidad línea por línea elimina las conjeturas al optimizar transformaciones de datos complejas. Puedes identificar la causa exacta de tus problemas de performance sin salir de tu entorno de PySpark. Como este es el último episodio de nuestra serie sobre PySpark, te animo a consultar la documentación oficial de Spark y a probar estas herramientas de debugging en la práctica. Si tienes ideas sobre qué tecnologías deberíamos tratar en nuestra próxima serie, pásate por devstories.eu y cuéntanoslo. Gracias por pasar unos minutos conmigo. Hasta la próxima, cuídate.