chore: Improve documentation for CometExecIterator
#2169
base: main
Changes from 5 commits
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,22 +35,66 @@ import org.apache.comet.Tracing.withTrace | |
import org.apache.comet.vector.NativeUtil | ||
|
||
/** | ||
* An iterator class used to execute Comet native query. It takes an input iterator which comes | ||
* from Comet Scan and is expected to produce batches of Arrow Arrays. During consuming this | ||
* iterator, it will consume input iterator and pass Arrow Arrays to Comet native engine by | ||
* addresses. Even after the end of input iterator, this iterator still possibly continues | ||
* executing native query as there might be blocking operators such as Sort, Aggregate. The API | ||
* `hasNext` can be used to check if it is the end of this iterator (i.e. the native query is | ||
* done). | ||
* Comet's primary execution iterator that bridges JVM (Spark) and native (Rust) execution | ||
* environments. This iterator orchestrates native query execution on Arrow columnar batches while | ||
* managing sophisticated memory ownership semantics across the JNI boundary. | ||
Review comment: "sophisticated" isn't adding anything here.
||
* | ||
* '''Architecture Overview:''' | ||
* - Consumes input ColumnarBatch iterators from Spark operators | ||
* - Transfers Arrow array ownership to native DataFusion execution engine via JNI (* see | ||
* note below) | ||
* - Executes queries natively using DataFusion's columnar processing | ||
* - Returns results as ColumnarBatch with ownership transferred back to JVM | ||
* | ||
* * This isn't quite true. Comet does not currently implement best practice when passing batches | ||
Review comment: Is it needed?
||
* from JVM to native. JVM retains ownership of arrays and native code must make defensive copies | ||
* as needed. | ||
* | ||
* '''Memory Management:''' This class implements a sophisticated ownership transfer pattern to | ||
* prevent memory leaks and ensure thread safety: | ||
* - '''Single Ownership''': Only one owner of Arrow array data at any time | ||
* - '''Transfer Semantics''': Ownership flows JVM → Native → JVM during processing | ||
* - '''Automatic Cleanup''': Previous batches are automatically released on next() calls | ||
* - '''Exception Safety''': Resources are cleaned up even during exceptions | ||
* | ||
* '''Thread Safety:''' This class is '''NOT thread-safe''' by design. Each iterator instance | ||
* should only be accessed from a single thread. Concurrent access will cause race conditions and | ||
* memory corruption. | ||
* | ||
* '''Memory Leak Prevention:''' The iterator automatically manages Arrow array lifecycle: | ||
* - `hasNext()` releases previous batch before preparing next | ||
* - `next()` releases current batch before returning new one | ||
* - `close()` ensures all resources are properly cleaned up | ||
* | ||
* '''Error Handling:''' Native exceptions are automatically mapped to appropriate Spark | ||
* exceptions: | ||
* - File not found errors → SparkException with FileNotFoundException cause | ||
* - Parquet errors → SparkException with Parquet-specific error details | ||
* | ||
* @param id | ||
* Unique identifier for this execution context (used for native plan tracking) | ||
* @param inputs | ||
* The input iterators producing sequence of batches of Arrow Arrays. | ||
* Sequence of input ColumnarBatch iterators from upstream Spark operators | ||
* @param numOutputCols | ||
* Number of columns in the output schema | ||
* @param protobufQueryPlan | ||
* The serialized bytes of Spark execution plan. | ||
* Serialized Spark physical plan as protocol buffer bytes | ||
* @param nativeMetrics | ||
* Metrics collection node for native execution statistics | ||
* @param numParts | ||
* The number of partitions. | ||
* Total number of partitions in the query | ||
* @param partitionIndex | ||
* The index of the partition. | ||
* Zero-based index of the partition this iterator processes | ||
* | ||
* @note | ||
* '''Memory Leak False Positives''': The allocator may report memory leaks for ArrowArray and | ||
* ArrowSchema structs. These are false positives because the native side releases memory | ||
* through Arrow's C Data Interface release callbacks, which the JVM allocator doesn't track. | ||
* | ||
* @see | ||
* [[CometBatchIterator]] for the underlying batch iteration mechanism | ||
* @see | ||
* [[org.apache.comet.vector.NativeUtil]] for Arrow array import/export utilities | ||
*/ | ||
class CometExecIterator( | ||
val id: Long, | ||
|
@@ -109,31 +153,68 @@ class CometExecIterator( | |
private var currentBatch: ColumnarBatch = null | ||
private var closed: Boolean = false | ||
|
||
private def getMemoryLimitPerTask(conf: SparkConf): Long = { | ||
val numCores = numDriverOrExecutorCores(conf).toFloat | ||
val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) | ||
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat | ||
// example 16GB maxMemory * 16 cores with 4 cores per task results | ||
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB | ||
val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong | ||
logInfo( | ||
s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)") | ||
limit | ||
/** | ||
* Checks if there are more batches available from the native execution engine. | ||
* | ||
* @return | ||
* true if more batches are available, false if execution is complete | ||
* | ||
* @note | ||
* This method may trigger expensive native operations for blocking operators. Consider this | ||
* when implementing backpressure or timeout mechanisms. | ||
*/ | ||
override def hasNext: Boolean = { | ||
if (closed) return false | ||
|
||
if (nextBatch.isDefined) { | ||
return true | ||
} | ||
|
||
// Close previous batch if any. | ||
// This is to guarantee safety at the native side before we overwrite the buffer memory | ||
// shared across batches in the native side. | ||
if (prevBatch != null) { | ||
prevBatch.close() | ||
prevBatch = null | ||
} | ||
|
||
nextBatch = getNextBatch | ||
|
||
if (nextBatch.isEmpty) { | ||
close() | ||
false | ||
} else { | ||
true | ||
} | ||
} | ||
|
||
private def numDriverOrExecutorCores(conf: SparkConf): Int = { | ||
def convertToInt(threads: String): Int = { | ||
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt | ||
/** | ||
* Returns the next ColumnarBatch from native execution with proper memory ownership transfer. | ||
* | ||
* @return | ||
* ColumnarBatch containing Arrow arrays with transferred ownership | ||
* @throws NoSuchElementException | ||
* if no more elements are available (call hasNext first) | ||
* | ||
* @note | ||
* '''Critical''': Do not store references to returned batches across iterations. Process each | ||
* batch immediately to prevent memory leaks. | ||
*/ | ||
override def next(): ColumnarBatch = { | ||
if (currentBatch != null) { | ||
// Eagerly release Arrow Arrays in the previous batch | ||
currentBatch.close() | ||
currentBatch = null | ||
} | ||
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r | ||
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r | ||
val master = conf.get("spark.master") | ||
master match { | ||
case "local" => 1 | ||
case LOCAL_N_REGEX(threads) => convertToInt(threads) | ||
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) | ||
case _ => conf.get("spark.executor.cores", "1").toInt | ||
|
||
if (nextBatch.isEmpty && !hasNext) { | ||
throw new NoSuchElementException("No more element") | ||
} | ||
|
||
currentBatch = nextBatch.get | ||
prevBatch = currentBatch | ||
nextBatch = None | ||
currentBatch | ||
} | ||
|
||
private def getNextBatch: Option[ColumnarBatch] = { | ||
|
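The look-ahead pattern used by `hasNext` and `next()` above can be sketched in isolation. This is a minimal illustration, not Comet's implementation: `Batch`, `LookaheadIterator`, and the `fetch` function are hypothetical stand-ins for `ColumnarBatch`, `CometExecIterator`, and `getNextBatch`, and the sketch omits the `currentBatch` bookkeeping and the call to `close()` at end of input.

```scala
// Hypothetical stand-in for a closeable batch; not Comet's ColumnarBatch.
final class Batch(val id: Int) extends AutoCloseable {
  var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

// Minimal look-ahead iterator. `fetch` is an assumed source that returns
// None once the underlying producer is exhausted.
final class LookaheadIterator(fetch: () => Option[Batch]) extends Iterator[Batch] {
  private var nextBatch: Option[Batch] = None
  private var prevBatch: Batch = null
  private var done: Boolean = false

  override def hasNext: Boolean = {
    if (done) return false
    if (nextBatch.isDefined) return true
    // Release the batch handed out earlier before asking for a new one,
    // mirroring the "close previous batch" step in the diff.
    if (prevBatch != null) {
      prevBatch.close()
      prevBatch = null
    }
    nextBatch = fetch()
    if (nextBatch.isEmpty) done = true
    nextBatch.isDefined
  }

  override def next(): Batch = {
    if (nextBatch.isEmpty && !hasNext) {
      throw new NoSuchElementException("No more element")
    }
    val batch = nextBatch.get
    prevBatch = batch
    nextBatch = None
    batch
  }
}
```

In this sketch a batch returned by `next()` stays open only until the following `hasNext` call that has to fetch, which is why callers should finish with each batch before advancing.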
@@ -185,48 +266,25 @@ class CometExecIterator( | |
} | ||
} | ||
|
||
override def hasNext: Boolean = { | ||
if (closed) return false | ||
|
||
if (nextBatch.isDefined) { | ||
return true | ||
} | ||
|
||
// Close previous batch if any. | ||
// This is to guarantee safety at the native side before we overwrite the buffer memory | ||
// shared across batches in the native side. | ||
if (prevBatch != null) { | ||
prevBatch.close() | ||
prevBatch = null | ||
} | ||
|
||
nextBatch = getNextBatch | ||
|
||
if (nextBatch.isEmpty) { | ||
close() | ||
false | ||
} else { | ||
true | ||
} | ||
} | ||
|
||
override def next(): ColumnarBatch = { | ||
if (currentBatch != null) { | ||
// Eagerly release Arrow Arrays in the previous batch | ||
currentBatch.close() | ||
currentBatch = null | ||
} | ||
|
||
if (nextBatch.isEmpty && !hasNext) { | ||
throw new NoSuchElementException("No more element") | ||
} | ||
|
||
currentBatch = nextBatch.get | ||
prevBatch = currentBatch | ||
nextBatch = None | ||
currentBatch | ||
} | ||
|
||
/** | ||
* Releases all resources associated with this iterator including native memory and JNI handles. | ||
* | ||
* '''Resource Cleanup:''' | ||
* - Closes current ColumnarBatch and releases Arrow arrays | ||
* - Closes NativeUtil and associated Arrow allocators | ||
* - Releases native execution plan and associated memory | ||
* - Marks iterator as closed to prevent further operations | ||
* | ||
* '''Memory Leak Prevention:''' This method is critical for preventing memory leaks. The native | ||
* side holds references to Arrow arrays and execution context that must be explicitly released. | ||
* | ||
* '''Thread Safety:''' This method is synchronized to prevent race conditions during cleanup. | ||
* Multiple threads calling close() concurrently will be serialized safely. This is important | ||
* because this method may be invoked from a Spark TaskCompletionListener thread. | ||
* | ||
* @note | ||
* This method is idempotent - multiple calls are safe but only the first has effect. | ||
*/ | ||
def close(): Unit = synchronized { | ||
if (!closed) { | ||
if (currentBatch != null) { | ||
|
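The idempotent, synchronized `close()` described in the Scaladoc above can be illustrated with a small standalone sketch. `GuardedResource` and its `release` callback are hypothetical names introduced only for this example; the real method releases batches, `NativeUtil`, and the native plan instead of calling a single callback.

```scala
// Hypothetical wrapper showing the guard used by close(): the release action
// runs at most once, and concurrent callers are serialized on `this`.
final class GuardedResource(release: () => Unit) extends AutoCloseable {
  private var closed: Boolean = false

  override def close(): Unit = synchronized {
    if (!closed) {
      release()
      closed = true
    }
  }

  def isClosed: Boolean = synchronized { closed }
}
```

Calling `close()` twice on the same instance runs `release` once, which is the property needed if both the consuming code and a task completion listener may end up closing the iterator.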
@@ -240,29 +298,37 @@ class CometExecIterator( | |
traceMemoryUsage() | ||
} | ||
|
||
// The allocator thoughts the exported ArrowArray and ArrowSchema structs are not released, | ||
// so it will report: | ||
// Caused by: java.lang.IllegalStateException: Memory was leaked by query. | ||
// Memory leaked: (516) Allocator(ROOT) 0/516/808/9223372036854775807 (res/actual/peak/limit) | ||
// Suspect this seems a false positive leak, because there is no reported memory leak at JVM | ||
// when profiling. `allocator` reports a leak because it calculates the accumulated number | ||
// of memory allocated for ArrowArray and ArrowSchema. But these exported ones will be | ||
// released in native side later. | ||
// More to clarify it. For ArrowArray and ArrowSchema, Arrow will put a release field into the | ||
// memory region which is a callback function pointer (C function) that could be called to | ||
// release these structs in native code too. Once we wrap their memory addresses at native | ||
// side using FFI ArrowArray and ArrowSchema, and drop them later, the callback function will | ||
// be called to release the memory. | ||
// But at JVM, the allocator doesn't know about this fact so it still keeps the accumulated | ||
// number. | ||
// Tried to manually do `release` and `close` that can make the allocator happy, but it will | ||
// cause JVM runtime failure. | ||
|
||
// allocator.close() | ||
Review comment on lines -243 to -261: This comment refers to an
||
closed = true | ||
} | ||
} | ||
|
||
private def getMemoryLimitPerTask(conf: SparkConf): Long = { | ||
val numCores = numDriverOrExecutorCores(conf).toFloat | ||
val maxMemory = CometSparkSessionExtensions.getCometMemoryOverhead(conf) | ||
val coresPerTask = conf.get("spark.task.cpus", "1").toFloat | ||
// example 16GB maxMemory * 16 cores with 4 cores per task results | ||
// in memory_limit_per_task = 16 GB * 4 / 16 = 16 GB / 4 = 4GB | ||
val limit = (maxMemory.toFloat * coresPerTask / numCores).toLong | ||
logInfo( | ||
s"Calculated per-task memory limit of $limit ($maxMemory * $coresPerTask / $numCores)") | ||
limit | ||
} | ||
|
||
private def numDriverOrExecutorCores(conf: SparkConf): Int = { | ||
def convertToInt(threads: String): Int = { | ||
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt | ||
} | ||
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r | ||
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r | ||
val master = conf.get("spark.master") | ||
master match { | ||
case "local" => 1 | ||
case LOCAL_N_REGEX(threads) => convertToInt(threads) | ||
case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) | ||
case _ => conf.get("spark.executor.cores", "1").toInt | ||
} | ||
} | ||
|
||
private def traceMemoryUsage(): Unit = { | ||
nativeLib.logMemoryUsage("jvm_heapUsed", memoryMXBean.getHeapMemoryUsage.getUsed) | ||
val totalTaskMemory = cometTaskMemoryManager.internal.getMemoryConsumptionForThisTask | ||
|
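The per-task limit arithmetic and the `spark.master` parsing in `getMemoryLimitPerTask` and `numDriverOrExecutorCores` can be checked with a standalone sketch. `TaskMemorySketch`, `coresFor`, and `limitPerTask` are hypothetical names; the sketch takes plain arguments instead of a `SparkConf`, and it uses `Double` rather than the `Float` arithmetic in the diff, which is a deliberate deviation to keep the worked example exact.

```scala
object TaskMemorySketch {
  // Same patterns as in the diff: local[N] and local[N, maxFailures].
  private val LocalN = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  // `executorCores` stands in for spark.executor.cores and
  // `availableProcessors` for Runtime.getRuntime.availableProcessors().
  def coresFor(master: String, executorCores: Int, availableProcessors: Int): Int = {
    def toInt(threads: String): Int =
      if (threads == "*") availableProcessors else threads.toInt
    master match {
      case "local" => 1
      case LocalN(threads) => toInt(threads)
      case LocalNFailures(threads, _) => toInt(threads)
      case _ => executorCores
    }
  }

  // limit = maxMemory * coresPerTask / numCores
  def limitPerTask(maxMemoryBytes: Long, coresPerTask: Int, numCores: Int): Long =
    (maxMemoryBytes.toDouble * coresPerTask / numCores).toLong
}
```

With 16 GB of memory, 16 cores, and 4 cores per task, `limitPerTask` gives 16 GB * 4 / 16 = 4 GB, matching the example in the code comment.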
Review comment: I would simplify this (along with CometBatchIterator) down to something that clearly lays out "iterator for passing Arrow batches from JVM to native" or vice versa. There are a lot of words here that don't really make clear what it's doing (same with #2168).
Reply: Thanks. I've pushed a more concise version.