@@ -86,31 +86,13 @@ class CometExecIterator(
     val conf = SparkEnv.get.conf
     val localDiskDirs = SparkEnv.get.blockManager.getLocalDiskDirs

-    val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
-    val memoryLimit = if (offHeapMode) {
-      // in unified mode we share off-heap memory with Spark
-      ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
-    } else {
-      // we'll use the built-in memory pool from DF, initialized with `memory_limit`
-      // and `memory_fraction` below.
-      CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    }
-
9989 // serialize Spark conf in protobuf format
10090 val builder = ConfigMap .newBuilder()
10191 conf.getAll.foreach { case (k, v) =>
10292 builder.putEntries(k, v)
10393 }
10494 val protobufSparkConfigs = builder.build().toByteArray
10595
-    val memoryLimitPerTask = if (offHeapMode) {
-      // this per-task limit is not used in native code when using unified memory
-      // so we can skip calculating it and avoid logging irrelevant information
-      0
-    } else {
-      getMemoryLimitPerTask(conf)
-    }
-
     // Create keyUnwrapper if encryption is enabled
     val keyUnwrapper = if (encryptedFilePaths.nonEmpty) {
       val unwrapper = new CometFileKeyUnwrapper()
@@ -124,6 +106,8 @@ class CometExecIterator(
       null
     }

+    val memoryConfig = CometExecIterator.getMemoryConfig(conf)
+
     nativeLib.createPlan(
       id,
       cometBatchIterators,
@@ -135,10 +119,10 @@ class CometExecIterator(
       cometTaskMemoryManager,
       localDiskDirs,
       batchSize = COMET_BATCH_SIZE.get(),
-      offHeapMode,
-      memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
-      memoryLimit,
-      memoryLimitPerTask,
+      memoryConfig.offHeapMode,
+      memoryConfig.memoryPoolType,
+      memoryConfig.memoryLimit,
+      memoryConfig.memoryLimitPerTask,
       taskAttemptId,
       debug = COMET_DEBUG_ENABLED.get(),
       explain = COMET_EXPLAIN_NATIVE_ENABLED.get(),
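The call site now reads the four memory-related arguments from a single object computed once per iterator, instead of four loose locals. A tiny sketch of this parameter-object shape (field names mirror the diff; the createPlan stub and the pool-type string are hypothetical):

    // Hypothetical illustration of the parameter-object refactor used above.
    final case class MemoryConfig(
        offHeapMode: Boolean,
        memoryPoolType: String,
        memoryLimit: Long,
        memoryLimitPerTask: Long)

    object ParamObjectSketch {
      // before: createPlan(..., offHeapMode, poolType, limit, perTaskLimit, ...)
      // after:  one value computed once and threaded through as a unit
      def createPlanSketch(mc: MemoryConfig): Unit =
        println(s"pool=${mc.memoryPoolType} limit=${mc.memoryLimit}")

      def main(args: Array[String]): Unit =
        createPlanSketch(MemoryConfig(true, "placeholder_pool", 1L << 30, 1L << 28))
    }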
@@ -152,34 +136,6 @@ class CometExecIterator(
   private var currentBatch: ColumnarBatch = null
   private var closed: Boolean = false

-  private def getMemoryLimitPerTask(conf: SparkConf): Long = {
-    val numCores = numDriverOrExecutorCores(conf).toFloat
-    val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
-    val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
-    // example: 16GB maxMemory * 16 cores with 4 cores per task results
-    // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
-    val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
-    logInfo(
-      s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)")
-    limit
-  }
-
-  private def numDriverOrExecutorCores(conf: SparkConf): Int = {
-    def convertToInt(threads: String): Int = {
-      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
-    }
-
-    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
-    val master = conf.get("spark.master")
-    master match {
-      case "local" => 1
-      case LOCAL_N_REGEX(threads) => convertToInt(threads)
-      case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
-      case _ => conf.get("spark.executor.cores", "1").toInt
-    }
-  }
-
   private def getNextBatch: Option[ColumnarBatch] = {
     assert(partitionIndex >= 0 && partitionIndex < numParts)

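The helpers deleted in the hunk above move into the new CometExecIterator companion object later in this diff. The per-task limit simply divides the available memory across the tasks that can run concurrently (numCores / coresPerTask). A minimal standalone sketch of that arithmetic, with illustrative names (only the formula and the 16 GB example come from the deleted code):

    object PerTaskLimitSketch {
      // Same formula as the removed getMemoryLimitPerTask: split the pool
      // across the number of tasks that can run at once.
      def perTaskLimit(maxMemoryBytes: Long, numCores: Int, coresPerTask: Int): Long =
        (maxMemoryBytes.toFloat * coresPerTask / numCores).toLong

      def main(args: Array[String]): Unit = {
        val gb = 1024L * 1024 * 1024
        // 16 GB maxMemory, 16 cores, 4 cores per task => 4 concurrent tasks => 4 GB each
        println(perTaskLimit(16 * gb, 16, 4)) // prints 4294967296
      }
    }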
@@ -289,25 +245,11 @@ class CometExecIterator(
         traceMemoryUsage()
       }

-      // The allocator thinks the exported ArrowArray and ArrowSchema structs are not
-      // released, so it will report:
-      // Caused by: java.lang.IllegalStateException: Memory was leaked by query.
-      // Memory leaked: (516) Allocator(ROOT) 0/516/808/9223372036854775807 (res/actual/peak/limit)
-      // We suspect this is a false positive, because profiling shows no memory leak at the
-      // JVM level. `allocator` reports a leak because it tracks the accumulated amount of
-      // memory allocated for ArrowArray and ArrowSchema, but these exported structs are
-      // released on the native side later.
-      // To clarify further: for ArrowArray and ArrowSchema, Arrow puts a release field into
-      // the memory region, a callback function pointer (C function) that can be called to
-      // release these structs from native code as well. Once we wrap their memory addresses
-      // on the native side using FFI ArrowArray and ArrowSchema, and later drop them, the
-      // callback is invoked to release the memory.
-      // But on the JVM side the allocator doesn't know about this, so it keeps the
-      // accumulated count.
-      // Manually calling `release` and `close` makes the allocator happy, but causes a JVM
-      // runtime failure.
-
-      // allocator.close()
+      val memInUse = cometTaskMemoryManager.getUsed
+      if (memInUse != 0) {
+        logWarning(s"CometExecIterator closed with non-zero memory usage: $memInUse")
+      }
+
       closed = true
     }
   }
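The deleted comment block documents the Arrow C Data Interface contract: each exported ArrowArray/ArrowSchema carries a release callback in its memory region, which native code invokes when it drops the FFI wrapper, so the JVM-side allocator never observes the release and its accumulated counter looks like a leak. A self-contained sketch of that ownership hand-off (purely illustrative; this is not the Arrow Java API):

    // Models the release-callback hand-off described in the deleted comment.
    // Names and types are illustrative, not Arrow's.
    final case class Exported(address: Long, release: () => Unit)

    object ReleaseCallbackSketch {
      private var accounted = 0L // what a JVM-side allocator would think is live

      def exportStruct(bytes: Long): Exported = {
        accounted += bytes // the allocator counts the allocation up front
        // the release callback is invoked later by native code,
        // which this JVM-side counter never sees
        Exported(address = 0L, release = () => ())
      }

      def main(args: Array[String]): Unit = {
        val s = exportStruct(516)
        s.release() // called from the native side when the FFI wrapper is dropped
        println(accounted) // still 516: the "false positive" leak the allocator reports
      }
    }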
@@ -320,5 +262,67 @@ class CometExecIterator(
     val threadId = Thread.currentThread().getId
     nativeLib.logMemoryUsage(s"task_memory_comet_$threadId", cometTaskMemory)
     nativeLib.logMemoryUsage(s"task_memory_spark_$threadId", sparkTaskMemory)
+
+    logInfo(
+      s"Task $taskAttemptId is using $totalTaskMemory bytes of memory " +
+        s"(Comet: $cometTaskMemory, Spark: $sparkTaskMemory)")
   }
 }
+
+object CometExecIterator extends Logging {
+
+  def getMemoryConfig(conf: SparkConf): MemoryConfig = {
+    val numCores = numDriverOrExecutorCores(conf).toFloat
+    val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
+    // there are different paths for on-heap vs off-heap mode
+    val offHeapMode = CometSparkSessionExtensions.isOffHeapEnabled(conf)
+    if (offHeapMode) {
+      // in off-heap mode, Comet uses unified memory management to share off-heap memory with Spark
+      val memoryLimit = ByteUnit.MiB.toBytes(conf.getSizeAsMb("spark.memory.offHeap.size"))
+      val maxMemory = memoryLimit * CometConf.COMET_EXEC_MEMORY_POOL_FRACTION.get()
+      val memoryLimitPerTask = (maxMemory.toFloat * coresPerTask / numCores).toLong
+      MemoryConfig(
+        offHeapMode,
+        memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
+        memoryLimit,
+        memoryLimitPerTask)
+    } else {
+      // we'll use the built-in memory pool from DF, initialized with `memory_limit`
+      // and `memory_fraction` below.
+      val memoryLimit = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
+      val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
+      // example: 16GB maxMemory * 16 cores with 4 cores per task results
+      // in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB
+      val memoryLimitPerTask = (maxMemory.toFloat * coresPerTask / numCores).toLong
+      logInfo(
+        s"Calculated per-task memory limit of $memoryLimitPerTask " +
+          s"($maxMemory * $coresPerTask / $numCores)")
+      MemoryConfig(
+        offHeapMode,
+        memoryPoolType = COMET_EXEC_MEMORY_POOL_TYPE.get(),
+        memoryLimit,
+        memoryLimitPerTask)
+    }
+  }
+
+  private def numDriverOrExecutorCores(conf: SparkConf): Int = {
+    def convertToInt(threads: String): Int = {
+      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
+    }
+    val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
+    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
+    val master = conf.get("spark.master")
+    master match {
+      case "local" => 1
+      case LOCAL_N_REGEX(threads) => convertToInt(threads)
+      case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
+      case _ => conf.get("spark.executor.cores", "1").toInt
+    }
+  }
+}
+
+case class MemoryConfig(
+    offHeapMode: Boolean,
+    memoryPoolType: String,
+    memoryLimit: Long,
+    memoryLimitPerTask: Long)
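Putting the refactor together: getMemoryConfig now returns one MemoryConfig per mode, and unlike the old code (which set the per-task limit to 0 in off-heap mode), both paths compute a per-task split, scaled by COMET_EXEC_MEMORY_POOL_FRACTION in off-heap mode. A hedged standalone sketch of both paths with the Comet lookups stubbed out as plain parameters (the pool-type string, overhead, and fraction values are placeholders, not Comet defaults):

    import org.apache.spark.SparkConf

    // Standalone sketch of the getMemoryConfig logic added above. The Comet
    // lookups (isOffHeapEnabled, getCometMemoryOverhead, pool type and pool
    // fraction confs) are stubbed; only the control flow and the per-task
    // formula mirror the patch.
    object MemoryConfigSketch {
      final case class MemoryConfig(
          offHeapMode: Boolean,
          memoryPoolType: String,
          memoryLimit: Long,
          memoryLimitPerTask: Long)

      private val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
      private val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

      private def numDriverOrExecutorCores(conf: SparkConf): Int = {
        def toCount(threads: String): Int =
          if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
        conf.get("spark.master") match {
          case "local" => 1
          case LOCAL_N_REGEX(threads) => toCount(threads)
          case LOCAL_N_FAILURES_REGEX(threads, _) => toCount(threads)
          case _ => conf.get("spark.executor.cores", "1").toInt
        }
      }

      def getMemoryConfig(conf: SparkConf, overheadBytes: Long, poolFraction: Double): MemoryConfig = {
        val numCores = numDriverOrExecutorCores(conf).toFloat
        val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
        val offHeapMode = conf.getBoolean("spark.memory.offHeap.enabled", false)
        // off-heap: share Spark's off-heap pool, scaled by the pool fraction;
        // on-heap: use the (stubbed) Comet memory overhead directly
        val memoryLimit =
          if (offHeapMode) conf.getSizeAsBytes("spark.memory.offHeap.size", "0") else overheadBytes
        val maxMemory = if (offHeapMode) (memoryLimit * poolFraction).toLong else memoryLimit
        val perTask = (maxMemory.toFloat * coresPerTask / numCores).toLong
        MemoryConfig(offHeapMode, "placeholder_pool" /* stub pool type */, memoryLimit, perTask)
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .set("spark.master", "local[4]")
          .set("spark.memory.offHeap.enabled", "true")
          .set("spark.memory.offHeap.size", "8g")
        // 8 GiB * 0.8 fraction / (4 cores / 1 cpu per task) ~= 1.7e9 bytes per task
        println(getMemoryConfig(conf, overheadBytes = 0L, poolFraction = 0.8))
      }
    }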