Spark memory-linked errors

Amit Singh Rathore
May 6, 2024

Common memory-related issues in Apache Spark applications

There are many potential reasons for memory problems:

  • Too few shuffle partitions (see the configuration sketch after this list)
  • Large broadcast
  • UDFs
  • Window function without a PARTITION BY clause
  • Skew
  • Streaming State
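
As a minimal sketch of the first point, the shuffle partition count can be raised so that each shuffle task handles a smaller slice of data. The value, dataset, and output path below are illustrative only.

from pyspark.sql import SparkSession

# Minimal sketch: raise the shuffle partition count so each shuffle task
# processes a smaller slice of data. The value 400 is illustrative.
spark = (
    SparkSession.builder
    .appName("shuffle-partition-tuning")
    .config("spark.sql.shuffle.partitions", "400")  # default is 200
    .getOrCreate()
)

df = spark.range(0, 10_000_000)  # placeholder dataset
agg = df.groupBy((df.id % 1000).alias("bucket")).count()
agg.write.mode("overwrite").parquet("/tmp/bucket_counts")  # hypothetical output path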

Out-of-Memory Errors (OOM)

java.lang.OutOfMemoryError

  • Driver OOM: The Spark driver runs the main program and holds the application metadata, broadcast objects, Spark plans, etc. If the driver’s memory is not properly configured, it can lead to driver OOM errors.
  • Executor OOM: This occurs when an executor runs out of memory while processing data. It can happen if the data processed or cached in memory is larger than the available executor memory (a sizing sketch follows these bullets).
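
A minimal sketch of sizing driver and executor memory when building a session; the values are illustrative, not recommendations, and the right numbers depend on the workload and cluster.

from pyspark.sql import SparkSession

# Illustrative sizing only. Note that spark.driver.memory normally has to be
# set before the driver JVM starts (spark-submit --driver-memory or
# spark-defaults.conf); it is shown here only to keep the keys in one place.
spark = (
    SparkSession.builder
    .appName("memory-sizing-sketch")
    .config("spark.driver.memory", "4g")            # driver heap
    .config("spark.executor.memory", "8g")          # heap per executor
    .config("spark.executor.memoryOverhead", "2g")  # native/off-heap overhead per executor
    .getOrCreate()
)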

GC Overhead Limit Exceeded

java.lang.OutOfMemoryError: GC overhead limit exceeded

This error occurs when too much time is spent on garbage collection (GC) activities compared to the actual application processing. It can result in long pauses and slow application performance.
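
One common mitigation is to switch the executors to G1GC and enable GC logging so the pauses can be inspected. A sketch with illustrative flags (logging options differ between JDK 8 and newer JVMs):

from pyspark.sql import SparkSession

# Sketch: use G1GC and print GC details on the executors (JDK 8 style flags;
# newer JVMs use -Xlog:gc* instead).
spark = (
    SparkSession.builder
    .appName("gc-tuning-sketch")
    .config(
        "spark.executor.extraJavaOptions",
        "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
    )
    .getOrCreate()
)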

SparkOutOfMemoryError: error while calling spill() on

  • HashedRelation
  • HashAggregateExec
  • No space left on device (spill)
  • Not enough memory to build the hash map
  • HDFS issues
  • Can be due to coalesce(1) (see the sketch after this list)
  • Can also be due to an unbounded window partition (a window without PARTITION BY)
Heap Space Errors

java.lang.OutOfMemoryError: Java heap space

  • Java Heap Space: If the JVM heap space is not configured properly, tasks and data processing can cause the heap space to be exhausted, leading to errors.
  • Off-Heap Space: Allocating memory off-heap (outside the JVM heap) can help reduce GC pauses, but improper configuration can still lead to off-heap memory issues (a configuration sketch follows this list).
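
A minimal sketch of enabling off-heap memory; the size is illustrative, and the off-heap allocation still counts toward the container's total, so memoryOverhead should be sized with it in mind.

from pyspark.sql import SparkSession

# Sketch: move part of Spark's execution/storage memory off the JVM heap.
spark = (
    SparkSession.builder
    .appName("offheap-sketch")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")  # illustrative size
    .getOrCreate()
)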

Memory Leaks

Improper use of accumulators, closures, or other programming constructs can lead to memory leaks in Spark applications. This can gradually consume all available memory.

  • Avoid loops over a Spark DataFrame
  • Don’t chain custom functions manually; use transform() (see the sketch after this list)
  • Enable heap dumps on OOM with spark.executor.extraJavaOptions: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/hdfs/path/heapDumps
  • Don’t pass large objects to functions executed on the executors; broadcast them instead and access them in code via bc_object.value
  • Ref: https://blog.devgenius.io/debugging-a-memory-leak-in-spark-application-22140630877d
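
A small sketch combining the transform() and broadcast points above, using a made-up lookup table and column names:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("leak-avoidance-sketch").getOrCreate()

# Hypothetical lookup table: broadcast it once instead of capturing a large
# Python object in every task closure.
country_names = {"IN": "India", "US": "United States"}
bc_countries = spark.sparkContext.broadcast(country_names)

@F.udf("string")
def country_name(code):
    # Read the broadcast value on the executor side.
    return bc_countries.value.get(code)

def add_country_name(df: DataFrame) -> DataFrame:
    return df.withColumn("country", country_name(F.col("country_code")))

def drop_unknown(df: DataFrame) -> DataFrame:
    return df.na.drop(subset=["country"])

df = spark.createDataFrame([("IN",), ("US",), ("FR",)], ["country_code"])

# Chain the custom functions with transform() instead of wiring them by hand.
result = df.transform(add_country_name).transform(drop_unknown)
result.show()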

Memory-Intensive Transformations and Actions

Certain transformations and actions, such as collect() and broadcast, can bring large amounts of data to the driver or cause data shuffling, leading to excessive memory usage and failures.

There are also scenarios where a transformation can explode the data size: JSON flattening, explode, and Cartesian joins are some examples of operations that increase data size during a job.

Certain operations need extra memory to do their job. For example, some joins need the external sorter, and too many chained withColumn calls need extra memory for all the intermediate DataFrame objects. These can also cause memory errors.
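
As an illustration of the data-size explosion, explode() produces one output row per array element, so row counts can grow dramatically. A small sketch with made-up data:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("explode-sketch").getOrCreate()

# One row per user, each carrying an array of 1,000 events (made-up data).
df = spark.createDataFrame(
    [("u1", list(range(1000))), ("u2", list(range(1000)))],
    ["user_id", "events"],
)

# explode() multiplies the row count by the array length: 2 rows become 2,000.
exploded = df.select("user_id", F.explode("events").alias("event"))
print(exploded.count())  # 2000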

Broadcast Variables

While broadcast variables can reduce data shuffling, they require memory on the driver and on each executor. Broadcasting large data can therefore consume excessive memory on both.
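
A sketch of keeping broadcasts in check: cap the automatic broadcast-join threshold and only hint an explicit broadcast for tables that comfortably fit in memory. The tables and the 32 MB value below are illustrative.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-sketch").getOrCreate()

# Tables larger than this are shuffle-joined instead of being broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(32 * 1024 * 1024))

large = spark.range(0, 1_000_000).withColumnRenamed("id", "key")
small = spark.range(0, 100).withColumnRenamed("id", "key")

# Explicit hint: only broadcast something known to fit on the driver and executors.
joined = large.join(broadcast(small), "key")
joined.count()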

Data Skew

In distributed processing, data skew occurs when some partitions have significantly more data than others. This can put more memory pressure on some executors, leading to OOM errors.

Skew can also happen in window operations if the window's partitioning is not specified correctly.
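
A sketch of both points: enable AQE's runtime skew-join handling, and give window functions a PARTITION BY so a single unbounded window does not funnel every row into one task. Data and column names are made up.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("skew-sketch").getOrCreate()

# AQE can split oversized partitions of a skewed join at runtime.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

df = spark.createDataFrame([("a", 1), ("a", 2), ("b", 3)], ["key", "value"])

# Without partitionBy, every row falls into a single window partition / task:
w_global = Window.orderBy("value")

# With partitionBy, the window work is spread across tasks by key:
w_by_key = Window.partitionBy("key").orderBy("value")

df.withColumn("rank", F.row_number().over(w_by_key)).show()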

Task Deserialization Errors

If the size of serialized data sent to tasks exceeds the task’s available memory, deserialization errors can occur. This can happen when the data being sent between stages is too large.

To address memory-related issues, follow these practices:

  • Use Spark UI to Monitor your application’s memory footprint
  • Tune the memory configuration for executors and the driver based on workload characteristics. Do not use one-size-fits-all configurations.
  • Minimize data shuffling by using suitable operations and broadcast variables.
  • Handle skew by using salting, repartitioning, and enabling AQE.
  • Use caching wisely and consider memory overhead when configuring memory settings (see the caching sketch after this list).
  • Profile and monitor garbage collection to prevent GC overhead issues.
  • Use data compression techniques to reduce memory consumption.
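
To illustrate the caching point, a sketch that uses an explicit storage level and releases the cache as soon as it is no longer needed:

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cache-sketch").getOrCreate()
df = spark.range(0, 1_000_000)  # placeholder dataset

# MEMORY_AND_DISK spills blocks to disk instead of failing when memory is tight.
df.persist(StorageLevel.MEMORY_AND_DISK)

df.count()  # first action materializes the cache
df.count()  # subsequent actions reuse it

# Release executor memory once the cached data is no longer needed.
df.unpersist()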

Total size of serialized results of 36 tasks (1038.9 MiB) is bigger than spark.driver.maxResultSize 1024.0 MiB

Spark internally uses accumulators for TaskMetrics and SQLMetrics. These are used to collect Task/Stage/Job/SQL-level metrics, and they also feed the physical plans and the Spark Web UI. Sometimes, when running complex SQL operations with a large number of tasks in one stage, the accumulator size can exceed the default 1 GB maxResultSize limit and result in the above-mentioned error.
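
If the oversized result is expected (for example, a deliberately large collect or heavy accumulator traffic from a very wide stage), the limit can be raised; a sketch with an illustrative value:

from pyspark.sql import SparkSession

# Sketch: raise the cap on serialized results sent back to the driver.
# This trades the error for higher driver memory usage, so prefer reducing
# what is returned to the driver when possible.
spark = (
    SparkSession.builder
    .appName("max-result-size-sketch")
    .config("spark.driver.maxResultSize", "2g")  # default is 1g
    .getOrCreate()
)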

Spark memory tuning related configurations

  • spark.memory.fraction
  • spark.memory.storageFraction
  • spark.executor.memoryOverhead
  • spark.memory.offHeap.enabled
  • spark.memory.offHeap.size
  • spark.executor.extraJavaOptions
  • spark.shuffle.compress=true
  • spark.shuffle.spill.compress=true
  • spark.broadcast.compress=true
  • spark.checkpoint.compress=true
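
A sketch that sets several of the above together when building a session; every value is illustrative and should be tuned to the actual workload and cluster.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("memory-tuning-sketch")
    .config("spark.memory.fraction", "0.6")         # unified memory share of the heap (default)
    .config("spark.memory.storageFraction", "0.5")  # storage share of unified memory (default)
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .config("spark.shuffle.compress", "true")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.broadcast.compress", "true")
    .config("spark.checkpoint.compress", "true")
    .getOrCreate()
)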
