
Commit cf13b79

Merge pull request #115559 from hrasheed-msft/hdi_underperf_docs
reorganizing apache spark optimization
2 parents 99d5059 + cf4e3f2 commit cf13b79

6 files changed: +300 −206 lines


articles/hdinsight/TOC.yml

Lines changed: 12 additions & 2 deletions
@@ -330,8 +330,18 @@
      href: ./spark/spark-best-practices.md
    - name: Configure Apache Spark settings
      href: ./spark/apache-spark-settings.md
-   - name: Optimize Apache Spark jobs
-     href: ./spark/apache-spark-perf.md
+   - name: Optimization
+     items:
+     - name: Optimize Apache Spark jobs
+       href: ./spark/apache-spark-perf.md
+     - name: Optimize data processing
+       href: ./spark/optimize-data-processing.md
+     - name: Optimize data storage
+       href: ./spark/optimize-data-storage.md
+     - name: Optimize memory usage
+       href: ./spark/optimize-memory-usage.md
+     - name: Optimize cluster configuration
+       href: ./spark/optimize-cluster-configuration.md
    - name: Migrate to Spark 2.3 or 2.4
      href: ./spark/migrate-versions.md
  - name: How to

articles/hdinsight/spark/apache-spark-perf.md

Lines changed: 11 additions & 204 deletions
@@ -6,222 +6,29 @@ ms.author: hrasheed
ms.reviewer: jasonh
ms.service: hdinsight
ms.topic: conceptual
-ms.custom: hdinsightactive,seoapr2020
-ms.date: 04/17/2020
+ms.date: 05/21/2020
---

# Optimize Apache Spark jobs in HDInsight

Learn how to optimize Apache Spark cluster configuration for your particular workload. The most common challenge is memory pressure caused by improper configurations, such as wrong-sized executors. Other common challenges are long-running operations and tasks that result in Cartesian operations. You can speed up jobs with appropriate caching, and by allowing for [data skew](#optimize-joins-and-shuffles). For best performance, monitor and review long-running and resource-consuming Spark job executions. For information on getting started with Apache Spark on HDInsight, see [Create Apache Spark cluster using Azure portal](apache-spark-jupyter-spark-sql-use-portal.md).

The following sections describe common Spark job optimizations and recommendations.

## Choose the data abstraction

Earlier Spark versions use RDDs to abstract data; Spark 1.3 and 1.6 introduced DataFrames and DataSets, respectively. Consider the following relative merits (a brief comparison sketch follows the list):

* **DataFrames**
  * Best choice in most situations.
  * Provides query optimization through Catalyst.
  * Whole-stage code generation.
  * Direct memory access.
  * Low garbage collection (GC) overhead.
  * Not as developer-friendly as DataSets, as there are no compile-time checks or domain object programming.
* **DataSets**
  * Good in complex ETL pipelines where the performance impact is acceptable.
  * Not good in aggregations where the performance impact can be considerable.
  * Provides query optimization through Catalyst.
  * Developer-friendly by providing domain object programming and compile-time checks.
  * Adds serialization/deserialization overhead.
  * High GC overhead.
  * Breaks whole-stage code generation.
* **RDDs**
  * You don't need to use RDDs, unless you need to build a new custom RDD.
  * No query optimization through Catalyst.
  * No whole-stage code generation.
  * High GC overhead.
  * Must use Spark 1.x legacy APIs.

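As a rough illustration of these trade-offs, the following minimal sketch computes the same aggregation through the Dataset/DataFrame API, which goes through Catalyst, and through a plain RDD, which doesn't. It assumes a running `SparkSession` named `spark`; the `Sale` case class and sample rows are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("abstraction-example").getOrCreate()
import spark.implicits._

// Small in-memory sample; in practice this data would come from storage.
case class Sale(productId: Int, amount: Double)
val sales = Seq(Sale(1, 10.0), Sale(1, 5.0), Sale(2, 8.0))

// Dataset/DataFrame: declarative, optimized by Catalyst and whole-stage code generation.
val totalsDf = sales.toDS().groupBy("productId").agg(sum("amount").alias("total"))

// RDD: the same logic written imperatively; no Catalyst optimization is applied.
val totalsRdd = spark.sparkContext.parallelize(sales)
  .map(s => (s.productId, s.amount))
  .reduceByKey(_ + _)

totalsDf.show()
totalsRdd.collect().foreach(println)
```
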
## Use optimal data format

Spark supports many formats, such as csv, json, xml, parquet, orc, and avro. Spark can be extended to support many more formats with external data sources - for more information, see [Apache Spark packages](https://spark-packages.org).

The best format for performance is parquet with *snappy compression*, which is the default in Spark 2.x. Parquet stores data in columnar format, and is highly optimized in Spark.

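As a minimal sketch, assuming an existing `SparkSession` named `spark` and a placeholder DataFrame `df`, writing and reading snappy-compressed Parquet looks like the following; the output path is illustrative only.

```scala
// Write a DataFrame as snappy-compressed Parquet and read it back.
df.write
  .mode("overwrite")
  .option("compression", "snappy")   // snappy is already the default in Spark 2.x
  .parquet("/example/data/sales_parquet")

val parquetDf = spark.read.parquet("/example/data/sales_parquet")
```
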
## Select default storage

When you create a new Spark cluster, you can select Azure Blob Storage or Azure Data Lake Storage as your cluster's default storage. Both options give you the benefit of long-term storage for transient clusters, so your data isn't automatically deleted when you delete your cluster. You can recreate a transient cluster and still access your data.

| Store Type | File System | Speed | Transient | Use Cases |
| --- | --- | --- | --- | --- |
| Azure Blob Storage | **wasb:**//url/ | **Standard** | Yes | Transient cluster |
| Azure Blob Storage (secure) | **wasbs:**//url/ | **Standard** | Yes | Transient cluster |
| Azure Data Lake Storage Gen 2 | **abfs:**//url/ | **Faster** | Yes | Transient cluster |
| Azure Data Lake Storage Gen 1 | **adl:**//url/ | **Faster** | Yes | Transient cluster |
| Local HDFS | **hdfs:**//url/ | **Fastest** | No | Interactive 24/7 cluster |

For a full description of storage options, see [Compare storage options for use with Azure HDInsight clusters](../hdinsight-hadoop-compare-storage-options.md).

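For illustration, the URI schemes in the table translate into read paths like the following sketch; `spark` is an existing `SparkSession`, and the account, container, and directory names are placeholders.

```scala
// Reading the same dataset from different storage types.
val fromBlob  = spark.read.parquet("wasbs://mycontainer@myaccount.blob.core.windows.net/data/sales")
val fromAdls2 = spark.read.parquet("abfs://mycontainer@myaccount.dfs.core.windows.net/data/sales")
val fromHdfs  = spark.read.parquet("hdfs://mycluster/data/sales")
```
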
## Use the cache

Spark provides its own native caching mechanisms, which can be used through different methods such as `.persist()`, `.cache()`, and `CACHE TABLE`. This native caching is effective with small data sets and in ETL pipelines where you need to cache intermediate results. However, Spark native caching currently doesn't work well with partitioning, since a cached table doesn't keep the partitioning data. A more generic and reliable caching technique is *storage layer caching*.

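Here's a minimal sketch of the native caching calls mentioned above, assuming an existing `SparkSession` named `spark` and a placeholder DataFrame `df`; for HDInsight storage layer caching, see the IO Cache feature linked in the list below.

```scala
import org.apache.spark.storage.StorageLevel

df.persist(StorageLevel.MEMORY_AND_DISK)   // .cache() is shorthand for this storage level
df.count()                                 // an action materializes the cache

df.createOrReplaceTempView("sales_view")
spark.sql("CACHE TABLE sales_view")        // SQL equivalent

df.unpersist()                             // release the cache when it's no longer needed
```
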
* Native Spark caching (not recommended)
  * Good for small datasets.
  * Doesn't work with partitioning, which may change in future Spark releases.

* Storage level caching (recommended)
  * Can be implemented on HDInsight using the [IO Cache](apache-spark-improve-performance-iocache.md) feature.
  * Uses in-memory and SSD caching.

* Local HDFS (recommended)
  * `hdfs://mycluster` path.
  * Uses SSD caching.
  * Cached data will be lost when you delete the cluster, requiring a cache rebuild.

## Use memory efficiently

Spark operates by placing data in memory. So managing memory resources is a key aspect of optimizing the execution of Spark jobs. There are several techniques you can apply to use your cluster's memory efficiently.

* Prefer smaller data partitions and account for data size, types, and distribution in your partitioning strategy.
* Consider the newer, more efficient [`Kryo data serialization`](https://github.com/EsotericSoftware/kryo), rather than the default Java serialization.
* Prefer using YARN, as it separates `spark-submit` by batch.
* Monitor and tune Spark configuration settings.

For your reference, the Spark memory structure and some key executor memory parameters are shown in the next image.

### Spark memory considerations

If you're using Apache Hadoop YARN, then YARN controls the memory used by all containers on each Spark node. The following diagram shows the key objects and their relationships.

![YARN Spark Memory Management](./media/apache-spark-perf/apache-yarn-spark-memory.png)

To address 'out of memory' messages, try:

* Review DAG Management Shuffles. Reduce shuffles by using map-side reduction, pre-partitioning (or bucketizing) source data, maximizing single shuffles, and reducing the amount of data sent.
* Prefer `ReduceByKey`, with its fixed memory limit, to `GroupByKey`, which provides aggregations, windowing, and other functions, but has an unbounded memory limit (see the sketch after this list).
* Prefer `TreeReduce`, which does more work on the executors or partitions, to `Reduce`, which does all work on the driver.
* Use DataFrames rather than the lower-level RDD objects.
* Create ComplexTypes that encapsulate actions, such as "Top N", various aggregations, or windowing operations.

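The following minimal sketch illustrates the `ReduceByKey` versus `GroupByKey` point with a simple word count; `spark` is an existing `SparkSession` and the sample data is a stand-in.

```scala
val wordsRdd = spark.sparkContext.parallelize(Seq("spark", "hdinsight", "spark", "yarn"))
val pairs = wordsRdd.map(word => (word, 1))

// reduceByKey combines values on each partition first, so only partial sums are shuffled.
val countsReduce = pairs.reduceByKey(_ + _)

// groupByKey ships every value for a key to a single executor before aggregating,
// which can run out of memory for hot keys.
val countsGroup = pairs.groupByKey().mapValues(_.sum)

countsReduce.collect().foreach(println)
```
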
For additional troubleshooting steps, see [OutOfMemoryError exceptions for Apache Spark in Azure HDInsight](apache-spark-troubleshoot-outofmemory.md).

## Optimize data serialization

Spark jobs are distributed, so appropriate data serialization is important for the best performance. There are two serialization options for Spark:

* Java serialization is the default.
* `Kryo` serialization is a newer format and can result in faster and more compact serialization than Java. `Kryo` requires that you register the classes in your program, and it doesn't yet support all Serializable types (see the sketch after this list).

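A minimal configuration sketch for enabling `Kryo` and registering application classes might look like the following; `SaleRecord` is a placeholder standing in for your own domain types.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class SaleRecord(productId: Int, amount: Double)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[SaleRecord]))

val spark = SparkSession.builder()
  .appName("kryo-example")
  .config(conf)
  .getOrCreate()
```
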
## Use bucketing

Bucketing is similar to data partitioning. But each bucket can hold a set of column values rather than just one. This method works well for partitioning on large (in the millions or more) numbers of values, such as product identifiers. A bucket is determined by hashing the bucket key of the row. Bucketed tables offer unique optimizations because they store metadata about how they were bucketed and sorted.

Some advanced bucketing features are:

* Query optimization based on bucketing meta-information.
* Optimized aggregations.
* Optimized joins.

You can use partitioning and bucketing at the same time.

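As a sketch, a partitioned and bucketed table can be written as shown below; `salesDf` is assumed to be an existing DataFrame, and the table name, column names, and bucket count are placeholders.

```scala
salesDf.write
  .partitionBy("saleDate")        // coarse-grained partitioning on a low-cardinality column
  .bucketBy(200, "productId")     // hash the high-cardinality key into 200 buckets
  .sortBy("productId")
  .format("parquet")
  .saveAsTable("sales_bucketed")
```
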
## Optimize joins and shuffles

If you have slow jobs on a Join or Shuffle, the cause is probably *data skew*. Data skew is asymmetry in your job data. For example, a map job may take 20 seconds. But running a job where the data is joined or shuffled takes hours. To fix data skew, you should salt the entire key, or use an *isolated salt* for only some subset of keys. If you're using an isolated salt, you should further filter to isolate your subset of salted keys in map joins. Another option is to introduce a bucket column and pre-aggregate in buckets first.

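The following sketch shows one way to salt a skewed join. It assumes placeholder DataFrames `largeDf` (the skewed side) and `smallDf` that join on a `customerId` column; the salt count is arbitrary.

```scala
import org.apache.spark.sql.functions._

val saltCount = 16

// Scatter the skewed side across extra artificial keys.
val saltedLarge = largeDf.withColumn("salt", (rand() * saltCount).cast("int"))

// Replicate the small side once per salt value so every salted key still finds its match.
val saltValues = array((0 until saltCount).map(lit): _*)
val saltedSmall = smallDf.withColumn("salt", explode(saltValues))

val joined = saltedLarge
  .join(saltedSmall, Seq("customerId", "salt"))
  .drop("salt")
```
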
Another factor causing slow joins could be the join type. By default, Spark uses the `SortMerge` join type. This type of join is best suited for large data sets, but it's computationally expensive because it must first sort the left and right sides of data before merging them.

A `Broadcast` join is best suited for smaller data sets, or where one side of the join is much smaller than the other side. This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general.

You can change the join type in your configuration by setting `spark.sql.autoBroadcastJoinThreshold`, or you can set a join hint using the DataFrame APIs (`dataframe.join(broadcast(df2))`).

```scala
// Option 1: raise the auto-broadcast threshold to 1 GB so smaller tables are broadcast automatically.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2: explicitly broadcast the smaller side with a join hint.
import org.apache.spark.sql.functions.broadcast

val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

spark.sql("SELECT col1, col2 FROM V_JOIN")
```

If you're using bucketed tables, then you have a third join type, the `Merge` join. A correctly pre-partitioned and pre-sorted dataset will skip the expensive sort phase from a `SortMerge` join.

The order of joins matters, particularly in more complex queries. Start with the most selective joins. Also, when possible, move joins that increase the number of rows to after aggregations.

To manage parallelism for Cartesian joins, you can add nested structures, use windowing, and perhaps skip one or more steps in your Spark job.

## Customize cluster configuration

Depending on your Spark cluster workload, you may determine that a non-default Spark configuration would result in more optimized Spark job execution. Do benchmark testing with sample workloads to validate any non-default cluster configurations.

Here are some common parameters you can adjust (a configuration sketch follows the table):

|Parameter |Description |
|---|---|
|--num-executors|Sets the appropriate number of executors.|
|--executor-cores|Sets the number of cores for each executor. Typically you should have middle-sized executors, as other processes consume some of the available memory.|
|--executor-memory|Sets the memory size for each executor, which controls the heap size on YARN. Leave some memory for execution overhead.|

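These flags are normally passed to `spark-submit`. As a sketch, the equivalent configuration keys can also be set when building the session; the values below are illustrative only, not tuning recommendations.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tuned-job")
  .config("spark.executor.instances", "6")   // equivalent of --num-executors
  .config("spark.executor.cores", "4")       // equivalent of --executor-cores
  .config("spark.executor.memory", "8g")     // equivalent of --executor-memory
  .getOrCreate()
```
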
### Select the correct executor size

When deciding your executor configuration, consider the Java garbage collection (GC) overhead.

* Factors to reduce executor size:
    1. Reduce heap size below 32 GB to keep GC overhead < 10%.
    2. Reduce the number of cores to keep GC overhead < 10%.

* Factors to increase executor size:
    1. Reduce communication overhead between executors.
    2. Reduce the number of open connections between executors (N²) on larger clusters (>100 executors).
    3. Increase heap size to accommodate memory-intensive tasks.
    4. Optional: Reduce per-executor memory overhead.
    5. Optional: Increase usage and concurrency by oversubscribing CPU.

As a general rule, when selecting the executor size:

1. Start with 30 GB per executor and distribute available machine cores.
2. Increase the number of executor cores for larger clusters (> 100 executors).
3. Modify size based both on trial runs and on the preceding factors such as GC overhead.

When running concurrent queries, consider:

1. Start with 30 GB per executor and all machine cores.
2. Create multiple parallel Spark applications by oversubscribing CPU (around 30% latency improvement).
3. Distribute queries across parallel applications.
4. Modify size based both on trial runs and on the preceding factors such as GC overhead.

For more information on using Ambari to configure executors, see [Apache Spark settings - Spark executors](apache-spark-settings.md#configuring-spark-executors).

Monitor query performance for outliers or other performance issues by looking at the timeline view. Also review the SQL graph, job statistics, and so forth. For information on debugging Spark jobs using YARN and the Spark History server, see [Debug Apache Spark jobs running on Azure HDInsight](apache-spark-job-debugging.md). For tips on using YARN Timeline Server, see [Access Apache Hadoop YARN application logs](../hdinsight-hadoop-access-yarn-app-logs-linux.md).

Sometimes one or a few of the executors are slower than the others, and tasks take much longer to execute. This slowness frequently happens on larger clusters (> 30 nodes). In this case, divide the work into a larger number of tasks so the scheduler can compensate for slow tasks. For example, have at least twice as many tasks as the number of executor cores in the application. You can also enable speculative execution of tasks with `conf: spark.speculation = true`.
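As a sketch, both mitigations can be expressed as follows; `spark.speculation` is usually supplied at submit time, and `df` and the partition count are placeholders.

```scala
import org.apache.spark.sql.SparkSession

// Speculation re-runs suspiciously slow tasks elsewhere; setting it on the builder
// only takes effect for a newly created session.
val spark = SparkSession.builder()
  .appName("straggler-mitigation")
  .config("spark.speculation", "true")
  .getOrCreate()

// Increase the task count so the scheduler can route around slow executors,
// for example to at least twice the total number of executor cores.
val repartitioned = df.repartition(400)
```
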
+This article provides an overview of strategies to optimize Apache Spark jobs on Azure HDInsight.

-## Optimize job execution
+## Overview

-* Cache as necessary, for example if you use the data twice, then cache it.
-* Broadcast variables to all executors. The variables are only serialized once, resulting in faster lookups.
-* Use the thread pool on the driver, which results in faster operation for many tasks.
+The performance of your Apache Spark jobs depends on multiple factors. These performance factors include how your data is stored, how the cluster is configured, and the operations that are used when processing the data.

-Monitor your running jobs regularly for performance issues. If you need more insight into certain issues, consider one of the following performance profiling tools:
+Common challenges you might face include memory constraints due to improperly sized executors, long-running operations, and tasks that result in Cartesian operations.

-* [Intel PAL Tool](https://github.com/intel-hadoop/PAT) monitors CPU, storage, and network bandwidth usage.
-* [Oracle Java 8 Mission Control](https://www.oracle.com/technetwork/java/javaseproducts/mission-control/java-mission-control-1998576.html) profiles Spark and executor code.
+There are also various strategies that can help you overcome these challenges, such as caching and allowing for data skew.

-Key to Spark 2.x query performance is the Tungsten engine, which depends on whole-stage code generation. In some cases, whole-stage code generation may be disabled. For example, if you use a non-mutable type (`string`) in the aggregation expression, `SortAggregate` appears instead of `HashAggregate`. For better performance, try the following change and then re-enable code generation:
+In each of the following articles, you can find common challenges and solutions for a different aspect of Spark optimization.

-```sql
-MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))
-```
+* [Optimize data storage](optimize-data-storage.md)
+* [Optimize data processing](optimize-data-processing.md)
+* [Optimize memory usage](optimize-memory-usage.md)
+* [Optimize cluster configuration](optimize-cluster-configuration.md)

## Next steps

* [Debug Apache Spark jobs running on Azure HDInsight](apache-spark-job-debugging.md)
* [Manage resources for an Apache Spark cluster on HDInsight](apache-spark-resource-manager.md)
* [Configure Apache Spark settings](apache-spark-settings.md)
-* [Tuning Apache Spark](https://spark.apache.org/docs/latest/tuning.html)
-* [How to Actually Tune Your Apache Spark Jobs So They Work](https://www.slideshare.net/ilganeli/how-to-actually-tune-your-spark-jobs-so-they-work)
-* [`Kryo Serialization`](https://github.com/EsotericSoftware/kryo)
