Member

@andygrove andygrove commented Sep 26, 2025

Which issue does this PR close?

Part of #2453

Rationale for this change

Including the task attempt ID in log messages and exceptions has helped me track down memory issues, and I think it will be useful again in the future.

What changes are included in this PR?

How are these changes tested?

@andygrove andygrove changed the title from "minor: improve logging when acquireMemory returns less memory than requested" to "minor: improve logging when acquireMemory returns less memory than requested [WIP]" Sep 26, 2025

codecov-commenter commented Sep 26, 2025

Codecov Report

❌ Patch coverage is 35.00000% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.33%. Comparing base (f09f8af) to head (439b5dd).
⚠️ Report is 563 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../java/org/apache/spark/CometTaskMemoryManager.java | 18.75% | 11 Missing and 2 partials ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #2467      +/-   ##
============================================
+ Coverage     56.12%   58.33%   +2.20%     
- Complexity      976     1438     +462     
============================================
  Files           119      146      +27     
  Lines         11743    13518    +1775     
  Branches       2251     2350      +99     
============================================
+ Hits           6591     7886    +1295     
- Misses         4012     4400     +388     
- Partials       1140     1232      +92     

@andygrove andygrove changed the title from "minor: improve logging when acquireMemory returns less memory than requested [WIP]" to "minor: include taskAttemptId in log messages and exceptions" Oct 1, 2025
@andygrove andygrove marked this pull request as ready for review October 1, 2025 16:27
@andygrove andygrove changed the title from "minor: include taskAttemptId in log messages and exceptions" to "minor: include taskAttemptId in log messages" Oct 1, 2025
Contributor

@comphead comphead left a comment

lgtm thanks @andygrove

if used != 0 {
warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
warn!(
"Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
Contributor

Suggested change
- "Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
+ "Task ID {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",

.unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
if let Err(e) = self.release_to_spark(size) {
panic!(
"Task {} failed to return {size} bytes to Spark: {e:?}",
Contributor

Suggested change
- "Task {} failed to return {size} bytes to Spark: {e:?}",
+ "Task ID {} failed to return {size} bytes to Spark: {e:?}",

Member Author

I was following Spark's convention for logging so that we can use grep to search for Task 1234 and view the combined logging from both Spark and Comet.

~/git/apache/apache-spark-3.5.6$ find . -name "*.scala" -exec grep taskAttemptId {} \; | grep log
              logDebug(s"Starting pushing blocks for the task ${context.taskAttemptId()}")
      logWarning(s"Task ${taskAttemptId.get} already completed, not releasing lock for $blockId")
    logTrace(s"Task $taskAttemptId trying to acquire read lock for $blockId")
        logTrace(s"Task $taskAttemptId acquired read lock for $blockId")
    logTrace(s"Task $taskAttemptId trying to acquire write lock for $blockId")
        logTrace(s"Task $taskAttemptId acquired write lock for $blockId")
    logTrace(s"Task $taskAttemptId downgrading write lock for $blockId")
    logTrace(s"Task $taskAttemptId releasing lock for $blockId")
    logTrace(s"Task $taskAttemptId trying to remove block $blockId")
        logInfo(s"Task ${TaskContext.get().taskAttemptId} force spilling in-memory map to disk " +
        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
    logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " +
        logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " +
      logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
        logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) failed " +

Contributor

Sounds good!
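
To illustrate the convention discussed in this thread, here is a minimal Rust sketch. Both function names (`task_message`, `lines_for_task`) are hypothetical and are not part of Comet or Spark; the second is only a rough stand-in for running `grep "Task 1234 "` over combined executor logs. The sketch assumes the ID is followed by a space, which holds for the message formats shown in this PR.

```rust
/// Hypothetical helper: prefix a message with the "Task <id>" form
/// that Spark's own log statements use.
fn task_message(task_attempt_id: i64, detail: &str) -> String {
    format!("Task {task_attempt_id} {detail}")
}

/// Rough stand-in for `grep "Task <id> "`: keep only the lines that
/// mention the given task attempt in the shared format.
fn lines_for_task<'a>(lines: &[&'a str], task_attempt_id: i64) -> Vec<&'a str> {
    // The trailing space keeps "Task 1234" from matching "Task 12345".
    let needle = format!("Task {task_attempt_id} ");
    lines
        .iter()
        .copied()
        .filter(|line| line.contains(&needle))
        .collect()
}
```

With this shape, a line written as "Task ID 1234 ..." would not be returned by a search for "Task 1234 ", which is the mismatch the reply above is trying to avoid.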

{
panic!("overflow when releasing {size} of {prev} bytes");
panic!(
"Task {} overflow when releasing {size} of {prev} bytes",
Contributor

Suggested change
- "Task {} overflow when releasing {size} of {prev} bytes",
+ "Task ID {} overflow when releasing {size} of {prev} bytes",


return Err(resources_datafusion_err!(
"Failed to acquire {} bytes, only got {}. Reserved: {}",
"Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
Contributor

Suggested change
- "Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
+ "Task ID {} failed to acquire {} bytes, only got {}. Reserved: {}",

{
return Err(resources_datafusion_err!(
"Failed to acquire {} bytes due to overflow. Reserved: {}",
"Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
Contributor

Suggested change
- "Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
+ "Task ID {} failed to acquire {} bytes due to overflow. Reserved: {}",
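
For readers following the two acquire-path messages above, the sketch below shows roughly how a pool that carries the task attempt ID could produce them. This is an illustration under stated assumptions, not Comet's implementation: `SketchPool`, its `limit` field, and `acquire_from_spark` are all hypothetical, errors are plain `String`s rather than DataFusion errors, and the check-then-update sequence is not safe under concurrent callers.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for a per-task memory pool; not Comet's actual type.
struct SketchPool {
    task_attempt_id: i64,
    used: AtomicUsize,
    /// Assumed cap standing in for whatever Spark is willing to grant.
    limit: usize,
}

impl SketchPool {
    fn new(task_attempt_id: i64, limit: usize) -> Self {
        Self { task_attempt_id, used: AtomicUsize::new(0), limit }
    }

    /// Stand-in for the call into Spark: grants at most the remaining budget.
    fn acquire_from_spark(&self, requested: usize) -> usize {
        let reserved = self.used.load(Ordering::Relaxed);
        requested.min(self.limit.saturating_sub(reserved))
    }

    /// Reserve `additional` bytes, naming the task in every error message.
    fn try_grow(&self, additional: usize) -> Result<(), String> {
        let reserved = self.used.load(Ordering::Relaxed);
        // Reject requests whose running total would not fit in a usize.
        if reserved.checked_add(additional).is_none() {
            return Err(format!(
                "Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
                self.task_attempt_id, additional, reserved
            ));
        }
        let acquired = self.acquire_from_spark(additional);
        // A partial grant is treated as a failure and nothing is reserved.
        if acquired < additional {
            return Err(format!(
                "Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
                self.task_attempt_id, additional, acquired, reserved
            ));
        }
        self.used.fetch_add(acquired, Ordering::Relaxed);
        Ok(())
    }
}
```

Storing the ID on the pool means each message can include it without threading an extra argument through every call site.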

// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
if (logger.isTraceEnabled()) {
logger.trace("Task {} requested {} bytes", taskAttemptId, size);
Contributor

Suggested change
- logger.trace("Task {} requested {} bytes", taskAttemptId, size);
+ logger.trace("Task ID {} requested {} bytes", taskAttemptId, size);

long newUsed = used.addAndGet(acquired);
if (acquired < size) {
logger.warn(
"Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
Contributor

Suggested change
- "Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
+ "Task ID {} requested {} bytes but only received {} bytes. Current allocation is {} and "

// Called by Comet native through JNI
public void releaseMemory(long size) {
if (logger.isTraceEnabled()) {
logger.trace("Task {} released {} bytes", taskAttemptId, size);
Contributor

Suggested change
- logger.trace("Task {} released {} bytes", taskAttemptId, size);
+ logger.trace("Task ID {} released {} bytes", taskAttemptId, size);

if (newUsed < 0) {
logger.error(
"Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
"Task {} used memory is negative ({}) after releasing {} bytes",
Contributor

Suggested change
- "Task {} used memory is negative ({}) after releasing {} bytes",
+ "Task ID {} used memory is negative ({}) after releasing {} bytes",

Contributor

@parthchandra parthchandra left a comment

lgtm

@andygrove andygrove merged commit def45fa into apache:main Oct 1, 2025
102 checks passed
@andygrove andygrove deleted the improve-logging-acquire-mem-failed branch October 1, 2025 23:36
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025