Avoid These Top 10 Mistakes When Using Apache Spark

Published: 8/28/2024
Categories: dataengineering, development, coding, dbt
Author: sudo_pradip

We all know how easy it is to overlook small parts of our code, especially when we have powerful tools like Apache Spark to handle the heavy lifting. Spark's core engine is great at optimizing our messy, complex code into a sleek, efficient physical plan. But here's the catch: Spark isn't flawless. It's on a journey to perfection, sure, but it still has its limits. And Spark is upfront about those limitations, listing them out in the documentation (sometimes as little notes).

But let’s be honest—how often do we skip the docs and head straight to Stack Overflow or ChatGPT for quick answers? I've been there too. The thing is, while these shortcuts can be useful, they don't always tell the whole story. So, if you're ready to dive in, let's talk about some common mistakes and how to avoid them. Stay with me; this is going to be a ride!


Table of Contents

Mistake #1: Adding Columns the Wrong Way
Mistake #2: Order of Narrow and Wide Transformations
Mistake #3: Overlooking Data Serialization Format
Mistake #4: Not Using Parallel Listing on Input Paths
Mistake #5: Ignoring Data Locality
Mistake #6: Relying on Default Number of Shuffle Partitions
Mistake #7: Overlooking Broadcast Join Thresholds
Mistake #8: Relying on the Default Storage Level for Cache
Mistake #9: Misconfiguring Spark Memory Settings
Mistake #10: Relying Only on Cache and Persist


Mistake #1: Adding Columns the Wrong Way

Client: "Hey, can you add 5 columns? Make it quick, okay?"

Developer: "Sure, I'll just use withColumn() in a loop 5 times!"

Client: (happy) "Great! Now, can you add 10 more columns? Make it quick, and keep the code short!"

Developer: "No problem! I'll loop 15 times now."

Spark: "Sorry, I can't optimize that."

But wait—according to Spark's documentation, withColumn introduces a projection internally, so calling it multiple times (for instance, in a loop to add several columns) generates big plans that can cause performance issues and even a StackOverflowException. The docs recommend using select() with multiple columns at once instead.

[Apache Spark Scala doc of withColumn, with its limitation highlighted]

Solution: selectExpr or select

Here is a complete helper that builds the column list for a single select():

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

def addOrReplaceColumns(newColumns: List[Column], sourceColumns: List[String]): List[Column] = {
  // Split the incoming columns into replacements of existing source columns and brand-new columns.
  val (replacements, additions) = newColumns.partition(column => sourceColumns.contains(column.toString))
  // Keep every source column that is not being replaced.
  val untouched = sourceColumns.diff(replacements.map(_.toString)).map(name => col(name))

  replacements ++ additions ++ untouched
}
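
A minimal usage sketch, assuming a DataFrame named df; the new column names and values below are made up for illustration:

import org.apache.spark.sql.functions.lit

// Two hypothetical new columns, added in one select() instead of two withColumn() calls
val newCols = List(lit("2024-08-28").as("load_date"), lit(1).as("load_version"))
val result  = df.select(addOrReplaceColumns(newCols, df.columns.toList): _*)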

Mistake #2: Order of Narrow and Wide Transformations

When developing a data solution we normally focus on business logic, so it's easy to ignore the order of narrow and wide transformations. The recommendation, though, is to group all the narrow transformations first and the wide ones after. For example, if your chain looks like

narrow, wide, narrow, narrow, wide, narrow 

then try to arrange it like this:

narrow, narrow, narrow, narrow, wide, wide

Spark can then optimize your code more effectively: the narrow transformations are pipelined together within a single stage (no data movement between them), and shuffles are only triggered by the wide transformations at the end.
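
A small sketch of what this looks like in practice (the DataFrame, path, and column names are made up): the filter and projection are narrow and run in the same stage, and the only shuffle comes from the groupBy.

import org.apache.spark.sql.functions.col

// Hypothetical input; the path and schema are illustrative only
val ordersDf = spark.read.parquet("path/to/orders")

val totals = ordersDf
  .filter(col("status") === "COMPLETED") // narrow
  .select("customer_id", "amount")       // narrow
  .groupBy("customer_id")                // wide: the shuffle happens here
  .sum("amount")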


Mistake #3: Overlooking Data Serialization Format

By default, Spark uses Java serialization, which is not the most efficient option. Switching to Kryo serialization can lead to better performance, as it is faster and uses less memory. Use the following configuration to enable Kryo:

spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

However, Kryo does not support all Serializable types, and for best performance it requires you to register in advance the classes you'll use in the program.
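
A minimal sketch of that registration, assuming two application classes (the class names and fields are placeholders):

import org.apache.spark.SparkConf

// Hypothetical application classes that will flow through shuffles
case class MyEvent(id: Long, payload: String)
case class MyAggregate(key: String, total: Double)

// Enable Kryo and register the classes up front for best performance
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyEvent], classOf[MyAggregate]))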


Mistake #4: Not Using Parallel Listing on Input Paths

When reading files from storage systems like Amazon S3, Azure Data Lake Storage (ADLS), or even local storage, Spark needs to list and find all matching files in the input directory before it can start scheduling tasks. This listing process can become a bottleneck, especially when dealing with large directories or a vast number of files. By default, Spark uses only a single thread to list files, which can significantly slow down the start of your job.

To mitigate this, you can increase the number of threads used for listing files by setting the spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads property. This allows Spark to parallelize the file listing process, speeding up the initialization phase of your job.

spark.conf.set("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", 10)

Mistake #5: Ignoring Data Locality

Data locality significantly impacts the performance of Spark jobs.

When data and the code processing it are close together, computation is faster, as there is less need to move large chunks of data. Spark scheduling prioritizes data locality to minimize data movement, following levels of locality from best to worst: PROCESS_LOCAL (data and code in the same JVM), NODE_LOCAL (data on the same node), RACK_LOCAL (data on the same rack but different node), and ANY (data elsewhere on the network).

Spark tries to schedule tasks at the highest locality level possible, but this isn't always feasible. If no idle executors have unprocessed data at the desired locality level, Spark can either wait for a busy executor to free up or fall back to a lower locality level by moving data to an idle executor. The time Spark waits before falling back can be adjusted using the spark.locality.wait settings. Adjusting these settings can help improve performance in scenarios with long-running tasks or when data locality is poor.

If you have moderate data skew, a cluster with ample resources, or cached data (.cache()), increasing the wait tends to pay off more than falling back to a lower locality level.

spark.conf.set("spark.locality.wait", "10s")
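
The wait can also be tuned per locality level. These are standard Spark settings, normally supplied when the session (or spark-submit command) is configured; the values below are only an illustration:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("locality-tuning-sketch")           // name is illustrative
  .config("spark.locality.wait", "10s")        // global fallback wait
  .config("spark.locality.wait.process", "3s") // PROCESS_LOCAL -> NODE_LOCAL
  .config("spark.locality.wait.node", "3s")    // NODE_LOCAL -> RACK_LOCAL
  .config("spark.locality.wait.rack", "3s")    // RACK_LOCAL -> ANY
  .getOrCreate()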

Mistake #6: Relying on Default Number of Shuffle Partitions

By default, Spark uses 200 partitions for shuffle operations (e.g., join, groupBy). This number might be too high or too low, depending on your dataset and cluster size.

AQE (Adaptive Query Execution, enabled by default since Spark 3.2 and, on Databricks, since Runtime 7.3 LTS) adjusts the shuffle partition number automatically at each stage of the query, based on the size of the map-side shuffle output.

Still, it's advisable to set the shuffle partition count yourself before performing a wide transformation if you need tighter control; and if you're unsure, a common recommendation is to set it to the number of cores in your cluster.

spark.conf.set("spark.sql.shuffle.partitions", "num_core_in_cluster")

And don't forget to tune the spark.default.parallelism setting accordingly as well; it plays the same role for RDD operations.
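
A sketch of setting both values when the session is built; the figure of 96 cores is only an assumption about the cluster:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.shuffle.partitions", "96") // e.g. a cluster with 96 executor cores
  .config("spark.default.parallelism", "96")    // same idea for RDD-based operations
  .getOrCreate()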


Mistake #7: Overlooking Broadcast Join Thresholds

Scenario:

Developer: "I thought small lookup tables would be broadcasted automatically and my each of executors has 32GB of memory! Why are my joins so slow?"

Spark: "Sorry, your lookup table is just above the default threshold."

Broadcast joins can drastically speed up join operations when one of the tables is small enough to fit into memory on each worker node. However, if you don't adjust the broadcast join threshold, Spark might not broadcast tables that could be effectively broadcasted, leading to unnecessary shuffling.

Solution:

Adjust the broadcast join threshold using spark.sql.autoBroadcastJoinThreshold. If your lookup table is slightly larger than the default 10MB limit, increase the threshold.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) // 50MB

When setting the broadcast join threshold, don't base it only on executor memory. The driver loads the small table into memory first before distributing it to executors. Make sure the threshold is suitable for both driver and executor memory capacities to prevent memory issues and optimize performance.
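
If you would rather not raise the global threshold, you can also hint the broadcast explicitly for a single join (the DataFrame and column names below are made up):

import org.apache.spark.sql.functions.broadcast

// Force the small lookup table to be broadcast for this join only
val joined = factDf.join(broadcast(lookupDf), Seq("country_code"))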


Mistake #8: Relying on the Default Storage Level for Cache

It's crucial to select the appropriate storage level for caching and persisting data based on the type of executors in your cluster and on your objectives. By understanding the trade-offs between speed, memory usage, and fault tolerance, you can tailor the storage level to the specific needs of your application.

Memory-Optimized executors (primary objective: fast access, low memory usage)
  • MEMORY_ONLY_SER: stores the RDD as serialized objects in memory; balances speed and memory efficiency. Fault-tolerant alternative: MEMORY_ONLY_SER_2. Use MEMORY_ONLY if serialization overhead is not a concern.
  • MEMORY_ONLY: stores the RDD as deserialized objects in memory; fastest access, highest memory usage. Fault-tolerant alternative: MEMORY_ONLY_2. Use for small datasets that fit comfortably in memory.

CPU-Optimized executors (primary objective: balanced memory and disk)
  • MEMORY_AND_DISK_SER: serialized storage in memory, spilling to disk if needed; good for large datasets. Fault-tolerant alternative: MEMORY_AND_DISK_SER_2. Preferred when memory is limited; avoids out-of-memory errors.
  • MEMORY_AND_DISK: deserialized storage in memory, spilling to disk; faster access than MEMORY_AND_DISK_SER. Fault-tolerant alternative: MEMORY_AND_DISK_2. Use when memory can accommodate deserialized objects, with fallback to disk.

General-purpose executors (primary objective: flexibility, moderate-size datasets)
  • MEMORY_AND_DISK: deserialized in memory, spilling to disk; a good balance for general use cases. Fault-tolerant alternative: MEMORY_AND_DISK_2. Good for mixed workloads; balances speed and fault tolerance.
  • MEMORY_ONLY_SER: serialized in-memory storage, optimized for memory efficiency and speed. Fault-tolerant alternative: MEMORY_ONLY_SER_2. Suitable for datasets that fit well in memory after serialization.

Disk-Optimized executors (primary objective: low memory, high fault tolerance)
  • DISK_ONLY: stores RDD partitions only on disk; minimizes memory usage but slowest access. Fault-tolerant alternative: DISK_ONLY_2. Suitable for very large datasets where memory is a constraint.
  • MEMORY_AND_DISK_SER: serialized storage in memory with spillover to disk; more efficient than deserialized. Fault-tolerant alternative: MEMORY_AND_DISK_SER_2. Balances disk usage and memory efficiency.
  • The _2 options (e.g., MEMORY_ONLY_2, MEMORY_AND_DISK_2) are useful for scenarios where fault tolerance is crucial. They replicate data across two nodes, ensuring data is not lost if a node fails. This is particularly valuable in environments where reliability is prioritized over resource efficiency, such as production systems handling critical data or real-time processing pipelines.

  • The _SER options (e.g., MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) store the RDD as serialized Java objects (one byte array per partition) in memory. This is more memory-efficient than deserialized storage, but slower to access because of the serialization/deserialization overhead.
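
For example, picking a level explicitly looks like this (the DataFrame name is illustrative):

import org.apache.spark.storage.StorageLevel

// Serialized in-memory storage with disk spillover, instead of the default level
enrichedDf.persist(StorageLevel.MEMORY_AND_DISK_SER)
enrichedDf.count() // an action to materialize the cache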


Mistake #9: Misconfiguring Spark Memory Settings

Scenario:

Developer: "My Spark job keeps failing with out-of-memory errors. I gave it all the memory available!"

Spark: "Memory isn't just for you; I need some for myself, too."

Many users allocate almost all available memory to the executor heap space (spark.executor.memory) without considering Spark's overhead memory, causing frequent out-of-memory errors. Additionally, insufficient memory can lead to excessive garbage collection (GC) pauses, slowing down jobs.

Solution:

Properly configure memory settings by tuning spark.executor.memory and spark.executor.memoryOverhead.

--conf spark.executor.memory=4g --conf spark.executor.memoryOverhead=512m

Ensure you leave enough memory overhead to accommodate Spark's off-heap needs (JVM overheads, interned strings, other native overheads). Typically, 10-15% of the total memory should be allocated as overhead.

spark.memory.fraction expresses the size of the unified execution-and-storage region M as a fraction of (JVM heap space - 300 MiB), with a default of 0.6. The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
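
To make that concrete (assuming the 4g executor heap above and the default fraction of 0.6), the unified region M works out to roughly 0.6 × (4096 MiB - 300 MiB) ≈ 2278 MiB, leaving about 1518 MiB of the heap for user data structures and Spark's internal metadata.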


Mistake #10: Relying Only on Cache and Persist

Many Spark developers are familiar with the cache() and persist() methods for improving performance, but they often overlook the value of checkpoint(). While cache() and persist() keep data in memory or on disk to speed up processing, they don't protect you in the case of a failure. checkpoint(), on the other hand, saves the RDD or DataFrame to a reliable storage system and truncates its lineage, allowing for fault recovery and optimizing job execution.

Using checkpoint() not only ensures that your job can recover from failures but also helps Spark optimize the execution of other jobs that share the same lineage. This can lead to improved performance and resource utilization.

spark.sparkContext.setCheckpointDir("path/to/checkpoint/dir")
val checkpointedDf = df.checkpoint() // checkpoint() returns a new DataFrame; use the returned one