PySpark brings the power of Apache Spark to Python, allowing data scientists and engineers to process big data efficiently. It is fast, scalable, and fault-tolerant, making it ideal for large-scale data processing, machine learning, and real-time analytics.
In this post, we’ll explore why PySpark is great, how it works under the hood, and include code examples to demonstrate its capabilities.
Note that in many cases it’s possible to do similar operations with Pandas, but PySpark is designed to handle big data that doesn’t fit into memory on a single machine.
Another option is a cloud SQL service such as Google BigQuery, AWS Redshift, or Azure Synapse Analytics. However, PySpark is more flexible and can run on-premises or in the cloud in a bespoke, Pythonic way.
🚀 Why is PySpark So Great?
🔥 Scalability
PySpark distributes data across multiple nodes, allowing it to process terabytes or even petabytes efficiently.
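A tiny sketch of that distribution in action; the data here is toy-sized, but the partitioning mechanics are the same on a real cluster:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Scalability Example").getOrCreate()

# Spark splits the data into partitions that can be processed on many nodes
rdd = spark.sparkContext.parallelize(range(1_000_000), numSlices=8)
print(rdd.getNumPartitions())          # 8 partitions, processed in parallel
print(rdd.map(lambda x: x * 2).sum())  # the work is distributed across them
```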
⚡ Speed
Uses in-memory computation, reducing disk I/O.
Optimized DAG execution minimizes redundant computations.
Parallelism speeds up operations across distributed clusters.
🐍 Pythonic & Flexible
Works seamlessly with Pandas, NumPy, and MLlib.
Supports multiple data formats: CSV, Parquet, JSON, Delta Lake, etc.
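As a quick illustration of that flexibility, here’s a minimal sketch that reads a Parquet file and hands a small aggregate back to Pandas. The file path and the `country` column are placeholders, not real data:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Formats Example").getOrCreate()

# The same DataFrame API reads many formats: parquet, csv, json, ...
df = spark.read.parquet("events.parquet")  # placeholder path

# Do the heavy aggregation in Spark, then bring the small result to Pandas
counts_pdf = df.groupBy("country").count().toPandas()
print(counts_pdf.head())
```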
📊 SQL + ML Support
Spark SQL lets you query big data using SQL.
MLlib enables scalable machine learning.
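To make that concrete, here’s a hedged sketch that queries a DataFrame with Spark SQL and fits a small MLlib model. The toy numbers and column names are made up purely for illustration:

```python
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName("SQL and MLlib").getOrCreate()

# Tiny, made-up dataset
df = spark.createDataFrame(
    [(1.0, 2.0, 5.0), (2.0, 1.0, 7.0), (3.0, 4.0, 9.0)],
    ["x1", "x2", "y"],
)

# Spark SQL: register a view and query it with plain SQL
df.createOrReplaceTempView("points")
spark.sql("SELECT AVG(y) AS avg_y FROM points").show()

# MLlib: assemble a feature vector and fit a linear regression
features = VectorAssembler(inputCols=["x1", "x2"], outputCol="features").transform(df)
model = LinearRegression(featuresCol="features", labelCol="y").fit(features)
print(model.coefficients)
```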
☁️ Cloud & On-Premise Compatibility
Runs on AWS, Azure, GCP, Kubernetes, and local clusters.
🔍 How Does PySpark Achieve This?
1️⃣ Resilient Distributed Datasets (RDDs) – The Core
PySpark’s core data structure is the RDD (Resilient Distributed Dataset), which:
- Distributes data across multiple nodes for parallelism.
- Supports fault tolerance by tracking lineage.
- Uses lazy evaluation, meaning computations are only executed when needed.
Example: Creating an RDD
```python
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("RDD Example").getOrCreate()

# Create an RDD from a Python list
data = ["apple", "banana", "cherry", "date"]
rdd = spark.sparkContext.parallelize(data)

# Transform and collect results
upper_rdd = rdd.map(lambda x: x.upper())
print(upper_rdd.collect())
```
['APPLE', 'BANANA', 'CHERRY', 'DATE']
2️⃣ Directed Acyclic Graph (DAG) Execution
Instead of executing step-by-step like MapReduce, Spark builds a DAG of transformations, optimizing execution by:
- Pipelining operations.
- Reducing redundant computations.
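You can see the lazy, DAG-based execution for yourself: transformations only extend the plan, and nothing runs until an action. A minimal sketch (the numbers are arbitrary):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DAG Example").getOrCreate()

rdd = spark.sparkContext.parallelize(range(1, 1001))

# Neither line below runs any computation yet; each only extends the DAG
squares = rdd.map(lambda x: x * x)
evens = squares.filter(lambda x: x % 2 == 0)

# The action triggers one optimized, pipelined pass over the data
print(evens.count())

# For DataFrames, .explain() shows the physical plan Spark built
spark.range(1000).filter("id % 2 = 0").explain()
```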
3️⃣ In-Memory Computation for Speed
Unlike Hadoop MapReduce, which writes intermediate data to disk between stages, Spark keeps intermediate results in RAM whenever possible.
Example: DataFrame Operations
```python
from pyspark.sql.functions import col

# Create a DataFrame
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Perform a transformation
df_filtered = df.filter(col("Age") > 28)
df_filtered.show()
```
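The in-memory behaviour becomes explicit when you cache a DataFrame you plan to reuse. A minimal sketch, reusing `df` and `col` from the example above:

```python
df.cache()                          # mark df for in-memory storage (applied lazily)

print(df.count())                   # first action materializes and caches the data
df.filter(col("Age") > 28).show()   # later actions reuse the cached partitions

df.unpersist()                      # release the memory when finished
```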
4️⃣ Distributed Execution with Driver and Workers
The Driver manages execution, breaking each job into stages and tasks.
Executors on worker nodes run those tasks, with resources allocated by a cluster manager (YARN, Kubernetes, Mesos, or Standalone).
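Which cluster manager you target is mostly a deployment choice made when you build the session. A hedged sketch of a local setup; the `local[4]` master and memory setting are example values only:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Cluster Example")
    .master("local[4]")  # or "yarn", "k8s://...", "spark://host:7077"
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

# The driver coordinates work; tasks run with this default parallelism
print(spark.sparkContext.defaultParallelism)
```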
5️⃣ Fault Tolerance with Lineage
If a node fails, Spark recomputes only the lost partitions using the RDD’s lineage graph, avoiding the need to restart the entire job.
Example: Handling Fault Tolerance
```python
# Simulating failure recovery
rdd_with_failure = rdd.map(lambda x: 1 / (len(x) - 5))  # Will cause division by zero

try:
    print(rdd_with_failure.collect())
except Exception as e:
    print("Error handled:", e)
```
Error handled: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 1 times, most recent failure: Lost task 2.0 in stage 4.0 (TID 22): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...
ZeroDivisionError: division by zero
...
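The lineage Spark relies on for this recovery is something you can inspect directly. A quick sketch, reusing the `rdd` from earlier:

```python
# Build a small chain of transformations, then print the lineage Spark
# would use to recompute any lost partitions
chained = rdd.map(lambda x: x.upper()).filter(lambda x: "A" in x)

lineage = chained.toDebugString()  # returned as bytes in PySpark
if lineage:
    print(lineage.decode("utf-8"))
```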
🔥 PySpark in Action: End-to-End Example
Here’s a real-world PySpark example where we:
1. Read a CSV file.
2. Perform transformations.
3. Run SQL queries.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

# Initialize Spark Session
spark = SparkSession.builder.appName("ExampleApp").getOrCreate()

# Read CSV into DataFrame
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Transform: Filter and Aggregate
filtered_df = df.filter(col("age") > 30)
avg_salary = filtered_df.groupBy("department").agg(avg("salary").alias("avg_salary"))

# Run SQL Query
filtered_df.createOrReplaceTempView("employees")
sql_result = spark.sql(
    "SELECT department, AVG(salary) AS avg_salary FROM employees GROUP BY department"
)

# Show results
avg_salary.show()
sql_result.show()
```
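In a real pipeline you would usually persist the results rather than only display them. A hedged sketch of that final step; the output path is a placeholder:

```python
# Write the aggregated results out (placeholder path), then shut down cleanly
avg_salary.write.mode("overwrite").parquet("output/avg_salary_by_department")

spark.stop()
```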
PySpark is a powerful, scalable, and fast big data processing framework. It achieves this through:
- RDDs for distributed computing.
- DAG execution for optimization.
- In-memory computation for speed.
- Fault tolerance via lineage tracking.
If you work with big data, PySpark is a game-changer! 🚀