diff --git a/native/core/src/execution/memory_pools/mod.rs b/native/core/src/execution/memory_pools/mod.rs index c1fa5bbca7..3e40dc6923 100644 --- a/native/core/src/execution/memory_pools/mod.rs +++ b/native/core/src/execution/memory_pools/mod.rs @@ -42,7 +42,8 @@ pub(crate) fn create_memory_pool( match memory_pool_config.pool_type { MemoryPoolType::Unified => { // Set Comet memory pool for native - let memory_pool = CometUnifiedMemoryPool::new(comet_task_memory_manager); + let memory_pool = + CometUnifiedMemoryPool::new(comet_task_memory_manager, task_attempt_id); Arc::new(TrackConsumersPool::new( memory_pool, NonZeroUsize::new(NUM_TRACKED_CONSUMERS).unwrap(), diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs index bbde722582..88b2731072 100644 --- a/native/core/src/execution/memory_pools/unified_pool.rs +++ b/native/core/src/execution/memory_pools/unified_pool.rs @@ -40,6 +40,7 @@ use log::warn; pub struct CometUnifiedMemoryPool { task_memory_manager_handle: Arc, used: AtomicUsize, + task_attempt_id: i64, } impl Debug for CometUnifiedMemoryPool { @@ -51,9 +52,13 @@ impl Debug for CometUnifiedMemoryPool { } impl CometUnifiedMemoryPool { - pub fn new(task_memory_manager_handle: Arc) -> CometUnifiedMemoryPool { + pub fn new( + task_memory_manager_handle: Arc, + task_attempt_id: i64, + ) -> CometUnifiedMemoryPool { Self { task_memory_manager_handle, + task_attempt_id, used: AtomicUsize::new(0), } } @@ -82,7 +87,10 @@ impl Drop for CometUnifiedMemoryPool { fn drop(&mut self) { let used = self.used.load(Relaxed); if used != 0 { - warn!("CometUnifiedMemoryPool dropped with {used} bytes still reserved"); + warn!( + "Task {} dropped CometUnifiedMemoryPool with {used} bytes still reserved", + self.task_attempt_id + ); } } } @@ -96,13 +104,20 @@ impl MemoryPool for CometUnifiedMemoryPool { } fn shrink(&self, _: &MemoryReservation, size: usize) { - self.release_to_spark(size) - .unwrap_or_else(|_| panic!("Failed to release {size} bytes")); + if let Err(e) = self.release_to_spark(size) { + panic!( + "Task {} failed to return {size} bytes to Spark: {e:?}", + self.task_attempt_id + ); + } if let Err(prev) = self .used .fetch_update(Relaxed, Relaxed, |old| old.checked_sub(size)) { - panic!("overflow when releasing {size} of {prev} bytes"); + panic!( + "Task {} overflow when releasing {size} of {prev} bytes", + self.task_attempt_id + ); } } @@ -116,7 +131,8 @@ impl MemoryPool for CometUnifiedMemoryPool { self.release_to_spark(acquired as usize)?; return Err(resources_datafusion_err!( - "Failed to acquire {} bytes, only got {}. Reserved: {}", + "Task {} failed to acquire {} bytes, only got {}. Reserved: {}", + self.task_attempt_id, additional, acquired, self.reserved() @@ -127,7 +143,8 @@ impl MemoryPool for CometUnifiedMemoryPool { .fetch_update(Relaxed, Relaxed, |old| old.checked_add(acquired as usize)) { return Err(resources_datafusion_err!( - "Failed to acquire {} bytes due to overflow. Reserved: {}", + "Task {} failed to acquire {} bytes due to overflow. Reserved: {}", + self.task_attempt_id, additional, prev )); diff --git a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java index 23c753dd5e..3f344da68f 100644 --- a/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java +++ b/spark/src/main/java/org/apache/spark/CometTaskMemoryManager.java @@ -40,12 +40,15 @@ public class CometTaskMemoryManager { /** The id uniquely identifies the native plan this memory manager is associated to */ private final long id; + private final long taskAttemptId; + public final TaskMemoryManager internal; private final NativeMemoryConsumer nativeMemoryConsumer; private final AtomicLong used = new AtomicLong(); - public CometTaskMemoryManager(long id) { + public CometTaskMemoryManager(long id, long taskAttemptId) { this.id = id; + this.taskAttemptId = taskAttemptId; this.internal = TaskContext$.MODULE$.get().taskMemoryManager(); this.nativeMemoryConsumer = new NativeMemoryConsumer(); } @@ -53,9 +56,20 @@ public CometTaskMemoryManager(long id) { // Called by Comet native through JNI. // Returns the actual amount of memory (in bytes) granted. public long acquireMemory(long size) { + if (logger.isTraceEnabled()) { + logger.trace("Task {} requested {} bytes", taskAttemptId, size); + } long acquired = internal.acquireExecutionMemory(size, nativeMemoryConsumer); - used.addAndGet(acquired); + long newUsed = used.addAndGet(acquired); if (acquired < size) { + logger.warn( + "Task {} requested {} bytes but only received {} bytes. Current allocation is {} and " + + "the total memory consumption is {} bytes.", + taskAttemptId, + size, + acquired, + newUsed, + internal.getMemoryConsumptionForThisTask()); // If memory manager is not able to acquire the requested size, log memory usage internal.showMemoryUsage(); } @@ -64,10 +78,16 @@ public long acquireMemory(long size) { // Called by Comet native through JNI public void releaseMemory(long size) { + if (logger.isTraceEnabled()) { + logger.trace("Task {} released {} bytes", taskAttemptId, size); + } long newUsed = used.addAndGet(-size); if (newUsed < 0) { logger.error( - "Used memory is negative: " + newUsed + " after releasing memory chunk of: " + size); + "Task {} used memory is negative ({}) after releasing {} bytes", + taskAttemptId, + newUsed, + size); } internal.releaseExecutionMemory(size, nativeMemoryConsumer); } diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 0bd41d05ec..f62dd13e81 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -68,7 +68,8 @@ class CometExecIterator( private val memoryMXBean = ManagementFactory.getMemoryMXBean private val nativeLib = new Native() private val nativeUtil = new NativeUtil() - private val cometTaskMemoryManager = new CometTaskMemoryManager(id) + private val taskAttemptId = TaskContext.get().taskAttemptId + private val cometTaskMemoryManager = new CometTaskMemoryManager(id, taskAttemptId) private val cometBatchIterators = inputs.map { iterator => new CometBatchIterator(iterator, nativeUtil) }.toArray @@ -116,7 +117,7 @@ class CometExecIterator( memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(), memoryLimit, memoryLimitPerTask, - taskAttemptId = TaskContext.get().taskAttemptId, + taskAttemptId, debug = COMET_DEBUG_ENABLED.get(), explain = COMET_EXPLAIN_NATIVE_ENABLED.get(), tracingEnabled) @@ -175,6 +176,11 @@ class CometExecIterator( }) } catch { case e: CometNativeException => + // it is generally considered bad practice to log and then rethrow an + // exception, but it really helps debugging to be able to see which task + // threw the exception, so we log the exception with taskAttemptId here + logError(s"Native execution for task $taskAttemptId failed", e) + val fileNotFoundPattern: Regex = ("""^External: Object at location (.+?) not found: No such file or directory """ + """\(os error \d+\)$""").r