Merged. Showing changes from 21 of 31 commits.
11 changes: 10 additions & 1 deletion common/src/main/scala/org/apache/comet/CometConf.scala
@@ -517,10 +517,19 @@ object CometConf extends ShimCometConf {
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
Contributor: Are we not limiting the available memory pools?

Member Author: I think we should, but as a separate PR

Member Author: oops .. this was actually a copy-paste error - these are the on-heap pools. Updated.

"and `unbounded`. When running Spark in off-heap mode, available pool types are " +
"'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` " +
s"for on-heap mode and `unified` for off-heap mode. $TUNING_GUIDE.")
s"for on-heap mode and `fair_unified` for off-heap mode. $TUNING_GUIDE.")
.stringConf
.createWithDefault("default")

val COMET_EXEC_MEMORY_POOL_FRACTION: ConfigEntry[Double] =
conf("spark.comet.exec.memoryPool.fraction")
.doc(
"Fraction of off-heap memory pool that is available to Comet. " +
"Only applies to off-heap mode. " +
s"$TUNING_GUIDE.")
.doubleConf
.createWithDefault(1.0)
Member Author: Default is 1.0 so that this change is not a breaking change


val COMET_SCAN_PREFETCH_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.scan.preFetch.enabled")
.doc("Whether to enable pre-fetching feature of CometScan.")
3 changes: 2 additions & 1 deletion docs/source/user-guide/latest/configs.md
@@ -49,7 +49,8 @@ Comet provides the following configuration settings.
| spark.comet.exec.globalLimit.enabled | Whether to enable globalLimit by default. | true |
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.memoryPool | The type of memory pool to be used for Comet native execution. When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', 'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', and `unbounded`. When running Spark in off-heap mode, available pool types are 'greedy_unified' and `fair_unified`. The default pool type is `greedy_task_shared` for on-heap mode and `fair_unified` for off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | default |
| spark.comet.exec.memoryPool.fraction | Fraction of off-heap memory pool that is available to Comet. Only applies to off-heap mode. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | 1.0 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.compression.codec | The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and snappy are supported. Compression can be disabled by setting spark.shuffle.compress=false. | lz4 |
55 changes: 40 additions & 15 deletions docs/source/user-guide/latest/tuning.md
@@ -45,10 +45,17 @@ and requiring shuffle memory to be separately configured.
The recommended way to allocate memory for Comet is to set `spark.memory.offHeap.enabled=true`. This allows
Comet to share an off-heap memory pool with Spark, reducing the overall memory overhead. The size of the pool is
specified by `spark.memory.offHeap.size`. For more details about Spark off-heap memory mode, please refer to
Spark documentation: https://spark.apache.org/docs/latest/configuration.html.
[Spark documentation]. For full details on configuring Comet memory in off-heap mode, see the [Advanced Memory Tuning]
section of this guide.

[Spark documentation]: https://spark.apache.org/docs/latest/configuration.html
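As an illustration only (not part of this diff), a minimal Scala sketch of enabling unified off-heap memory for Comet might look like the following; the application name and the `8g` size are arbitrary example values, and the Comet plugin itself is assumed to be configured as described in the installation guide.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: run Spark in off-heap mode so that Comet shares a unified
// off-heap pool with Spark. The size below is an arbitrary example value.
val spark = SparkSession
  .builder()
  .appName("comet-offheap-example")
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "8g")
  .getOrCreate()
```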

### Configuring Comet Memory in On-Heap Mode

```{warning}
Support for on-heap memory pools is deprecated and will be removed from a future release.
```

When running in on-heap mode, Comet memory can be allocated by setting `spark.comet.memoryOverhead`. If this setting
is not provided, it will be calculated by multiplying the current Spark executor memory by
`spark.comet.memory.overhead.factor` (default value is `0.2`) which may or may not result in enough memory for
@@ -59,10 +66,13 @@ Comet supports native shuffle and columnar shuffle (these terms are explained in
In on-heap mode, columnar shuffle memory must be separately allocated using `spark.comet.columnar.shuffle.memorySize`.
If this setting is not provided, it will be calculated by multiplying `spark.comet.memoryOverhead` by
`spark.comet.columnar.shuffle.memory.factor` (default value is `1.0`). If a shuffle exceeds this amount of memory
then the query will fail.
then the query will fail. For full details on configuring Comet memory in on-heap mode, see the [Advanced Memory Tuning]
section of this guide.

[shuffle]: #shuffle

[Advanced Memory Tuning]: #advanced-memory-tuning
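To make the defaulting rules above concrete, here is an illustrative Scala sketch of the on-heap sizing arithmetic with example numbers (a 16 GB executor and the default factors); the values are examples, not recommendations.

```scala
// Illustrative arithmetic only; real values come from the Spark/Comet configuration.
val executorMemoryMb = 16L * 1024 // spark.executor.memory = 16g
val overheadFactor = 0.2          // spark.comet.memory.overhead.factor (default)
val shuffleFactor = 1.0           // spark.comet.columnar.shuffle.memory.factor (default)

// spark.comet.memoryOverhead, when not set explicitly
val cometMemoryOverheadMb = (executorMemoryMb * overheadFactor).toLong // 3276 MB

// spark.comet.columnar.shuffle.memorySize, when not set explicitly
val columnarShuffleMemoryMb = (cometMemoryOverheadMb * shuffleFactor).toLong // 3276 MB
```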

### Determining How Much Memory to Allocate

Generally, increasing the amount of memory allocated to Comet will improve query performance by reducing the
@@ -102,36 +112,43 @@ Workarounds for this problem include:

## Advanced Memory Tuning

### Configuring spark.executor.memoryOverhead in On-Heap Mode

In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
that it is possible to allocate off-heap memory when running in on-heap mode.

Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
resource managers respect Apache Spark memory configuration before starting the containers.

### Configuring Off-Heap Memory Pools

Comet implements multiple memory pool implementations. The type of pool can be specified with `spark.comet.exec.memoryPool`.

The valid pool types for off-heap mode are:

- `fair_unified` (default when `spark.memory.offHeap.enabled=true` is set)
- `fair_unified_global` (default when `spark.memory.offHeap.enabled=true` is set)
- `fair_unified`
- `greedy_unified`

Both of these pools share off-heap memory between Spark and Comet. This approach is referred to as
All of these pools share off-heap memory between Spark and Comet. This approach is referred to as
unified memory management. The size of the pool is specified by `spark.memory.offHeap.size`.

The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
need to spill or have a single spillable operator.
Comet's memory accounting isn't 100% accurate and this can result in Comet using more memory than it reserves,
leading to out-of-memory exceptions. To work around this issue, it is possible to
set `spark.comet.exec.memoryPool.fraction` to a value less than `1.0` to restrict the amount of memory that can be
reserved by Comet.
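As a small worked example (illustrative values only), the effective Comet pool size in off-heap mode is the off-heap size scaled by the fraction:

```scala
// Illustrative only: effective Comet memory limit in off-heap mode.
// memoryLimit = spark.memory.offHeap.size * spark.comet.exec.memoryPool.fraction
val offHeapSizeBytes = 8L * 1024 * 1024 * 1024                  // spark.memory.offHeap.size = 8g
val poolFraction = 0.8                                          // spark.comet.exec.memoryPool.fraction
val cometMemoryLimit = (offHeapSizeBytes * poolFraction).toLong // ~6.4 GB reservable by Comet
```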

The `fair_unified` pool type prevents operators from using more than an even fraction of the available memory
The `fair_unified` pool types prevent operators from using more than an even fraction of the available memory
(i.e. `pool_size / num_reservations`). This pool works best when you know beforehand
the query has multiple operators that will likely all need to spill. Sometimes it will cause spills even
when there is sufficient memory in order to leave enough memory for other operators.

`fair_unified_global` allows any task to use the full off-heap memory pool.
Member: Where is fair_unified_global used? I don’t seem to find it in the code.

Member Author: Thanks. I have updated this.


`fair_unified` restricts each task to using a fraction of the off-heap memory pool based on number of cores
and cores per task.

The `greedy_unified` pool type implements a greedy first-come first-serve limit. This pool works well for queries that do not
need to spill or have a single spillable operator.
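For intuition, here is a worked example (illustrative numbers only) of the per-task split that `fair_unified` applies, matching the formula used in `CometExecIterator.getMemoryConfig` in this PR:

```scala
// Illustrative only: per-task limit under fair_unified.
// memoryLimitPerTask = memoryLimit * coresPerTask / numCores
val cometMemoryLimit = 16L * 1024 * 1024 * 1024 // 16 GB available to Comet
val executorCores = 16                          // spark.executor.cores
val coresPerTask = 4                            // spark.task.cpus
val perTaskLimit = (cometMemoryLimit.toDouble * coresPerTask / executorCores).toLong // 4 GB
```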

### Configuring On-Heap Memory Pools

```{warning}
Support for on-heap memory pools is deprecated and will be removed from a future release.
```

When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.

The type of pool can be specified with `spark.comet.exec.memoryPool`. The default setting is `greedy_task_shared`.
@@ -172,6 +189,14 @@ adjusting how much memory to allocate.
[FairSpillPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.FairSpillPool.html
[UnboundedMemoryPool]: https://docs.rs/datafusion/latest/datafusion/execution/memory_pool/struct.UnboundedMemoryPool.html

### Configuring spark.executor.memoryOverhead in On-Heap Mode

In some environments, such as Kubernetes and YARN, it is important to correctly set `spark.executor.memoryOverhead` so
that it is possible to allocate off-heap memory when running in on-heap mode.

Comet will automatically set `spark.executor.memoryOverhead` based on the `spark.comet.memory*` settings so that
resource managers respect Apache Spark memory configuration before starting the containers.

## Optimizing Joins

Spark often chooses `SortMergeJoin` over `ShuffledHashJoin` for stability reasons. If the build-side of a
6 changes: 2 additions & 4 deletions native/core/src/execution/memory_pools/config.rs
@@ -62,9 +62,7 @@ pub(crate) fn parse_memory_pool_config(
let pool_size = memory_limit as usize;
let memory_pool_config = if off_heap_mode {
match memory_pool_type.as_str() {
"default" | "fair_unified" => {
MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size)
}
"fair_unified" => MemoryPoolConfig::new(MemoryPoolType::FairUnified, pool_size),
"greedy_unified" => {
// the `unified` memory pool interacts with Spark's memory pool to allocate
// memory therefore does not need a size to be explicitly set. The pool size
@@ -84,7 +82,7 @@ "fair_spill_task_shared" => {
"fair_spill_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::FairSpillTaskShared, pool_size_per_task)
}
"default" | "greedy_task_shared" => {
"greedy_task_shared" => {
MemoryPoolConfig::new(MemoryPoolType::GreedyTaskShared, pool_size_per_task)
}
"fair_spill_global" => {
10 changes: 8 additions & 2 deletions native/core/src/execution/memory_pools/fair_pool.rs
@@ -133,7 +133,10 @@ impl MemoryPool for CometFairMemoryPool {
if additional > 0 {
let mut state = self.state.lock();
let num = state.num;
let limit = self.pool_size.checked_div(num).unwrap();
let limit = self
.pool_size
.checked_div(num)
.expect("overflow in checked_div");
let size = reservation.size();
if limit < size + additional {
return resources_err!(
@@ -155,7 +158,10 @@
state.used
);
}
state.used = state.used.checked_add(additional).unwrap();
state.used = state
.used
.checked_add(additional)
.expect("overflow in checked_add");
}
Ok(())
}
153 changes: 84 additions & 69 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -86,31 +86,13 @@ class CometExecIterator(
val conf = SparkEnv.get.conf
val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs

val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
val memoryLimit = if (offHeapMode) {
// in unified mode we share off-heap memory with Spark
ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
} else {
// we'll use the built-in memory pool from DF, and initializes with `memory_limit`
// and `memory_fraction` below.
CometSparkSessionExtensions.getCometMemoryOverhead(conf)
}

// serialize Comet related Spark configs in protobuf format
val builder = ConfigMap.newBuilder()
conf.getAll.filter(_._1.startsWith(CometConf.COMET_PREFIX)).foreach { case (k, v) =>
builder.putEntries(k, v)
}
val protobufSparkConfigs = builder.build().toByteArray

val memoryLimitPerTask = if (offHeapMode) {
// this per-task limit is not used in native code when using unified memory
// so we can skip calculating it and avoid logging irrelevant information
0
} else {
getMemoryLimitPerTask(conf)
}

// Create keyUnwrapper if encryption is enabled
val keyUnwrapper = if (encryptedFilePaths.nonEmpty) {
val unwrapper = new CometFileKeyUnwrapper()
@@ -124,6 +106,8 @@
null
}

val memoryConfig = CometExecIterator.getMemoryConfig(conf)

nativeLib.createPlan(
id,
cometBatchIterators,
@@ -135,10 +119,10 @@
cometTaskMemoryManager,
localDiskDirs,
batchSize = COMET_BATCH_SIZE.get(),
offHeapMode,
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
memoryLimit,
memoryLimitPerTask,
memoryConfig.offHeapMode,
memoryConfig.memoryPoolType,
memoryConfig.memoryLimit,
memoryConfig.memoryLimitPerTask,
taskAttemptId,
keyUnwrapper)
}
@@ -148,34 +132,6 @@
private var currentBatch: ColumnarBatch = null
private var closed: Boolean = false

private def getMemoryLimitPerTask(conf: SparkConf): Long = {
val numCores = numDriverOrExecutorCores(conf).toFloat
val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
// example 16GB maxMemory * 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
logInfo(
s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)")
limit
}

private def numDriverOrExecutorCores(conf: SparkConf): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}

val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
val master = conf.get("spark.master")
master match {
case "local" => 1
case LOCAL_N_REGEX(threads) => convertToInt(threads)
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => conf.get("spark.executor.cores", "1").toInt
}
}

private def getNextBatch: Option[ColumnarBatch] = {
assert(partitionIndex >= 0 && partitionIndex < numParts)

@@ -247,6 +203,8 @@

nextBatch = getNextBatch

logTrace(s"Task $taskAttemptId memory pool usage is ${cometTaskMemoryManager.getUsed} bytes")

if (nextBatch.isEmpty) {
close()
false
@@ -285,25 +243,11 @@
traceMemoryUsage()
}

// The allocator thoughts the exported ArrowArray and ArrowSchema structs are not released,
// so it will report:
// Caused by: java.lang.IllegalStateException: Memory was leaked by query.
// Memory leaked: (516) Allocator(ROOT) 0/516/808/9223372036854775807 (res/actual/peak/limit)
// Suspect this seems a false positive leak, because there is no reported memory leak at JVM
// when profiling. `allocator` reports a leak because it calculates the accumulated number
// of memory allocated for ArrowArray and ArrowSchema. But these exported ones will be
// released in native side later.
// More to clarify it. For ArrowArray and ArrowSchema, Arrow will put a release field into the
// memory region which is a callback function pointer (C function) that could be called to
// release these structs in native code too. Once we wrap their memory addresses at native
// side using FFI ArrowArray and ArrowSchema, and drop them later, the callback function will
// be called to release the memory.
// But at JVM, the allocator doesn't know about this fact so it still keeps the accumulated
// number.
// Tried to manually do `release` and `close` that can make the allocator happy, but it will
// cause JVM runtime failure.

// allocator.close()
Comment on lines -288 to -306
Member Author: I removed this comment since it refers to an allocator that no longer exists in this code.

val memInUse = cometTaskMemoryManager.getUsed
if (memInUse != 0) {
logWarning(s"CometExecIterator closed with non-zero memory usage : $memInUse")
}

closed = true
}
}
@@ -318,3 +262,74 @@
nativeLib.logMemoryUsage(s"task_memory_spark_$threadId", sparkTaskMemory)
}
}

object CometExecIterator extends Logging {

def getMemoryConfig(conf: SparkConf): MemoryConfig = {
val numCores = numDriverOrExecutorCores(conf).toFloat
Contributor: can number of cores be fractional? 🤔

Member Author: No, this is an intConf. I updated this.

val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
Contributor: same here

Member Author: No, this is an intConf. I updated this.

// there are different paths for on-heap vs off-heap mode
val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
if (offHeapMode) {
// in off-heap mode, Comet uses unified memory management to share off-heap memory with Spark
val offHeapSize = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
val memoryLimit = (offHeapSize * memoryFraction).toLong
val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask / numCores).toLong
Contributor: We should be able to use toDouble instead of toFloat here. I'm not super worried about rounding errors or overflow here, but better safe than sorry and we won't see a performance difference.

val memoryPoolType = getMemoryPoolType(defaultValue = "fair_unified")
logInfo(
s"memoryPoolType=$memoryPoolType, " +
s"offHeapSize=${toMB(offHeapSize)}, " +
s"memoryFraction=$memoryFraction, " +
s"memoryLimit=${toMB(memoryLimit)}, " +
s"memoryLimitPerTask=${toMB(memoryLimitPerTask)}")
MemoryConfig(offHeapMode, memoryPoolType = memoryPoolType, memoryLimit, memoryLimitPerTask)
} else {
// we'll use the built-in memory pool from DF, and initializes with `memory_limit`
// and `memory_fraction` below.
val memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
// example 16GB maxMemory * 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask / numCores).toLong
Contributor: Same comment about toDouble.

val memoryPoolType = getMemoryPoolType(defaultValue = "greedy_task_shared")
logInfo(
s"memoryPoolType=$memoryPoolType, " +
s"memoryLimit=${toMB(memoryLimit)}, " +
s"memoryLimitPerTask=${toMB(memoryLimitPerTask)}")
MemoryConfig(offHeapMode, memoryPoolType = memoryPoolType, memoryLimit, memoryLimitPerTask)
}
}

private def getMemoryPoolType(defaultValue: String): String = {
COMET_EXEC_MEMORY_POOL_TYPE.get() match {
case "default" => defaultValue
case other => other
}
}

private def numDriverOrExecutorCores(conf: SparkConf): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}

val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
Contributor: It would be nice to add a comment explaining what this expression is looking for, e.g. `local[*]`.

Member Author: I added some comments.

val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
val master = conf.get("spark.master")
master match {
case "local" => 1
case LOCAL_N_REGEX(threads) => convertToInt(threads)
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => conf.get("spark.executor.cores", "1").toInt
}
}

private def toMB(n: Long): String = {
s"${(n.toDouble / 1024.0 / 1024.0).toLong} MB"
}
}

case class MemoryConfig(
offHeapMode: Boolean,
memoryPoolType: String,
memoryLimit: Long,
memoryLimitPerTask: Long)
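
For reference, a hedged sketch (not part of the diff) of the `MemoryConfig` values one would expect `getMemoryConfig` to produce for an example off-heap setup; the numbers are illustrative and assume the defaults introduced in this PR.

```scala
// Hedged sketch: expected values for spark.memory.offHeap.size = 8g,
// spark.comet.exec.memoryPool.fraction = 1.0 (default), spark.executor.cores = 8,
// spark.task.cpus = 1, and spark.comet.exec.memoryPool = default.
val expected = MemoryConfig(
  offHeapMode = true,
  memoryPoolType = "fair_unified",                  // "default" resolves to fair_unified off-heap
  memoryLimit = 8L * 1024 * 1024 * 1024,            // 8 GB * fraction 1.0
  memoryLimitPerTask = 8L * 1024 * 1024 * 1024 / 8) // 1 GB per task (1 of 8 cores)
```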