|
| 1 | +<!-- |
| 2 | +Licensed to the Apache Software Foundation (ASF) under one |
| 3 | +or more contributor license agreements. See the NOTICE file |
| 4 | +distributed with this work for additional information |
| 5 | +regarding copyright ownership. The ASF licenses this file |
| 6 | +to you under the Apache License, Version 2.0 (the |
| 7 | +"License"); you may not use this file except in compliance |
| 8 | +with the License. You may obtain a copy of the License at |
| 9 | +
|
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | +
|
| 12 | +Unless required by applicable law or agreed to in writing, |
| 13 | +software distributed under the License is distributed on an |
| 14 | +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | +KIND, either express or implied. See the License for the |
| 16 | +specific language governing permissions and limitations |
| 17 | +under the License. |
| 18 | +--> |
| 19 | + |
| 20 | +# Arrow FFI Usage in Comet |
| 21 | + |
| 22 | +## Overview |
| 23 | + |
| 24 | +Comet uses the [Arrow C Data Interface](https://arrow.apache.org/docs/format/CDataInterface.html) for zero-copy data transfer in two directions: |
| 25 | + |
| 26 | +1. **JVM → Native**: Native code pulls batches from JVM using `CometBatchIterator` |
| 27 | +2. **Native → JVM**: JVM pulls batches from native code using `CometExecIterator` |
| 28 | + |
| 29 | +The following diagram shows an example of the end-to-end flow for a query stage. |
| 30 | + |
| 31 | + |
| 32 | + |
| 33 | +Both scenarios use the same FFI mechanism but have different ownership semantics and memory management implications. |
| 34 | + |
| 35 | +## Arrow FFI Basics |
| 36 | + |
| 37 | +The Arrow C Data Interface defines two C structures: |
| 38 | +- `ArrowArray`: Contains pointers to data buffers and metadata |
| 39 | +- `ArrowSchema`: Contains type information |
| 40 | + |
| 41 | +### Key Characteristics |
| 42 | +- **Zero-copy**: Data buffers can be shared across language boundaries without copying |
| 43 | +- **Ownership transfer**: Clear semantics for who owns and must free the data |
| 44 | +- **Release callbacks**: Custom cleanup functions for proper resource management |
| 45 | + |
| 46 | +## JVM → Native Data Flow (ScanExec) |
| 47 | + |
| 48 | +### Architecture |
| 49 | + |
| 50 | +When native code needs data from the JVM, it uses `ScanExec` which calls into `CometBatchIterator`: |
| 51 | + |
| 52 | +``` |
| 53 | +┌─────────────────┐ |
| 54 | +│ Spark/Scala │ |
| 55 | +│ CometExecIter │ |
| 56 | +└────────┬────────┘ |
| 57 | + │ produces batches |
| 58 | + ▼ |
| 59 | +┌─────────────────┐ |
| 60 | +│ CometBatchIter │ ◄─── JNI call from native |
| 61 | +│ (JVM side) │ |
| 62 | +└────────┬────────┘ |
| 63 | + │ Arrow FFI |
| 64 | + │ (transfers ArrowArray/ArrowSchema pointers) |
| 65 | + ▼ |
| 66 | +┌─────────────────┐ |
| 67 | +│ ScanExec │ |
| 68 | +│ (Rust/native) │ |
| 69 | +└────────┬────────┘ |
| 70 | + │ |
| 71 | + ▼ |
| 72 | +┌─────────────────┐ |
| 73 | +│ DataFusion │ |
| 74 | +│ operators │ |
| 75 | +└─────────────────┘ |
| 76 | +``` |
| 77 | + |
| 78 | +### FFI Transfer Process |
| 79 | + |
| 80 | +The data transfer happens in `ScanExec::get_next()`: |
| 81 | + |
| 82 | +```rust |
| 83 | +// 1. Allocate FFI structures on native side (Rust heap) |
| 84 | +for _ in 0..num_cols { |
| 85 | + let arrow_array = Rc::new(FFI_ArrowArray::empty()); |
| 86 | + let arrow_schema = Rc::new(FFI_ArrowSchema::empty()); |
| 87 | + let array_ptr = Rc::into_raw(arrow_array) as i64; |
| 88 | + let schema_ptr = Rc::into_raw(arrow_schema) as i64; |
| 89 | + // Store pointers... |
| 90 | +} |
| 91 | + |
| 92 | +// 2. Call JVM to populate FFI structures |
| 93 | +let num_rows: i32 = unsafe { |
| 94 | + jni_call!(env, comet_batch_iterator(iter).next(array_obj, schema_obj) -> i32)? |
| 95 | +}; |
| 96 | + |
| 97 | +// 3. Import data from FFI structures |
| 98 | +for i in 0..num_cols { |
| 99 | + let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?; |
| 100 | + let array = make_array(array_data); |
| 101 | + // ... process array |
| 102 | +} |
| 103 | +``` |
| 104 | + |
| 105 | +### Memory Layout |
| 106 | + |
| 107 | +When a batch is transferred from JVM to native: |
| 108 | + |
| 109 | +``` |
| 110 | +JVM Heap: Native Memory: |
| 111 | +┌──────────────────┐ ┌──────────────────┐ |
| 112 | +│ ColumnarBatch │ │ FFI_ArrowArray │ |
| 113 | +│ ┌──────────────┐ │ │ ┌──────────────┐ │ |
| 114 | +│ │ ArrowBuf │─┼──────────────>│ │ buffers[0] │ │ |
| 115 | +│ │ (off-heap) │ │ │ │ (pointer) │ │ |
| 116 | +│ └──────────────┘ │ │ └──────────────┘ │ |
| 117 | +└──────────────────┘ └──────────────────┘ |
| 118 | + │ │ |
| 119 | + │ │ |
| 120 | +Off-heap Memory: │ |
| 121 | +┌──────────────────┐ <──────────────────────┘ |
| 122 | +│ Actual Data │ |
| 123 | +│ (e.g., int32[]) │ |
| 124 | +└──────────────────┘ |
| 125 | +``` |
| 126 | + |
| 127 | +**Key Point**: The actual data buffers can be off-heap, but the `ArrowArray` and `ArrowSchema` wrapper objects are **always allocated on the JVM heap**. |
| 128 | + |
| 129 | +### Wrapper Object Lifecycle |
| 130 | + |
| 131 | +When arrays are created in the JVM and passed to native code, the JVM creates the array data off-heap and creates |
| 132 | +wrapper objects `ArrowArray` and `ArrowSchema` on-heap. These wrapper objects can consume significant memory over |
| 133 | +time. |
| 134 | + |
| 135 | +``` |
| 136 | +Per batch overhead on JVM heap: |
| 137 | +- ArrowArray object: ~100 bytes |
| 138 | +- ArrowSchema object: ~100 bytes |
| 139 | +- Per column: ~200 bytes |
| 140 | +- 100 columns × 1000 batches = ~20 MB of wrapper objects |
| 141 | +``` |
| 142 | + |
| 143 | +When native code pulls batches from the JVM, the JVM wrapper objects are kept alive until the native code drops |
| 144 | +all references to the arrays. |
| 145 | + |
| 146 | +When operators such as `SortExec` fetch many batches and buffer them in native code, the number of wrapper objects |
| 147 | +in Java on-heap memory keeps growing until the batches are released in native code at the end of the sort operation. |
| 148 | + |
| 149 | +### Ownership Transfer |
| 150 | + |
| 151 | +The Arrow C data interface supports ownership transfer by registering callbacks in the C struct that is passed over |
| 152 | +the JNI boundary for the function to delete the array data. For example, the `ArrowArray` struct has: |
| 153 | + |
| 154 | +```c |
| 155 | +// Release callback |
| 156 | +void (*release)(struct ArrowArray*); |
| 157 | +``` |
| 158 | + |
| 159 | +Comet currently does not always follow best practice around ownership transfer because there are some cases where |
| 160 | +Comet JVM code will retain references to arrays after passing them to native code and may mutate the underlying |
| 161 | +buffers. There is an `arrow_ffi_safe` flag in the protocol buffer definition of `Scan` that indicates whether |
| 162 | +ownership is being transferred according to the Arrow C data interface specification. |
| 163 | + |
| 164 | + |
| 165 | +```protobuf |
| 166 | +message Scan { |
| 167 | + repeated spark.spark_expression.DataType fields = 1; |
| 168 | + // The source of the scan (e.g. file scan, broadcast exchange, shuffle, etc). This |
| 169 | + // is purely for informational purposes when viewing native query plans in |
| 170 | + // debug mode. |
| 171 | + string source = 2; |
| 172 | + // Whether native code can assume ownership of batches that it receives |
| 173 | + bool arrow_ffi_safe = 3; |
| 174 | +} |
| 175 | +``` |
| 176 | + |
| 177 | +#### When ownership is NOT transferred to native: |
| 178 | + |
| 179 | +If the data originates from `native_comet` scan (or from `native_iceberg_compat` in some cases), then ownership is |
| 180 | +not transferred to native and the JVM may re-use the underlying buffers in the future. |
| 181 | + |
| 182 | +It is critical that the native code performs a deep copy of the arrays if the arrays are to be buffered by |
| 183 | +operators such as `SortExec` or `ShuffleWriterExec`, otherwise data corruption is likely to occur. |
| 184 | + |
| 185 | +#### When ownership IS transferred to native: |
| 186 | + |
| 187 | +When ownership is transferred, it is safe to buffer batches in native. However, JVM wrapper objects will not be |
| 188 | +released until the native batches are dropped. This can lead to OOM or GC pressure if there is not enough Java |
| 189 | +heap memory configured. |
| 190 | + |
| 191 | +## Native → JVM Data Flow (CometExecIterator) |
| 192 | + |
| 193 | +### Architecture |
| 194 | + |
| 195 | +When JVM needs results from native execution: |
| 196 | + |
| 197 | +``` |
| 198 | +┌─────────────────┐ |
| 199 | +│ DataFusion Plan │ |
| 200 | +│ (native) │ |
| 201 | +└────────┬────────┘ |
| 202 | + │ produces RecordBatch |
| 203 | + ▼ |
| 204 | +┌─────────────────┐ |
| 205 | +│ CometExecIter │ |
| 206 | +│ (Rust/native) │ |
| 207 | +└────────┬────────┘ |
| 208 | + │ Arrow FFI |
| 209 | + │ (transfers ArrowArray/ArrowSchema pointers) |
| 210 | + ▼ |
| 211 | +┌─────────────────┐ |
| 212 | +│ CometExecIter │ ◄─── JNI call from Spark |
| 213 | +│ (Scala side) │ |
| 214 | +└────────┬────────┘ |
| 215 | + │ |
| 216 | + ▼ |
| 217 | +┌─────────────────┐ |
| 218 | +│ Spark Actions │ |
| 219 | +│ (collect, etc) │ |
| 220 | +└─────────────────┘ |
| 221 | +``` |
| 222 | + |
| 223 | +### FFI Transfer Process |
| 224 | + |
| 225 | +The transfer happens in `CometExecIterator::getNextBatch()`: |
| 226 | + |
| 227 | +```scala |
| 228 | +// Scala side |
| 229 | +def getNextBatch(): ColumnarBatch = { |
| 230 | + val batchHandle = Native.getNextBatch(nativeHandle) |
| 231 | + |
| 232 | + // Import from FFI structures |
| 233 | + val vectors = (0 until schema.length).map { i => |
| 234 | + val array = Array.empty[Long](1) |
| 235 | + val schemaPtr = Array.empty[Long](1) |
| 236 | + |
| 237 | + // Get FFI pointers from native |
| 238 | + Native.exportVector(batchHandle, i, array, schemaPtr) |
| 239 | + |
| 240 | + // Import into Arrow Java |
| 241 | + Data.importVector(allocator, array(0), schemaPtr(0)) |
| 242 | + } |
| 243 | + |
| 244 | + new ColumnarBatch(vectors.toArray, numRows) |
| 245 | +} |
| 246 | +``` |
| 247 | + |
| 248 | +```rust |
| 249 | +// Native side (simplified) |
| 250 | +#[no_mangle] |
| 251 | +pub extern "system" fn Java_..._getNextBatch( |
| 252 | + env: JNIEnv, |
| 253 | + handle: jlong, |
| 254 | +) -> jlong { |
| 255 | + let context = get_exec_context(handle)?; |
| 256 | + let batch = context.stream.next().await?; |
| 257 | + |
| 258 | + // Store batch and return handle |
| 259 | + let batch_handle = Box::into_raw(Box::new(batch)) as i64; |
| 260 | + batch_handle |
| 261 | +} |
| 262 | + |
| 263 | +#[no_mangle] |
| 264 | +pub extern "system" fn Java_..._exportVector( |
| 265 | + env: JNIEnv, |
| 266 | + batch_handle: jlong, |
| 267 | + col_idx: jint, |
| 268 | + array_ptr: jlongArray, |
| 269 | + schema_ptr: jlongArray, |
| 270 | +) { |
| 271 | + let batch = get_batch(batch_handle)?; |
| 272 | + let array = batch.column(col_idx); |
| 273 | + |
| 274 | + // Export to FFI structures |
| 275 | + let (array_ffi, schema_ffi) = to_ffi(array.to_data())?; |
| 276 | + |
| 277 | + // Write pointers back to JVM |
| 278 | + env.set_long_array_region(array_ptr, 0, &[array_ffi as i64])?; |
| 279 | + env.set_long_array_region(schema_ptr, 0, &[schema_ffi as i64])?; |
| 280 | +} |
| 281 | +``` |
| 282 | + |
| 283 | +### Wrapper Object Lifecycle (Native → JVM) |
| 284 | + |
| 285 | +``` |
| 286 | +Time Native Memory JVM Heap Off-heap/Native |
| 287 | +──────────────────────────────────────────────────────────────────────── |
| 288 | +t0 RecordBatch produced - Data in native |
| 289 | + in DataFusion |
| 290 | +
|
| 291 | +t1 FFI_ArrowArray created - Data in native |
| 292 | + FFI_ArrowSchema created |
| 293 | + (native heap) |
| 294 | +
|
| 295 | +t2 Pointers exported to JVM ArrowBuf created Data in native |
| 296 | + (wraps native ptr) |
| 297 | +
|
| 298 | +t3 FFI structures kept alive Spark processes Data in native |
| 299 | + via batch handle ColumnarBatch ✓ Valid |
| 300 | +
|
| 301 | +t4 Batch handle released ArrowBuf freed Data freed |
| 302 | + Release callback runs (triggers native (via release |
| 303 | + release callback) callback) |
| 304 | +``` |
| 305 | + |
| 306 | +**Key Difference from JVM → Native**: |
| 307 | +- Native code controls lifecycle through batch handle |
| 308 | +- JVM creates `ArrowBuf` wrappers that point to native memory |
| 309 | +- Release callback ensures proper cleanup when JVM is done |
| 310 | +- No GC pressure issue because native allocator manages the data |
| 311 | + |
| 312 | +### Release Callbacks |
| 313 | + |
| 314 | +Critical for proper cleanup: |
| 315 | + |
| 316 | +```rust |
| 317 | +// Native release callback (simplified) |
| 318 | +extern "C" fn release_batch(array: *mut FFI_ArrowArray) { |
| 319 | + if !array.is_null() { |
| 320 | + unsafe { |
| 321 | + // Free the data buffers |
| 322 | + for buffer in (*array).buffers { |
| 323 | + drop(Box::from_raw(buffer)); |
| 324 | + } |
| 325 | + // Free the array structure itself |
| 326 | + drop(Box::from_raw(array)); |
| 327 | + } |
| 328 | + } |
| 329 | +} |
| 330 | +``` |
| 331 | + |
| 332 | +When JVM is done with the data: |
| 333 | +```java |
| 334 | +// ArrowBuf.close() triggers the release callback |
| 335 | +arrowBuf.close(); // → calls native release_batch() |
| 336 | +``` |
| 337 | + |
| 338 | +## Memory Ownership Rules |
| 339 | + |
| 340 | +### JVM → Native |
| 341 | + |
| 342 | +| Scenario | `arrow_ffi_safe` | Ownership | Action Required | |
| 343 | +|----------|------------------|-----------|-----------------| |
| 344 | +| Temporary scan | `false` | JVM keeps | **Must deep copy** to avoid corruption | |
| 345 | +| Ownership transfer | `true` | Native owns | Copy only to unpack dictionaries | |
| 346 | + |
| 347 | +### Native → JVM |
| 348 | + |
| 349 | +| Scenario | Ownership | Action Required | |
| 350 | +|----------|-----------|-----------------| |
| 351 | +| All cases | Native allocates, JVM references | JVM must call `close()` to trigger native release callback | |
| 352 | + |
| 353 | +## Further Reading |
| 354 | + |
| 355 | +- [Arrow C Data Interface Specification](https://arrow.apache.org/docs/format/CDataInterface.html) |
| 356 | +- [Arrow Java FFI Implementation](https://github.com/apache/arrow/tree/main/java/c) |
| 357 | +- [Arrow Rust FFI Implementation](https://docs.rs/arrow/latest/arrow/ffi/) |
0 commit comments