
Conversation

@andygrove
Member

@andygrove andygrove commented Oct 8, 2025

Which issue does this PR close?

N/A

Rationale for this change

Now that Comet requires off-heap mode (except when running tests), remove all of the confusing documentation about configuring Comet on-heap memory pools.

Also, add a new config for controlling what percentage of the off-heap memory pool can be used by Comet (required because Comet memory accounting is not accurate).

With this PR, there are now only two user-facing memory pool configs (see the example after this list):

  • spark.comet.exec.memoryPool
  • spark.comet.exec.memoryPool.fraction
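
For illustration, a minimal sketch of how these two configs might be set alongside Spark's off-heap settings; the pool name and sizes below are example values, not recommendations from this PR:

```scala
import org.apache.spark.sql.SparkSession

// Example values only; Comet plugin setup is omitted for brevity.
val spark = SparkSession
  .builder()
  .master("local[*]") // for a local sketch; normally set via spark-submit
  .appName("comet-memory-pool-example")
  // Comet requires Spark off-heap memory to be enabled.
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "16g")
  // The two user-facing Comet memory pool configs described above.
  .config("spark.comet.exec.memoryPool", "fair_unified")
  .config("spark.comet.exec.memoryPool.fraction", "0.9")
  .getOrCreate()
```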

What changes are included in this PR?

  • There are now two separate memory pool configs: one public config for off-heap mode, and one internal config for on-heap mode (used only for testing)
  • All on-heap memory configs are now marked as internal and therefore no longer appear in the user guide
  • All on-heap related content has been removed from the user guide
  • Added a new config option, COMET_EXEC_MEMORY_POOL_FRACTION, for limiting the percentage of the off-heap pool that Comet can use (required because Comet memory accounting is not accurate)
  • Improved logging of memory configuration in executors:
INFO CometExecIterator: memoryPoolType=fair_unified, offHeapSize=16384 MB, memoryFraction=0.9, memoryLimit=14745 MB, memoryLimitPerTask=1843 MB
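
For context, a back-of-the-envelope sketch reproducing the numbers in that log line; the core counts are assumptions that happen to be consistent with the values shown, not taken from the PR:

```scala
// Assumed: 8 executor cores and spark.task.cpus = 1 (consistent with the log line above).
val offHeapSizeMb  = 16384L
val memoryFraction = 0.9
val numCores       = 8
val coresPerTask   = 1

val memoryLimitMb        = (offHeapSizeMb * memoryFraction).toLong                   // 14745
val memoryLimitPerTaskMb = (memoryLimitMb.toDouble * coresPerTask / numCores).toLong // 1843

println(s"memoryLimit=$memoryLimitMb MB, memoryLimitPerTask=$memoryLimitPerTaskMb MB")
```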

How are these changes tested?

@codecov-commenter

codecov-commenter commented Oct 8, 2025

Codecov Report

❌ Patch coverage is 72.72727% with 18 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.17%. Comparing base (f09f8af) to head (4816437).
⚠️ Report is 643 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ain/scala/org/apache/comet/CometExecIterator.scala | 65.30% | 12 Missing and 5 partials ⚠️ |
| ...park/src/main/scala/org/apache/spark/Plugins.scala | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2538      +/-   ##
============================================
+ Coverage     56.12%   59.17%   +3.04%     
- Complexity      976     1458     +482     
============================================
  Files           119      146      +27     
  Lines         11743    13685    +1942     
  Branches       2251     2363     +112     
============================================
+ Hits           6591     8098    +1507     
- Misses         4012     4360     +348     
- Partials       1140     1227      +87     


Comment on lines -292 to -306
// The allocator thoughts the exported ArrowArray and ArrowSchema structs are not released,
// so it will report:
// Caused by: java.lang.IllegalStateException: Memory was leaked by query.
// Memory leaked: (516) Allocator(ROOT) 0/516/808/9223372036854775807 (res/actual/peak/limit)
// Suspect this seems a false positive leak, because there is no reported memory leak at JVM
// when profiling. `allocator` reports a leak because it calculates the accumulated number
// of memory allocated for ArrowArray and ArrowSchema. But these exported ones will be
// released in native side later.
// More to clarify it. For ArrowArray and ArrowSchema, Arrow will put a release field into the
// memory region which is a callback function pointer (C function) that could be called to
// release these structs in native code too. Once we wrap their memory addresses at native
// side using FFI ArrowArray and ArrowSchema, and drop them later, the callback function will
// be called to release the memory.
// But at JVM, the allocator doesn't know about this fact so it still keeps the accumulated
// number.
// Tried to manually do `release` and `close` that can make the allocator happy, but it will
// cause JVM runtime failure.

// allocator.close()
Member Author

I removed this comment since it refers to an allocator that no longer exists in this code.

@andygrove andygrove changed the title from "feat: Add new config to limit Comet memory pool usage per task + bug fixes" to "[WIP] feat: Various improvements to off-heap memory pool configuration, logging, and documentation" on Oct 8, 2025
@andygrove andygrove marked this pull request as ready for review October 10, 2025 02:45
"Only applies to off-heap mode. " +
s"$TUNING_GUIDE.")
.doubleConf
.createWithDefault(1.0)
Member Author

Default is 1.0 so that this change is not a breaking change
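
A tiny sketch of why the 1.0 default is non-breaking, assuming the limit is computed as the off-heap size times this fraction (as the logging change above suggests):

```scala
val offHeapSizeMb = 16384L // example value

def cometLimitMb(fraction: Double): Long = (offHeapSizeMb * fraction).toLong

cometLimitMb(1.0) // 16384 MB: same as the pre-PR behavior of using the whole pool
cometLimitMb(0.9) // 14745 MB: opting in leaves ~10% headroom for inexact accounting
```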

object CometExecIterator extends Logging {

def getMemoryConfig(conf: SparkConf): MemoryConfig = {
val numCores = numDriverOrExecutorCores(conf).toFloat
Contributor

Can the number of cores be fractional? 🤔

Member Author

No, this is an intConf. I updated this.


def getMemoryConfig(conf: SparkConf): MemoryConfig = {
val numCores = numDriverOrExecutorCores(conf).toFloat
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
Contributor

same here

Member Author

No, this is an intConf. I updated this.

if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}

val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
Contributor

It would be nice to add a comment describing what this expression is looking for, e.g. local[*], maybe as pseudocode?

Member Author

I added some comments
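
For readers without the full diff, a minimal sketch of the kind of parsing (and comment) the regex implies; parseLocalCores is a hypothetical helper for illustration, not the PR's actual code:

```scala
// Matches Spark local-mode master strings such as "local[4]" or "local[*]",
// capturing the bracketed thread count.
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r

// Hypothetical helper: resolve the captured value to a concrete core count.
def parseLocalCores(master: String): Option[Int] = master match {
  case LOCAL_N_REGEX(threads) =>
    // "local[*]" means "use all available processors".
    Some(if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt)
  case _ => None
}

// parseLocalCores("local[4]") == Some(4)
// parseLocalCores("local[*]") == Some(<available processors>)
// parseLocalCores("yarn")     == None
```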

@andygrove andygrove changed the title from "feat: Various improvements to off-heap memory pool configuration, logging, and documentation" to "feat: Various improvements to memory pool configuration, logging, and documentation" on Oct 14, 2025
@andygrove
Member Author

@parthchandra @comphead @mbutrovich This is now ready for review. There have been some changes in scope today, so please re-read the PR description.

@andygrove andygrove requested a review from wForget October 14, 2025 00:18
"The type of memory pool to be used for Comet native execution. " +
"When running Spark in on-heap mode, available pool types are 'greedy', 'fair_spill', " +
"The type of memory pool to be used for Comet native execution " +
"hen running Spark in on-heap mode. Available pool types are 'greedy', 'fair_spill', " +
Contributor

@coderfender coderfender Oct 14, 2025

Minor nitpick: I guess "hen" is a typo for "when"? (Probably no caps in "available" either?)

Member Author

Yes, thanks. 🐔

Contributor

@coderfender coderfender left a comment

Minor typo

.doc(
"The type of memory pool to be used for Comet native execution " +
"when running Spark in off-heap mode. Available pool types are 'greedy', 'fair_spill', " +
"'greedy_task_shared', 'fair_spill_task_shared', 'greedy_global', 'fair_spill_global', " +
Contributor

Are we not limiting the available memory pools?

Member Author

I think we should, but as a separate PR

Member Author

oops .. this was actually a copy-paste error - these are the on-heap pools. Updated.

@comphead
Contributor

Thanks @andygrove, I think the PR is good; it's just waiting for make format.


When running in on-heap mode, Comet will use its own dedicated memory pools that are not shared with Spark.
`fair_unified_global` allows any task to use the full off-heap memory pool.
Member

Where is fair_unified_global used? I don’t seem to find it in the code.

Member Author

Thanks. I have updated this.

val offHeapSize = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
val memoryFraction = CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
val memoryLimit = (offHeapSize * memoryFraction).toLong
val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask / numCores).toLong
Contributor

We should be able to use toDouble instead of toFloat here. I'm not super worried about rounding errors or overflow here, but better safe than sorry and we won't see a performance difference.
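
A minimal sketch of the suggested change, keeping the names from the snippet above:

```scala
// Promote to Double before dividing to avoid Float rounding on large limits.
val memoryLimitPerTask = (memoryLimit.toDouble * coresPerTask / numCores).toLong
```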

val memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
// example 16GB maxMemory * 16 cores with 4 cores per task results
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
val memoryLimitPerTask = (memoryLimit.toFloat * coresPerTask / numCores).toLong
Contributor

Same comment about toDouble.

Contributor

@mbutrovich mbutrovich left a comment

This looks like a huge improvement in configuring Comet! Thanks @andygrove!

Comet Performance

- Comet requires at least 5 GB of RAM in off-heap mode and 6 GB RAM in on-heap mode, but performance at this level
- Comet requires at least 5 GB of RAM, but performance at this level
Contributor

Is it on-heap or off-heap?

Contributor

@comphead comphead left a comment

Thanks @andygrove

@mbutrovich mbutrovich merged commit 68d756a into apache:main Oct 14, 2025
144 of 145 checks passed
@andygrove andygrove deleted the refactor-mem-config branch October 14, 2025 19:23
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025