
Commit f63ccce

AngersZhuuuu authored and cloud-fan committed
[SPARK-54818][SQL] TaskMemoryManager allocate failed should log error stack to help check memory usage
### What changes were proposed in this pull request?

While investigating OOM-failed applications on our cluster, I found that allocation failures do not log the error stack, which makes it hard for users to debug their applications. For example, a user set a 200M broadcast threshold, but the task tried to allocate 4G of memory:

```
25/12/21 07:08:13 WARN [broadcast-exchange-4] TaskMemoryManager: Failed to allocate a page (4294967296 bytes), try again.
25/12/21 07:08:58 WARN [broadcast-exchange-4] TaskMemoryManager: Failed to allocate a page (4294967296 bytes), try again.
```

### Why are the changes needed?

Helps users debug allocation failures.

### Does this PR introduce _any_ user-facing change?

Yes, users can now see which memory allocation failed and where it was requested from.

### How was this patch tested?

**Before**
```
11:45:10.693 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (67108848 bytes), try again.
```

**After**
```
11:45:10.693 WARN org.apache.spark.memory.TaskMemoryManager: Failed to allocate a page (67108848 bytes), try again.
java.lang.OutOfMemoryError: Failed to allocate 67108848
    at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate(HeapMemoryAllocator.java:49)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:398)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:359)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
    at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.acquirePage(RowBasedKeyValueBatch.java:129)
    at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.<init>(RowBasedKeyValueBatch.java:108)
    at org.apache.spark.sql.catalyst.expressions.FixedLengthRowBasedKeyValueBatch.<init>(FixedLengthRowBasedKeyValueBatch.java:1
    at org.apache.spark.scheduler.Task.run(Task.scala:147)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:842)
java.lang.OutOfMemoryError: Java heap space
    at org.apache.spark.unsafe.memory.HeapMemoryAllocator.allocate(HeapMemoryAllocator.java:49)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:398)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:359)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:118)
    at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.acquirePage(RowBasedKeyValueBatch.java:129)
    at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.<init>(RowBasedKeyValueBatch.java:108)
    at org.apache.spark.sql.catalyst.expressions.FixedLengthRowBasedKeyValueBatch.<init>(FixedLengthRowBasedKeyValueBatch.java:169)
    at org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch.allocate(RowBasedKeyValueBatch.java:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$hashAgg_FastHashMap_0.<init>(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
    at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:153)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:180)
    at org.apache.spark.scheduler.Task.run(Task.scala:147)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:716)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:86)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:83)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:97)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:719)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:842)
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #53578 from AngersZhuuuu/SPARK-54818.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
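For reference, the following is a minimal, self-contained sketch of the retry-and-log pattern this change applies: attach the `OutOfMemoryError` (and hence its stack trace) only on the first failed attempt, then retry with an incremented counter. The class name `PageAllocatorSketch`, the `MAX_RETRIES` cap, and the use of `java.util.logging` are assumptions for illustration; the real logic lives in `TaskMemoryManager.allocatePage` and uses Spark's structured logger with MDC keys, and Spark retries after freeing memory rather than giving up after a fixed count.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class PageAllocatorSketch {
  private static final Logger logger = Logger.getLogger(PageAllocatorSketch.class.getName());
  private static final int MAX_RETRIES = 3; // sketch-only cap; Spark spills and retries instead

  /** Public entry point keeps the original two-argument-style shape; the first attempt is retry 0. */
  public long[] allocatePage(long sizeBytes) {
    return allocatePage(sizeBytes, 0);
  }

  /** Internal variant that tracks how many times allocation has already failed. */
  private long[] allocatePage(long sizeBytes, int retryCount) {
    try {
      // Stand-in for memoryManager.tungstenMemoryAllocator().allocate(acquired).
      return new long[Math.toIntExact(sizeBytes / 8)];
    } catch (OutOfMemoryError e) {
      if (retryCount == 0) {
        // First failure: pass the error so the full stack trace reaches the log.
        logger.log(Level.WARNING, "Failed to allocate a page (" + sizeBytes
            + " bytes) for " + retryCount + " times, try again.", e);
      } else {
        // Later failures: log only the message and retry count, avoiding repeated identical traces.
        logger.warning("Failed to allocate a page (" + sizeBytes
            + " bytes) for " + retryCount + " times, try again.");
      }
      if (retryCount >= MAX_RETRIES) {
        throw e; // the sketch gives up; Spark instead frees pages (spills) before retrying
      }
      return allocatePage(sizeBytes, retryCount + 1);
    }
  }
}
```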
1 parent 24207e2 commit f63ccce

File tree

1 file changed: +18 −4 lines changed


core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java

Lines changed: 18 additions & 4 deletions
@@ -355,6 +355,10 @@ public long pageSizeBytes() {
     return memoryManager.pageSizeBytes();
   }
 
+  public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
+    return allocatePage(size, consumer, 0);
+  }
+
   /**
    * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
    * intended for allocating large blocks of Tungsten memory that will be shared between operators.
@@ -364,7 +368,10 @@ public long pageSizeBytes() {
    *
    * @throws TooLargePageException
    */
-  public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
+  private MemoryBlock allocatePage(
+      long size,
+      MemoryConsumer consumer,
+      int retryCount) {
     assert(consumer != null);
     assert(consumer.getMode() == tungstenMemoryMode);
     if (size > MAXIMUM_PAGE_SIZE_BYTES) {
@@ -390,16 +397,23 @@ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
     try {
       page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
     } catch (OutOfMemoryError e) {
-      logger.warn("Failed to allocate a page ({} bytes), try again.",
-        MDC.of(LogKeys.PAGE_SIZE, acquired));
+      if (retryCount == 0) {
+        logger.warn("Failed to allocate a page ({} bytes) for {} times, try again.", e,
+          MDC.of(LogKeys.PAGE_SIZE, acquired),
+          MDC.of(LogKeys.NUM_RETRY, retryCount));
+      } else {
+        logger.warn("Failed to allocate a page ({} bytes) for {} times, try again.",
+          MDC.of(LogKeys.PAGE_SIZE, acquired),
+          MDC.of(LogKeys.NUM_RETRY, retryCount));
+      }
       // there is no enough memory actually, it means the actual free memory is smaller than
       // MemoryManager thought, we should keep the acquired memory.
       synchronized (this) {
         acquiredButNotUsed += acquired;
         allocatedPages.clear(pageNumber);
       }
       // this could trigger spilling to free some pages.
-      return allocatePage(size, consumer);
+      return allocatePage(size, consumer, retryCount + 1);
     }
     page.pageNumber = pageNumber;
     pageTable[pageNumber] = page;

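Design note, as reflected in the diff above: the stack trace is attached only on the first failed attempt (`retryCount == 0`), so a long retry loop does not repeat the identical trace, and external callers are unaffected because the two-argument `allocatePage(long, MemoryConsumer)` overload is kept and simply delegates with `retryCount = 0`. A hedged caller-side sketch (the wrapper class `AllocateExample` and the 1 MB page size are hypothetical; the Spark types are real):

```java
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;

final class AllocateExample {
  static MemoryBlock allocateOnePage(TaskMemoryManager tmm, MemoryConsumer consumer) {
    // Internally starts at retryCount = 0; on OutOfMemoryError the first failure now logs
    // the full stack, and subsequent retries log only the page size and retry count.
    return tmm.allocatePage(1L << 20, consumer);
  }
}
```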