Skip to content
Merged
3 changes: 2 additions & 1 deletion native/core/src/execution/memory_pools/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool(
match memory_pool_config.pool_type {
MemoryPoolType::Unified => {
// Set Comet memory pool for native
let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager);
let memory_pool =
CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id);
Arc::new(TrackConsumersPool::new(
memory_pool,
NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(),
Expand Down
31 changes: 24 additions & 7 deletions native/core/src/execution/memory_pools/unified_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use log::warn;
pub struct CometUnifiedMemoryPool {
task_memory_manager_handle: Arc<GlobalRef>,
used: AtomicUsize,
task_attempt_id: i64,
}

impl Debug for CometUnifiedMemoryPool {
Expand All @@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool {
}

impl CometUnifiedMemoryPool {
pub fn new(task_memory_manager_handle: Arc<GlobalRef>) -> CometUnifiedMemoryPool {
pub fn new(
task_memory_manager_handle: Arc<GlobalRef>,
task_attempt_id: i64,
) -> CometUnifiedMemoryPool {
Self {
task_memory_manager_handle,
task_attempt_id,
used: AtomicUsize::new(0),
}
}
Expand Down Expand Up @@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool {
fn drop(&mut self) {
let used = self.used.load(Relaxed);
if used != 0 {
warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved");
warn!(
"Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",
"Task ID {} dropped CometUnifiedMemoryPool with {used} bytes still reserved",

self.task_attempt_id
);
}
}
}
Expand All @@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool {
}

fn shrink(&self, _: &MemoryReservation, size: usize) {
self.release_to_spark(size)
.unwrap_or_else(|_| panic!("Failed to release {size} bytes"));
if let Err(e) = self.release_to_spark(size) {
panic!(
"Task {} failed to return {size} bytes to Spark: {e:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} failed to return {size} bytes to Spark: {e:?}",
"Task ID {} failed to return {size} bytes to Spark: {e:?}",

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was following Spark's convention for logging so that we can use grep to search for Task 1234 and view the combined logging from both Spark and Comet.

~/git/apache/apache-spark-3.5.6$ find . -name "*.scala" -exec grep taskAttemptId {} \; | grep log
              logDebug(s"Starting pushing blocks for the task ${context.taskAttemptId()}")
      logWarning(s"Task ${taskAttemptId.get} already completed, not releasing lock for $blockId")
    logTrace(s"Task $taskAttemptId trying to acquire read lock for $blockId")
        logTrace(s"Task $taskAttemptId acquired read lock for $blockId")
    logTrace(s"Task $taskAttemptId trying to acquire write lock for $blockId")
        logTrace(s"Task $taskAttemptId acquired write lock for $blockId")
    logTrace(s"Task $taskAttemptId downgrading write lock for $blockId")
    logTrace(s"Task $taskAttemptId releasing lock for $blockId")
    logTrace(s"Task $taskAttemptId trying to remove block $blockId")
        logInfo(s"Task ${TaskContext.get().taskAttemptId} force spilling in-memory map to disk " +
        logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map to disk and " +
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
    logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " +
        logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " +
      logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) finished " +
        logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) failed " +
``

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

self.task_attempt_id
);
}
if let Err(prev) = self
.used
.fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size))
{
panic!("overflow when releasing {size} of {prev} bytes");
panic!(
"Task {} overflow when releasing {size} of {prev} bytes",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} overflow when releasing {size} of {prev} bytes",
"Task ID {} overflow when releasing {size} of {prev} bytes",

self.task_attempt_id
);
}
}

Expand All @@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
self.release_to_spark(acquired as usize)?;

return Err(resources_datafusion_err!(
"Failed to acquire {} bytes, only got {}. Reserved: {}",
"Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} failed to acquire {} bytes, only got {}. Reserved: {}",
"Task ID {} failed to acquire {} bytes, only got {}. Reserved: {}",

self.task_attempt_id,
additional,
acquired,
self.reserved()
Expand All @@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool {
.fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize))
{
return Err(resources_datafusion_err!(
"Failed to acquire {} bytes due to overflow. Reserved: {}",
"Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} failed to acquire {} bytes due to overflow. Reserved: {}",
"Task ID {} failed to acquire {} bytes due to overflow. Reserved: {}",

self.task_attempt_id,
additional,
prev
));
Expand Down
26 changes: 23 additions & 3 deletions spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,36 @@ public class CometTaskMemoryManager {
/** The id uniquely identifies the native plan this memory manager is associated to */
private final long id;

private final long taskAttemptId;

public final TaskMemoryManager internal;
private final NativeMemoryConsumer nativeMemoryConsumer;
private final AtomicLong used = new AtomicLong();

public CometTaskMemoryManager(long id) {
public CometTaskMemoryManager(long id, long taskAttemptId) {
this.id = id;
this.taskAttemptId = taskAttemptId;
this.internal = TaskContext$.MODULE$.get().taskMemoryManager();
this.nativeMemoryConsumer = new NativeMemoryConsumer();
}

// Called by Comet native through JNI.
// Returns the actual amount of memory (in bytes) granted.
public long acquireMemory(long size) {
if (logger.isTraceEnabled()) {
logger.trace("Task {} requested {} bytes", taskAttemptId, size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.trace("Task {} requested {} bytes", taskAttemptId, size);
logger.trace("Task ID {} requested {} bytes", taskAttemptId, size);

}
long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer);
used.addAndGet(acquired);
long newUsed = used.addAndGet(acquired);
if (acquired < size) {
logger.warn(
"Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} requested {} bytes but only received {} bytes. Current allocation is {} and "
"Task ID {} requested {} bytes but only received {} bytes. Current allocation is {} and "

+ "the total memory consumption is {} bytes.",
taskAttemptId,
size,
acquired,
newUsed,
internal.getMemoryConsumptionForThisTask());
// If memory manager is not able to acquire the requested size, log memory usage
internal.showMemoryUsage();
}
Expand All @@ -64,10 +78,16 @@ public long acquireMemory(long size) {

// Called by Comet native through JNI
public void releaseMemory(long size) {
if (logger.isTraceEnabled()) {
logger.trace("Task {} released {} bytes", taskAttemptId, size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.trace("Task {} released {} bytes", taskAttemptId, size);
logger.trace("Task ID {} released {} bytes", taskAttemptId, size);

}
long newUsed = used.addAndGet(-size);
if (newUsed < 0) {
logger.error(
"Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size);
"Task {} used memory is negative ({}) after releasing {} bytes",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Task {} used memory is negative ({}) after releasing {} bytes",
"Task ID {} used memory is negative ({}) after releasing {} bytes",

taskAttemptId,
newUsed,
size);
}
internal.releaseExecutionMemory(size, nativeMemoryConsumer);
}
Expand Down
10 changes: 8 additions & 2 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ class CometExecIterator(
private val memoryMXBean = ManagementFactory.getMemoryMXBean
private val nativeLib = new Native()
private val nativeUtil = new NativeUtil()
private val cometTaskMemoryManager = new CometTaskMemoryManager(id)
private val taskAttemptId = TaskContext.get().taskAttemptId
private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId)
private val cometBatchIterators = inputs.map { iterator =>
new CometBatchIterator(iterator, nativeUtil)
}.toArray
Expand Down Expand Up @@ -116,7 +117,7 @@ class CometExecIterator(
memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
memoryLimit,
memoryLimitPerTask,
taskAttemptId = TaskContext.get().taskAttemptId,
taskAttemptId,
debug = COMET_DEBUG_ENABLED.get(),
explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
tracingEnabled)
Expand Down Expand Up @@ -175,6 +176,11 @@ class CometExecIterator(
})
} catch {
case e: CometNativeException =>
// it is generally considered bad practice to log and then rethrow an
// exception, but it really helps debugging to be able to see which task
// threw the exception, so we log the exception with taskAttemptId here
logError(s"Native execution for task $taskAttemptId failed", e)

val fileNotFoundPattern: Regex =
("""^External: Object at location (.+?) not found: No such file or directory """ +
"""\(os error \d+\)$""").r
Expand Down
Loading