articles/hdinsight/spark/apache-spark-perf.md
author: hrasheed-msft
ms.author: hrasheed
ms.reviewer: jasonh
ms.service: hdinsight
ms.topic: conceptual
ms.custom: hdinsightactive
ms.date: 04/17/2020
---
# Optimize Apache Spark jobs in HDInsight
Learn how to optimize [Apache Spark](https://spark.apache.org/) cluster configuration for your particular workload. The most common challenge is memory pressure caused by improper configurations (such as wrong-sized executors), long-running operations, and tasks that result in Cartesian operations. You can speed up jobs with appropriate caching and by allowing for [data skew](#optimize-joins-and-shuffles). For the best performance, monitor and review long-running and resource-consuming Spark job executions. For information on getting started with Apache Spark on HDInsight, see [Create Apache Spark cluster using Azure portal](apache-spark-jupyter-spark-sql-use-portal.md).
The following sections describe common Spark job optimizations and recommendations.
## Select default storage
When you create a new Spark cluster, you can select Azure Blob Storage or Azure Data Lake Storage as your cluster's default storage. Both options give you the benefit of long-term storage for transient clusters, so your data doesn't get automatically deleted when you delete your cluster. You can recreate a transient cluster and still access your data.
| Store Type | File System | Speed | Transient | Use Cases |
| --- | --- | --- | --- | --- |
| Azure Data Lake Storage Gen 1|**adl:**//url/ |**Faster**| Yes | Transient cluster |
| Local HDFS |**hdfs:**//url/ |**Fastest**| No | Interactive 24/7 cluster |
For a full description of the storage options available for HDInsight clusters, see [Compare storage options for use with Azure HDInsight clusters](../hdinsight-hadoop-compare-storage-options.md).
## Use the cache
Spark provides its own native caching mechanisms, which can be used through different methods such as `.persist()`, `.cache()`, and `CACHE TABLE`. This native caching is effective with small data sets, as well as in ETL pipelines where you need to cache intermediate results. However, Spark native caching currently doesn't work well with partitioning, since a cached table doesn't keep the partitioning data. A more generic and reliable caching technique is *storage layer caching*.
* Native Spark caching (not recommended)
  * Good for small datasets.
## Use memory efficiently
Spark operates by placing data in memory, so managing memory resources is a key aspect of optimizing the execution of Spark jobs. There are several techniques you can apply to use your cluster's memory efficiently.
* Prefer smaller data partitions and account for data size, types, and distribution in your partitioning strategy.
* Consider the newer, more efficient [Kryo data serialization](https://github.com/EsotericSoftware/kryo) rather than the default Java serialization.
* Prefer using YARN, as it separates `spark-submit` by batch.
* Monitor and tune Spark configuration settings.
For your reference, the Spark memory structure and some key executor memory parameters are shown in the next image.
### Spark memory considerations
If you're using [Apache Hadoop YARN](https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html), then YARN controls the maximum sum of memory used by all containers on each Spark node. The following diagram shows the key objects and their relationships.
To address 'out of memory' messages, try:
* Review DAG Management Shuffles. Reduce by map-side reducing, pre-partition (or bucketize) source data, maximize single shuffles, and reduce the amount of data sent.
* Prefer `ReduceByKey` with its fixed memory limit to `GroupByKey`, which provides aggregations, windowing, and other functions, but has an unbounded memory limit.
* Prefer `TreeReduce`, which does more work on the executors or partitions, to `Reduce`, which does all work on the driver.
* Use DataFrames rather than the lower-level RDD objects.
* Create ComplexTypes that encapsulate actions, such as "Top N", various aggregations, or windowing operations.
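
The `ReduceByKey` versus `GroupByKey` trade-off above can be sketched in plain Python (this is illustrative pseudocode for the memory behavior, not the Spark API; all names are made up):

```python
from collections import defaultdict

def reduce_by_key(pairs, fn):
    # Aggregate incrementally: memory stays bounded by the number of
    # distinct keys, which mirrors reduceByKey's fixed memory limit.
    acc = {}
    for key, value in pairs:
        acc[key] = fn(acc[key], value) if key in acc else value
    return acc

def group_by_key(pairs):
    # Materialize every value for each key before any aggregation,
    # which mirrors groupByKey's unbounded memory use on hot keys.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

sales = [("east", 10), ("west", 20), ("east", 30)]
print(reduce_by_key(sales, lambda a, b: a + b))  # {'east': 40, 'west': 20}
```

With `reduce_by_key`, only one running total per key is held; with `group_by_key`, every row for a key must fit in memory at once.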
For additional troubleshooting steps, see [OutOfMemoryError exceptions for Apache Spark in Azure HDInsight](apache-spark-troubleshoot-outofmemory.md).
Spark jobs are distributed, so appropriate data serialization is important for the best performance. There are two serialization options for Spark:
* Java serialization is the default.
* Kryo serialization is a newer format and can result in faster and more compact serialization than Java. Kryo requires that you register the classes in your program, and it doesn't yet support all Serializable types.
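
As a sketch, Kryo can be enabled through Spark configuration properties such as the following (the class name `com.example.MyRecord` is a hypothetical placeholder for your own registered types):

```
spark.serializer                org.apache.spark.serializer.KryoSerializer
spark.kryo.registrationRequired true
spark.kryo.classesToRegister    com.example.MyRecord
```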
## Use bucketing
Bucketing is similar to data partitioning, but each bucket can hold a set of column values rather than just one. Bucketing works well for partitioning on large (in the millions or more) numbers of values, such as product identifiers. A bucket is determined by hashing the bucket key of the row. Bucketed tables offer unique optimizations because they store metadata about how they were bucketed and sorted.
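
The bucket assignment itself can be sketched in plain Python; Spark uses its own Murmur3-based hash, so `crc32` here is only an illustrative stand-in:

```python
import zlib

NUM_BUCKETS = 32  # illustrative bucket count

def bucket_for(bucket_key: str) -> int:
    # A row lands in the bucket given by hashing its bucket key and
    # taking the result modulo the bucket count, so equal keys always
    # map to the same bucket (and therefore the same file on disk).
    return zlib.crc32(bucket_key.encode()) % NUM_BUCKETS
```

Because equal keys always land in the same bucket, two tables bucketed the same way can be joined without a full shuffle.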
Some advanced bucketing features are:
You can use partitioning and bucketing at the same time.
## Optimize joins and shuffles
If you have slow jobs on a Join or Shuffle, the cause is probably *data skew*, which is asymmetry in your job data. For example, a map job may take 20 seconds, but running a job where the data is joined or shuffled takes hours. To fix data skew, you should salt the entire key, or use an *isolated salt* for only some subset of keys. If you're using an isolated salt, you should further filter to isolate your subset of salted keys in map joins. Another option is to introduce a bucket column and pre-aggregate in buckets first.
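
The salting idea can be sketched in plain Python (the names and the salt count are illustrative, not Spark API):

```python
import random

NUM_SALTS = 8  # illustrative; size it to the observed skew

def salt(key: str) -> str:
    # Large side of the join: spread one hot key across NUM_SALTS
    # synthetic keys so no single task receives all of its rows.
    return f"{key}#{random.randrange(NUM_SALTS)}"

def explode(key: str) -> list:
    # Small side of the join: replicate each key once per salt so
    # every salted variant still finds its match.
    return [f"{key}#{i}" for i in range(NUM_SALTS)]

print(explode("hot-key")[:2])  # ['hot-key#0', 'hot-key#1']
```

After joining on the salted key, drop the salt suffix and aggregate to recover the original result.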
Another factor causing slow joins could be the join type. By default, Spark uses the `SortMerge` join type. This type of join is best suited for large data sets, but is otherwise computationally expensive because it must first sort the left and right sides of data before merging them.
A `Broadcast` join is best suited for smaller data sets, or where one side of the join is much smaller than the other side. This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general.
## Customize cluster configuration
Depending on your Spark cluster workload, you may determine that a non-default Spark configuration would result in more optimized Spark job execution. Perform benchmark testing with sample workloads to validate any non-default cluster configurations.
Here are some common parameters you can adjust:
|Parameter |Description |
|---|---|
|`--num-executors`|Sets the appropriate number of executors.|
|`--executor-cores`|Sets the number of cores for each executor. Typically you should have middle-sized executors, as other processes consume some of the available memory.|
|`--executor-memory`|Sets the memory size for each executor, which controls the heap size on YARN. Leave some memory for execution overhead.|
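
The same three settings can also be fixed in `spark-defaults.conf` instead of being passed on the `spark-submit` command line; the values below are illustrative, not recommendations:

```
spark.executor.instances 10
spark.executor.cores     4
spark.executor.memory    30g
```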
### Select the correct executor size
2. Reduce the number of open connections between executors (N²) on larger clusters (>100 executors).
3. Increase heap size to accommodate memory-intensive tasks.
4. Optional: Reduce per-executor memory overhead.
5. Optional: Increase utilization and concurrency by oversubscribing CPU.
As a general rule of thumb when selecting the executor size:
1. Start with 30 GB per executor and distribute available machine cores.
2. Increase the number of executor cores for larger clusters (> 100 executors).
3. Modify size based both on trial runs and on the preceding factors such as GC overhead.
When running concurrent queries, consider the following:
1. Start with 30 GB per executor and all machine cores.
2. Create multiple parallel Spark applications by oversubscribing CPU (around 30% latency improvement).
For more information on using Ambari to configure executors, see [Apache Spark settings - Spark executors](apache-spark-settings.md#configuring-spark-executors).
Monitor your query performance for outliers or other performance issues by looking at the timeline view, SQL graph, job statistics, and so forth. For information on debugging Spark jobs using YARN and the Spark History server, see [Debug Apache Spark jobs running on Azure HDInsight](apache-spark-job-debugging.md). For tips on using YARN Timeline Server, see [Access Apache Hadoop YARN application logs](../hdinsight-hadoop-access-yarn-app-logs-linux.md).
Sometimes one or a few of the executors are slower than the others, and tasks take much longer to execute. This frequently happens on larger clusters (> 30 nodes). In this case, divide the work into a larger number of tasks so the scheduler can compensate for slow tasks. For example, have at least twice as many tasks as the number of executor cores in the application. You can also enable speculative execution of tasks with `conf: spark.speculation = true`.
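
For reference, speculative execution can also be enabled cluster-wide through `spark-defaults.conf` properties; the multiplier shown is Spark's documented default, included for illustration:

```
spark.speculation            true
spark.speculation.multiplier 1.5
```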
## Optimize job execution
Monitor your running jobs regularly for performance issues. If you need more insight into certain issues, consider one of the following performance profiling tools:
* [Intel PAL Tool](https://github.com/intel-hadoop/PAT) monitors CPU, storage, and network bandwidth utilization.
* [Oracle Java 8 Mission Control](https://www.oracle.com/technetwork/java/javaseproducts/mission-control/java-mission-control-1998576.html) profiles Spark and executor code.
Key to Spark 2.x query performance is the Tungsten engine, which depends on whole-stage code generation. In some cases, whole-stage code generation may be disabled. For example, if you use a non-mutable type (`string`) in the aggregation expression, `SortAggregate` appears instead of `HashAggregate`. For better performance, try the following and then re-enable code generation:
`MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))`
* [Debug Apache Spark jobs running on Azure HDInsight](apache-spark-job-debugging.md)
* [Manage resources for an Apache Spark cluster on HDInsight](apache-spark-resource-manager.md)
* [Use the Apache Spark REST API to submit remote jobs to an Apache Spark cluster](apache-spark-livy-rest-interface.md)