
Commit c2c8ba0

andygrove and claude committed
feat: add experimental native columnar to row conversion
This PR adds an experimental native (Rust-based) implementation of ColumnarToRowExec that converts Arrow columnar data to Spark UnsafeRow format.

Benefits over the current Scala implementation:

- Zero-copy for variable-length types: String and Binary data is written directly to the output buffer without intermediate Java object allocation
- Vectorized processing: the native implementation processes data in a columnar fashion, improving CPU cache utilization
- Reduced GC pressure: all conversion happens in native memory, avoiding the creation of temporary Java objects that would need garbage collection
- Buffer reuse: the output buffer is allocated once and reused across batches, minimizing memory allocation overhead

The feature is disabled by default and can be enabled by setting:

spark.comet.exec.columnarToRow.native.enabled=true

Supported data types:

- Primitive types: Boolean, Byte, Short, Int, Long, Float, Double
- Date and Timestamp (microseconds)
- Decimal (both inline for precision <= 18 and variable-length for precision > 18)
- String and Binary
- Complex types: Struct, Array, Map (nested)

This is an experimental feature for evaluation and benchmarking purposes.

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 68f127b commit c2c8ba0
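For evaluation, a minimal illustrative sketch of enabling the flag on a SparkSession (assuming Comet is already on the classpath and its extensions are registered; the application name and query below are placeholders, not part of this commit):

import org.apache.spark.sql.SparkSession

// Illustrative sketch only: assumes Comet is installed and configured.
val spark = SparkSession.builder()
  .appName("comet-native-columnar-to-row-eval") // placeholder name
  .getOrCreate()

// The feature is off by default; this opts in to the experimental native conversion.
spark.conf.set("spark.comet.exec.columnarToRow.native.enabled", "true")

// Any plan containing a columnar-to-row transition can now exercise the native
// path, e.g. when collecting results back to the driver as rows.
spark.range(1000).selectExpr("id", "cast(id as string) as s").collect()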

File tree: 11 files changed, +1265 −4 lines changed


common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 11 additions & 0 deletions
@@ -286,6 +286,17 @@ object CometConf extends ShimCometConf {
   val COMET_EXEC_LOCAL_TABLE_SCAN_ENABLED: ConfigEntry[Boolean] =
     createExecEnabledConfig("localTableScan", defaultValue = false)
 
+  val COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.columnarToRow.native.enabled")
+      .category(CATEGORY_EXEC)
+      .doc(
+        "Whether to enable native columnar to row conversion. When enabled, Comet will use " +
+          "native Rust code to convert Arrow columnar data to Spark UnsafeRow format instead " +
+          "of the JVM implementation. This can improve performance for queries that need to " +
+          "convert between columnar and row formats. This is an experimental feature.")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
       .category(CATEGORY_ENABLE_EXEC)
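As a hedged sketch of how planner-side code might consult this entry, the snippet below assumes the `get(conf)` accessor pattern used by neighbouring Comet ConfigEntry values; the helper name is illustrative and not part of this commit:

import org.apache.spark.sql.internal.SQLConf
import org.apache.comet.CometConf

// Illustrative helper, not from this commit: gate a native columnar-to-row
// replacement on the new entry. Assumes ConfigEntry exposes get(conf), as the
// surrounding Comet config entries are read elsewhere in the planner.
def nativeColumnarToRowEnabled(conf: SQLConf): Boolean =
  CometConf.COMET_NATIVE_COLUMNAR_TO_ROW_ENABLED.get(conf)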

common/src/main/scala/org/apache/comet/vector/NativeUtil.scala

Lines changed: 20 additions & 0 deletions
@@ -78,6 +78,26 @@ class NativeUtil {
     (arrays, schemas)
   }
 
+  /**
+   * Exports a ColumnarBatch to Arrow FFI and returns the memory addresses.
+   *
+   * This is a convenience method that allocates Arrow structs, exports the batch, and returns
+   * just the memory addresses (without exposing the Arrow types).
+   *
+   * @param batch
+   *   the columnar batch to export
+   * @return
+   *   a tuple of (array addresses, schema addresses, number of rows)
+   */
+  def exportBatchToAddresses(batch: ColumnarBatch): (Array[Long], Array[Long], Int) = {
+    val numCols = batch.numCols()
+    val (arrays, schemas) = allocateArrowStructs(numCols)
+    val arrayAddrs = arrays.map(_.memoryAddress())
+    val schemaAddrs = schemas.map(_.memoryAddress())
+    val numRows = exportBatch(arrayAddrs, schemaAddrs, batch)
+    (arrayAddrs, schemaAddrs, numRows)
+  }
+
   /**
    * Exports a Comet `ColumnarBatch` into a list of memory addresses that can be consumed by the
    * native execution.
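A minimal sketch of how a caller might use the new helper to hand a batch across the FFI boundary; `callNative` is a stand-in for whatever JNI entry point consumes the addresses and is not defined in this commit:

import org.apache.comet.vector.NativeUtil
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative caller, not from this commit: export a batch and pass the raw
// Arrow FFI addresses to a caller-supplied native routine.
def convertBatch(nativeUtil: NativeUtil, batch: ColumnarBatch)(
    callNative: (Array[Long], Array[Long], Int) => Unit): Unit = {
  val (arrayAddrs, schemaAddrs, numRows) = nativeUtil.exportBatchToAddresses(batch)
  // The native side imports the Arrow arrays/schemas from these addresses and
  // performs the columnar-to-row conversion for numRows rows.
  callNative(arrayAddrs, schemaAddrs, numRows)
}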
