249 changes: 152 additions & 97 deletions spark/src/main/scala/org/apache/comet/CometExecIterator.scala
@@ -35,22 +35,59 @@ import org.apache.comet.Tracing.withTrace
import org.apache.comet.vector.NativeUtil

/**
* An iterator class used to execute a Comet native query. It takes input iterators that come
* from Comet Scan and are expected to produce batches of Arrow arrays. While this iterator is
* being consumed, it drains the input iterators and passes the Arrow arrays to the Comet
* native engine by address. Even after the input iterators are exhausted, this iterator may
* continue executing the native query, as there might be blocking operators such as Sort or
* Aggregate. The `hasNext` API can be used to check whether the end of this iterator has been
* reached (i.e. the native query is done).
* Comet's primary execution iterator, bridging the JVM (Spark) and native (Rust) execution
* environments. This iterator orchestrates native query execution over Arrow columnar
* batches while managing memory ownership across the JNI boundary.
*
* @param inputs
* The input iterators producing sequences of batches of Arrow arrays.
* @param protobufQueryPlan
* The serialized bytes of the Spark execution plan.
* @param numParts
* The number of partitions.
* @param partitionIndex
* The index of the partition.
* '''Architecture Overview:'''
* 1. Consumes input ColumnarBatch iterators from Spark operators
* 2. Transfers Arrow array ownership to native DataFusion execution engine via JNI (* see note below)
* 3. Executes queries natively using DataFusion's columnar processing
* 4. Returns results as ColumnarBatch with ownership transferred back to JVM
*
* * This isn't quite true: Comet does not currently implement best practice when passing
* batches from the JVM to native code. The JVM retains ownership of the arrays, and native
* code must make defensive copies as needed (see the sketch below).
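*
* As a rough sketch of that handoff (illustrative, not the exact code in
* [[org.apache.comet.vector.NativeUtil]]; `allocator`, `root`, and `passBatchToNative` are
* hypothetical placeholders), the JVM exports a batch through Arrow's C Data Interface and
* hands raw struct addresses to native code:
* {{{
* import org.apache.arrow.c.{ArrowArray, ArrowSchema, Data}
*
* // Allocate C Data Interface structs and export the current batch into them.
* val arrowArray = ArrowArray.allocateNew(allocator)
* val arrowSchema = ArrowSchema.allocateNew(allocator)
* Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchema)
*
* // The native side wraps these addresses with FFI ArrowArray / ArrowSchema structs.
* passBatchToNative(arrowArray.memoryAddress(), arrowSchema.memoryAddress())
* }}}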
*
* '''Memory Management:'''
* This class implements a strict ownership-transfer pattern to prevent memory leaks
* and use-after-free across the JNI boundary:
* - '''Single Ownership''': Only one owner of Arrow array data at any time
* - '''Transfer Semantics''': Ownership flows JVM → Native → JVM during processing
* - '''Automatic Cleanup''': Previous batches are automatically released on next() calls
* - '''Exception Safety''': Resources are cleaned up even during exceptions
*
* '''Thread Safety:'''
* This class is '''NOT thread-safe''' by design. Each iterator instance should only be
* accessed from a single thread. Concurrent access will cause race conditions and memory
* corruption.
*
* '''Memory Leak Prevention:'''
* The iterator automatically manages the Arrow array lifecycle (see the usage sketch below):
* - `hasNext()` releases the previous batch before preparing the next one
* - `next()` releases the current batch before returning a new one
* - `close()` ensures all remaining resources are properly released
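*
* A minimal consumption sketch (illustrative: construction is elided, and `process` is a
* placeholder for the caller's batch handling):
* {{{
* val it: CometExecIterator = ... // created by Comet's planner, not constructed by hand
* try {
*   while (it.hasNext) {
*     val batch = it.next()
*     process(batch) // consume immediately; do not retain batches across iterations
*   }
* } finally {
*   it.close() // idempotent, so a task-completion listener may also close it safely
* }
* }}}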
*
* '''Error Handling:'''
* Native exceptions are automatically mapped to appropriate Spark exceptions (see the
* sketch below):
* - File not found errors → SparkException with a FileNotFoundException cause
* - Parquet errors → SparkException with Parquet-specific error details
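*
* A caller-side sketch of unwrapping such a failure (hedged: the precise mapping happens in
* the native error-handling layer, which is not part of this file):
* {{{
* try {
*   while (it.hasNext) it.next()
* } catch {
*   case e: org.apache.spark.SparkException =>
*     e.getCause match {
*       case _: java.io.FileNotFoundException => () // e.g. an input file was removed
*       case _ => throw e
*     }
* }
* }}}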
*
* @param id Unique identifier for this execution context (used for native plan tracking)
* @param inputs Sequence of input ColumnarBatch iterators from upstream Spark operators
* @param numOutputCols Number of columns in the output schema
* @param protobufQueryPlan Serialized Spark physical plan as protocol buffer bytes
* @param nativeMetrics Metrics collection node for native execution statistics
* @param numParts Total number of partitions in the query
* @param partitionIndex Zero-based index of the partition this iterator processes
*
* @note '''Memory Leak False Positives''': The allocator may report memory leaks for
* ArrowArray and ArrowSchema structs. These are false positives because the native
* side releases memory through Arrow's C Data Interface release callbacks, which
* the JVM allocator doesn't track.
*
* @see [[CometBatchIterator]] for the underlying batch iteration mechanism
* @see [[org.apache.comet.vector.NativeUtil]] for Arrow array import/export utilities
*/
class CometExecIterator(
val id: Long,
@@ -109,31 +146,63 @@ class CometExecIterator(
private var currentBatch: ColumnarBatch = null
private var closed: Boolean = false

private def getMemoryLimitPerTask(conf: SparkConf): Long = {
val numCores = numDriverOrExecutorCores(conf).toFloat
val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
// Example: 16 GB maxMemory on 16 cores with 4 cores per task results in
// memory_limit_per_task = 16 GB * 4 / 16 = 4 GB
val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
logInfo(
s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)")
limit
/**
* Checks if there are more batches available from the native execution engine.
*
* @return true if more batches are available, false if execution is complete
*
* @note This method may trigger expensive native operations for blocking operators.
* Consider this when implementing backpressure or timeout mechanisms.
*/
override def hasNext: Boolean = {
if (closed) return false

if (nextBatch.isDefined) {
return true
}

// Close previous batch if any.
// This is to guarantee safety at the native side before we overwrite the buffer memory
// shared across batches in the native side.
if (prevBatch != null) {
prevBatch.close()
prevBatch = null
}

nextBatch = getNextBatch

if (nextBatch.isEmpty) {
close()
false
} else {
true
}
}

private def numDriverOrExecutorCores(conf: SparkConf): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
/**
* Returns the next ColumnarBatch from native execution with proper memory ownership transfer.
*
* @return ColumnarBatch containing Arrow arrays with transferred ownership
* @throws NoSuchElementException if no more elements are available (call hasNext first)
*
* @note '''Critical''': Do not store references to returned batches across iterations.
* Process each batch immediately to prevent memory leaks.
*/
override def next(): ColumnarBatch = {
if (currentBatch != null) {
// Eagerly release Arrow Arrays in the previous batch
currentBatch.close()
currentBatch = null
}
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
val master = conf.get("spark.master")
master match {
case "local" => 1
case LOCAL_N_REGEX(threads) => convertToInt(threads)
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => conf.get("spark.executor.cores", "1").toInt

if (nextBatch.isEmpty && !hasNext) {
throw new NoSuchElementException("No more element")
}

currentBatch = nextBatch.get
prevBatch = currentBatch
nextBatch = None
currentBatch
}

private def getNextBatch: Option[ColumnarBatch] = {
@@ -185,48 +254,26 @@
}
}

override def hasNext: Boolean = {
if (closed) return false

if (nextBatch.isDefined) {
return true
}

// Close previous batch if any.
// This is to guarantee safety at the native side before we overwrite the buffer memory
// shared across batches in the native side.
if (prevBatch != null) {
prevBatch.close()
prevBatch = null
}

nextBatch = getNextBatch

if (nextBatch.isEmpty) {
close()
false
} else {
true
}
}

override def next(): ColumnarBatch = {
if (currentBatch != null) {
// Eagerly release Arrow Arrays in the previous batch
currentBatch.close()
currentBatch = null
}

if (nextBatch.isEmpty && !hasNext) {
throw new NoSuchElementException("No more element")
}

currentBatch = nextBatch.get
prevBatch = currentBatch
nextBatch = None
currentBatch
}

/**
* Releases all resources associated with this iterator, including native memory and JNI handles.
*
* '''Resource Cleanup:'''
* - Closes current ColumnarBatch and releases Arrow arrays
* - Closes NativeUtil and associated Arrow allocators
* - Releases native execution plan and associated memory
* - Marks iterator as closed to prevent further operations
*
* '''Memory Leak Prevention:'''
* This method is critical for preventing memory leaks. The native side holds references
* to Arrow arrays and an execution context that must be explicitly released.
*
* '''Thread Safety:'''
* This method is synchronized to prevent race conditions during cleanup.
* Multiple threads calling close() concurrently will be serialized safely. This is
* important because it may be invoked from Spark's task cleanup listener.
*
* @note This method is idempotent: multiple calls are safe, but only the first has any effect.
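*
* A registration sketch for the listener case (hedged: Comet's actual wiring may differ):
* {{{
* org.apache.spark.TaskContext.get().addTaskCompletionListener[Unit] { _ =>
*   it.close() // safe even if the consumer already closed the iterator
* }
* }}}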
*/
def close(): Unit = synchronized {
if (!closed) {
if (currentBatch != null) {
@@ -240,29 +287,37 @@
traceMemoryUsage()
}

// The allocator thinks the exported ArrowArray and ArrowSchema structs are not released,
// so it will report:
// Caused by: java.lang.IllegalStateException: Memory was leaked by query.
// Memory leaked: (516) Allocator(ROOT) 0/516/808/9223372036854775807 (res/actual/peak/limit)
// This is suspected to be a false positive, because no memory leak is reported at the JVM
// when profiling. `allocator` reports a leak because it counts the accumulated amount of
// memory allocated for ArrowArray and ArrowSchema, but these exported structs will be
// released on the native side later.
// To clarify further: for ArrowArray and ArrowSchema, Arrow puts a release field into the
// memory region, a callback function pointer (a C function) that can also be called to
// release these structs from native code. Once we wrap their memory addresses on the
// native side using FFI ArrowArray and ArrowSchema, and drop them later, the callback
// function is invoked to release the memory.
// But the JVM allocator doesn't know about this, so it still keeps the accumulated
// count.
// Manually calling `release` and `close` makes the allocator happy, but causes a JVM
// runtime failure.

// allocator.close()
Comment on lines -243 to -261 (Member Author):
This comment refers to an allocator that no longer exists, so it doesn't seem very useful to keep around.

closed = true
}
}

private def getMemoryLimitPerTask(conf: SparkConf): Long = {
val numCores = numDriverOrExecutorCores(conf).toFloat
val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf)
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat
// Example: 16 GB maxMemory on 16 cores with 4 cores per task results in
// memory_limit_per_task = 16 GB * 4 / 16 = 4 GB
val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong
logInfo(
s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)")
limit
}
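
// Worked example of the formula above, with illustrative (non-default) numbers:
// maxMemory = 16 GiB, spark.task.cpus = 4, 16 executor cores:
//   (16 GiB * 4 / 16).toLong = 4 GiB per task,
// so the tasks running concurrently on all cores together stay within maxMemory.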

private def numDriverOrExecutorCores(conf: SparkConf): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
val master = conf.get("spark.master")
master match {
case "local" => 1
case LOCAL_N_REGEX(threads) => convertToInt(threads)
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => conf.get("spark.executor.cores", "1").toInt
}
}
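
// Examples of `spark.master` values this parser handles (illustrative):
//   "local"       -> 1
//   "local[4]"    -> 4
//   "local[*]"    -> Runtime.getRuntime.availableProcessors()
//   "local[2, 3]" -> 2 (the second number is the max task failures, ignored here)
//   anything else -> conf.get("spark.executor.cores", "1").toInt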

private def traceMemoryUsage(): Unit = {
nativeLib.logMemoryUsage("jvm_heapUsed", memoryMXBean.getHeapMemoryUsage.getUsed)
val totalTaskMemory = cometTaskMemoryManager.internal.getMemoryConsumptionForThisTask