18 changes: 15 additions & 3 deletions docs/details/error-driver-out-of-memory.md
@@ -1,11 +1,23 @@
# Driver ran out of memory

### Symptom
I see ```java.lang.OutOfMemoryError:``` in the driver log/stderr

OOMs can happen during different stages of the Spark app.

### Potential cause: too many splits on read

If you see ```java.lang.OutOfMemoryError:``` in the driver log/stderr, it is most likely the driver JVM running out of memory. [This](https://manuals.netflix.net/view/sparkdocs/mkdocs/master/memory-configuration/#driver-ran-out-of-jvm-memory) article has the memory config for increasing the driver memory. One reason you could run into this error is reading from a table with too many splits (S3 files), which overwhelms the driver with metadata. Adjust the driver memory as a band-aid, and consider reducing the number of splits and other ways to cut the driver's memory consumption as the longer-term fix.
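
For illustration, here is a minimal sketch of both steps; the memory size, bucket, and partition layout are assumptions, not values from this repo.

```
# Band-aid: the driver heap can only be raised when the driver JVM is launched,
# e.g. `spark-submit --driver-memory 8g ...` or spark.driver.memory in
# spark-defaults.conf; setting it from inside an already-running driver has no effect.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Longer-term fix: point the read at just the partitions you need so the driver
# lists and tracks far fewer S3 files (hypothetical bucket and partition layout).
df = spark.read.parquet("s3://my-bucket/my-table/ds=2024-01-01/")
```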


### Potential cause: OOM while running a `collect` action

Check if you're running a `collect` or similar action, which is a frequent cause of driver OOMs because `collect` sends the entire RDD or DataFrame to the driver. If you must collect the entire dataset to the driver, reduce the input dataset to a size that fits into driver memory. [This](https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/dont_call_collect_on_a_very_large_rdd.html) article has a couple more suggestions.
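
As a sketch (the stand-in dataset and output path below are hypothetical), either bound what comes back to the driver or keep the result distributed and write it out:

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100000000).toDF("id")      # stand-in for a large dataset

# Risky: df.collect() materializes every row on the driver and can OOM it.

# Safer: bring back only a bounded preview ...
preview = df.limit(1000).collect()          # or df.take(1000)

# ... or keep the full result distributed and persist it instead of collecting it.
df.write.mode("overwrite").parquet("s3://my-bucket/output/")   # hypothetical path
```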


### Potential cause: OOM while sampling data

Another potential cause of ```java.lang.OutOfMemoryError:``` is the driver running out of memory while obtaining a data sample. Perhaps you're working with an RDD or DataFrame whose number of partitions is very high (e.g. 20,000) and you trigger a `sort` or `shuffle`, where Spark samples some data from each partition. In this case the driver can run out of memory while sampling the data even though your code does NOT `collect` the entire dataset to the driver. To solve this, `repartition` to a lower number of partitions. `coalesce` is a good option if you're using RDDs; with DataFrames, however, `coalesce` might not be the best choice because it can have impacts upstream in the query plan.
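
A minimal sketch of that fix (the partition target and sort column are hypothetical):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100000000).toDF("id")       # stand-in for an over-partitioned dataset

# DataFrame: shrink the partition count before the wide operation so the
# range-partitioning sample Spark collects on the driver stays small.
sorted_df = df.repartition(2000).sort("id")  # e.g. down from ~20,000 partitions

# RDD: coalesce reduces the partition count without a full shuffle.
rdd = df.rdd.coalesce(2000)
```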


These are three of the common causes for driver OOMs. Submit your (least) favorite driver OOM to the Spark-Flowchart for inclusion here!
11 changes: 11 additions & 0 deletions docs/details/slow-job-slow-cluster.md
@@ -0,0 +1,11 @@
### Slow Cluster

How do I know if and when my job is waiting for cluster resources?


Sometimes the cluster manager may choke or otherwise be unable to allocate resources. We don't have a good way of detecting this situation, which makes it difficult for the user to debug and to tell apart from Spark not scaling up correctly.

As of Spark 3.4, an executor will note when and for how long it waits for cluster resources. Check the JVM metrics for this information.

### Reference link:
https://issues.apache.org/jira/browse/SPARK-36664
2 changes: 1 addition & 1 deletion docs/details/slow-job.md
@@ -3,7 +3,7 @@

A Spark job can be slow for various reasons, but here are a couple of common ones:

1. Slow stage(s): Go to the [Slow Stage](../slow-stage/) section to identify the slow stage. In most cases, a job is slow because one or more of its stages are slow.
2. Too big DAG: Go to the [TooBigDAG](../toobigdag/) section for more details on this topic.


70 changes: 69 additions & 1 deletion docs/details/slow-stage.md
@@ -1,9 +1,77 @@
# Identify the slow stage

### When you have an event log from an earlier "good run"

You can compare the slow and the fast runs.
For this you can even use your local pyspark shell and calculate, for each stage metric, the ratio between the slow and the fast run:

```
# Helper methods (just copy-paste them into a pyspark shell, where `spark` is predefined)

def createEventView(eventLogFile, eventViewName):
    # Expose the JSON event log as a temporary view.
    spark.sql("CREATE OR REPLACE TEMPORARY VIEW {} USING org.apache.spark.sql.json OPTIONS (path '{}')".format(eventViewName, eventLogFile))


def createStageMetricsView(eventViewName, stageMetricsViewName):
    # One row per (stage, accumulable metric), taken from SparkListenerStageCompleted events.
    spark.sql("CREATE OR REPLACE TEMPORARY VIEW {} AS select `Submission Time`, `Completion Time`, `Stage ID`, t3.col.* from (select `Stage Info`.* from {} where Event='SparkListenerStageCompleted') lateral view explode(Accumulables) t3".format(stageMetricsViewName, eventViewName))


def showDiffInStage(fastStagesTable, slowStagesTable, stageID):
    # Join the two runs on metric ID and show the slow/fast ratio for each metric of one stage.
    spark.sql("select {fastStages}.Name, {fastStages}.Value as Fast, {slowStages}.Value as Slow, {slowStages}.Value / {fastStages}.Value as `Slow / Fast` from {fastStages} INNER JOIN {slowStages} ON {fastStages}.ID = {slowStages}.ID where {fastStages}.`Stage ID` = {stageID} and {slowStages}.`Stage ID` = {stageID}".format(fastStages=fastStagesTable, slowStages=slowStagesTable, stageID=stageID)).show(40, False)


# Creating the views from the event logs (just an example, you have to specify your own paths)

createEventView("<path_to_the_fast_run_event_log>", "FAST_EVENTS")
createStageMetricsView("FAST_EVENTS", "FAST_STAGE_METRICS")

createEventView("<path_to_the_slow_run_event_log>", "SLOW_EVENTS")
createStageMetricsView("SLOW_EVENTS", "SLOW_STAGE_METRICS")

>>> spark.sql("SELECT DISTINCT `Stage ID` from FAST_STAGE_METRICS").show()
+--------+
|Stage ID|
+--------+
| 0|
| 1|
| 2|
+--------+

>>> spark.sql("SELECT DISTINCT `Stage ID` from SLOW_STAGE_METRICS").show()
+--------+
|Stage ID|
+--------+
| 0|
| 1|
| 2|
+--------+

>>> showDiffInStage("FAST_STAGE_METRICS", "SLOW_STAGE_METRICS", 2)
+-------------------------------------------+-------------+-------------+------------------+
|Name |Fast |Slow |Slow / Fast |
+-------------------------------------------+-------------+-------------+------------------+
|scan time total (min, med, max) |1095931 |1628308 |1.485776020570638 |
|internal.metrics.executorRunTime |7486648 |12990126 |1.735105750931525 |
|duration total (min, med, max) |7017645 |12322243 |1.7558943206731032|
|internal.metrics.jvmGCTime |220325 |1084412 |4.921874503574266 |
|internal.metrics.output.bytesWritten |34767744411 |34767744411 |1.0 |
|internal.metrics.input.recordsRead |149652381 |149652381 |1.0 |
|internal.metrics.executorDeserializeCpuTime|5666230304 |7760682789 |1.3696377260771504|
|internal.metrics.resultSize |625598 |626415 |1.0013059504665935|
|internal.metrics.executorCpuTime |6403420405851|8762799691603|1.3684560963069305|
|internal.metrics.input.bytesRead |69488204276 |69488204276 |1.0 |
|number of output rows |149652381 |149652381 |1.0 |
|internal.metrics.resultSerializationTime |36 |72 |2.0 |
|internal.metrics.output.recordsWritten |149652381 |149652381 |1.0 |
|internal.metrics.executorDeserializeTime |6024 |11954 |1.9843957503320053|
+-------------------------------------------+-------------+-------------+------------------+
```
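
In the example above, the input and output metrics are identical between the two runs while `internal.metrics.jvmGCTime` is roughly 4.9x higher in the slow run, so GC/memory pressure on the executors is the first thing to investigate.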

### When there is no event log from a good run

Steps:

1. Navigate to the Spark UI using the Spark history server URL
2. Click on `Stages` and sort the stages by `Duration` in descending order to find the longest-running stage.

![IdentifySlowStage](../imgs/identify-slow-stage.png)
3 changes: 3 additions & 0 deletions docs/flowchart/slow.md
@@ -10,6 +10,7 @@ SlowStage --> SlowReduce[Slow Shuffle/Reducer/Exchange]
SlowStage --> SLOWWRITESTOSTORAGE[Slow writes to storage]

SlowJob --> TOOBIGDAG[Too Big DAG]
SlowJob --> SlowCluster[Slow Cluster]

SlowReduce --> PAGGS[Partial aggregates]

@@ -25,6 +26,8 @@ click SlowJob "../../details/slow-job"
click SlowStage "../../details/slow-stage"
click SlowMap "../../details/slow-map"
click SlowReduce "../../details/slow-reduce"
click SlowCluster "../../details/slow-job-slow-cluster"
click TOOBIGDAG "../../details/toobigdag"

click TooFewShuffleTasks "../../details/slow-reduce/#not-enough-shuffle-tasks"
click TooManyShuffleTasks "../../details/slow-reduce/#too-many-shuffle-tasks"