Why PySpark?

Big Data
PySpark
Distributed Computing
Author

Ravi Kalia

Published

March 27, 2025

Introduction

PySpark brings the power of Apache Spark to Python, allowing data scientists and engineers to process big data efficiently. It is fast, scalable, and fault-tolerant, making it ideal for large-scale data processing, machine learning, and real-time analytics.

In this post, we’ll explore why PySpark is great, how it works under the hood, and include code examples to demonstrate its capabilities.

Note that in many cases it's possible to do similar operations with Pandas, but PySpark is designed to handle big data that doesn't fit into memory on a single machine.
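For a concrete sense of the difference, here is a minimal sketch of the same filter in both libraries; the "events.csv" file and its "value" column are hypothetical placeholders.

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Pandas: the whole file must fit in memory on a single machine
pdf = pd.read_csv("events.csv")
pandas_result = pdf[pdf["value"] > 100]

# PySpark: the same filter, but the data can be partitioned across a cluster
spark = SparkSession.builder.appName("PandasComparison").getOrCreate()
sdf = spark.read.csv("events.csv", header=True, inferSchema=True)
spark_result = sdf.filter(col("value") > 100)
spark_result.show()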

Another possibility is to use a cloud SQL service such as Google BigQuery, AWS Redshift, or Azure Synapse Analytics. However, PySpark is more flexible and can be run on-premises or in the cloud in a bespoke, Pythonic way.


🚀 Why is PySpark So Great?

🔥 Scalability

PySpark distributes data across multiple nodes, allowing it to process terabytes or even petabytes efficiently.
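As a rough sketch (assuming an active SparkSession named spark, like the one created in the examples below), data is split into partitions that are processed in parallel:

# Each partition is handled by a separate task on the cluster
rdd = spark.sparkContext.parallelize(range(1_000_000), numSlices=8)
print(rdd.getNumPartitions())   # 8

# DataFrames can be repartitioned to match the size of the cluster
df = spark.range(1_000_000)
print(df.repartition(16).rdd.getNumPartitions())   # 16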

⚡ Speed

  • Uses in-memory computation, reducing disk I/O.
  • Optimized DAG execution minimizes redundant computations.
  • Parallelism speeds up operations across distributed clusters.

🐍 Pythonic & Flexible

  • Works seamlessly with Pandas, NumPy, and MLlib.
  • Supports multiple data formats: CSV, Parquet, JSON, Delta Lake, and more (see the sketch below).
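A short sketch of that interoperability, assuming an active SparkSession named spark; the "sales.parquet" path and "region" column are placeholders:

# Read a columnar Parquet file and write the same data back out as JSON
sales = spark.read.parquet("sales.parquet")
sales.write.mode("overwrite").json("sales_json")

# Small aggregated results can be pulled into Pandas for NumPy or plotting work
summary_pdf = sales.groupBy("region").count().toPandas()
print(summary_pdf.head())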

📊 SQL + ML Support

  • Spark SQL lets you query big data using SQL (demonstrated in the end-to-end example below).
  • MLlib enables scalable machine learning (see the sketch below).
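Here is a small, illustrative MLlib sketch with made-up feature columns, again assuming an active SparkSession named spark:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

train = spark.createDataFrame(
    [(0.0, 1.0, 0.0), (1.0, 0.0, 1.0), (0.5, 0.5, 1.0), (0.2, 0.9, 0.0)],
    ["x1", "x2", "label"],
)

# Combine raw columns into the single vector column MLlib estimators expect
assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
train_vec = assembler.transform(train)

# Fit a logistic regression model; training is distributed across the cluster
lr = LogisticRegression(featuresCol="features", labelCol="label")
model = lr.fit(train_vec)
print(model.coefficients)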

☁️ Cloud & On-Premise Compatibility

  • Runs on AWS, Azure, GCP, Kubernetes, and local clusters.

🔍 How Does PySpark Achieve This?

1️⃣ Resilient Distributed Datasets (RDDs) – The Core

PySpark’s core data structure is the RDD (Resilient Distributed Dataset), which:

  • Distributes data across multiple nodes for parallelism.
  • Supports fault tolerance by tracking lineage.
  • Uses lazy evaluation, meaning computations run only when an action requires them.

Example: Creating an RDD

from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("RDD Example").getOrCreate()

# Create an RDD from a Python list
data = ["apple", "banana", "cherry", "date"]
rdd = spark.sparkContext.parallelize(data)

# Transform and collect results
upper_rdd = rdd.map(lambda x: x.upper())
print(upper_rdd.collect())
['APPLE', 'BANANA', 'CHERRY', 'DATE']

2️⃣ Directed Acyclic Graph (DAG) Execution

Instead of executing step-by-step like MapReduce, Spark builds a DAG of transformations and optimizes execution by:

  • Pipelining operations.
  • Reducing redundant computations.
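A quick sketch of lazy, DAG-based execution, reusing the spark session from the RDD example above:

from pyspark.sql.functions import col

people = spark.createDataFrame([("Alice", 30), ("Bob", 25)], ["Name", "Age"])

# Nothing executes yet: Spark only records the transformations in a DAG
plan = people.filter(col("Age") > 26).select("Name")

# explain() prints the optimized logical and physical plans the scheduler will run;
# the job itself starts only when an action such as show() or collect() is called
plan.explain(True)
plan.show()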

3️⃣ In-Memory Computation for Speed

Unlike Hadoop, which writes intermediate data to disk, Spark keeps it in RAM whenever possible.
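A brief caching sketch, again assuming the existing spark session:

# cache() keeps the data in memory after the first action materializes it
events = spark.range(1_000_000)
events.cache()

print(events.count())  # first action: computes the data and fills the cache
print(events.count())  # second action: served from memory, no recomputation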

Example: DataFrame Operations

from pyspark.sql.functions import col

# Create a DataFrame
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Perform a transformation
df_filtered = df.filter(col("Age") > 28)

df_filtered.show()
+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 30|
|Charlie| 35|
+-------+---+

4️⃣ Distributed Task Scheduling

  • The Driver manages execution and breaks a job into stages and tasks.
  • Executors on worker nodes run those tasks, with resources allocated by a cluster manager (YARN, Kubernetes, Mesos, or Standalone); a configuration sketch follows below.
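A hedged configuration sketch; the master URLs and memory setting below are illustrative placeholders rather than values used in this post:

from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("SchedulingExample")
    .master("local[4]")                     # local mode: 4 worker threads
    # .master("yarn")                       # or submit to a YARN cluster
    # .master("k8s://https://host:port")    # or to Kubernetes
    .config("spark.executor.memory", "2g")  # resources requested per executor
    .getOrCreate()
)
print(spark_session.sparkContext.master)

Note that getOrCreate() reuses an already-running session, so in a live notebook only new runtime SQL settings would take effect.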

5️⃣ Fault Tolerance with Lineage

If a node fails, Spark recomputes only the lost partitions using the RDD’s lineage graph, avoiding the need to restart the entire job.
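The lineage itself can be inspected; a quick sketch using the fruit RDD from earlier:

# toDebugString() shows the chain of parent RDDs Spark would replay after a failure
lineage = rdd.map(lambda x: x.upper()).filter(lambda x: x.startswith("A"))
print(lineage.toDebugString().decode())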

Example: Handling Fault Tolerance

# Simulate a failing task: "apple" has 5 characters, so 1 / (len("apple") - 5) divides by zero
rdd_with_failure = rdd.map(lambda x: 1 / (len(x) - 5))
try:
    print(rdd_with_failure.collect())
except Exception as e:
    print("Error handled:", e)
Error handled: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 1 times, most recent failure: ...
ZeroDivisionError: division by zero
(remaining executor and driver stack traces omitted)

🔥 PySpark in Action: End-to-End Example

Here’s a real-world PySpark example where we:

  1. Read a CSV file.
  2. Perform transformations.
  3. Run SQL queries.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

# Initialize Spark Session
spark = SparkSession.builder.appName("ExampleApp").getOrCreate()

# Read CSV into DataFrame
df = spark.read.csv("data.csv", header=True, inferSchema=True)

# Transform: Filter and Aggregate
filtered_df = df.filter(col("age") > 30)
avg_salary = filtered_df.groupBy("department").agg(avg("salary").alias("avg_salary"))

# Run SQL Query
filtered_df.createOrReplaceTempView("employees")
sql_result = spark.sql(
    "SELECT department, AVG(salary) AS avg_salary FROM employees GROUP BY department"
)

# Show results
avg_salary.show()
sql_result.show()
+-----------+----------+
| department|avg_salary|
+-----------+----------+
|Engineering|   95000.0|
|         HR|  105000.0|
+-----------+----------+

+-----------+----------+
| department|avg_salary|
+-----------+----------+
|Engineering|   95000.0|
|         HR|  105000.0|
+-----------+----------+

🎯 Conclusion

PySpark is a powerful, scalable, and fast big data processing framework. It achieves this through:

  • RDDs for distributed computing.
  • DAG execution for optimization.
  • In-memory computation for speed.
  • Fault tolerance via lineage tracking.

If you work with big data, PySpark is a game-changer! 🚀

🔗 Further Reading