Skip to content

Commit 3e0daa3

Browse files
authored
Refactor stream handling for DF execution (#20076)
* Refactor stream handling for DF execution
* Change to Buffer Allocator for DF execution
* Introduce RecordBatchIterator and integrate with collector
* Minor fixup
* Update VSRManager to update field vector atomically
* Fixup
* Fixup
* Fixup
* Test fixup
* Integrate with AsyncRecordBatchIterator
* Fix up
1 parent ad3dd8b commit 3e0daa3

File tree

15 files changed

+489
-230
lines changed

15 files changed

+489
-230
lines changed

libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/NativeHandle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ protected NativeHandle(long ptr) {
4343
* Ensures the handle is still open.
4444
* @throws IllegalStateException if the handle has been closed
4545
*/
46-
protected void ensureOpen() {
46+
public void ensureOpen() {
4747
if (closed.get()) {
4848
throw new IllegalStateException("Handle already closed");
4949
}

libs/vectorized-exec-spi/src/main/java/org/opensearch/vectorized/execution/jni/RefCountedNativeHandle.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public void retain() {
4242
*/
4343
@Override
4444
public final void close() {
45+
ensureOpen();
4546
if (refCount.decrementAndGet() == 0) {
4647
super.close();
4748
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package com.parquet.parquetdataformat.bridge;
10+
11+
import java.io.Closeable;
12+
import java.io.IOException;
13+
14+
/**
15+
* Type-safe handle for native Parquet writer with lifecycle management.
16+
*/
17+
public class NativeParquetWriter implements Closeable {
18+
19+
private final String filePath;
20+
21+
/**
22+
* Creates a new native Parquet writer.
23+
* @param filePath path to the Parquet file
24+
* @param schemaAddress Arrow C Data Interface schema pointer
25+
* @throws IOException if writer creation fails
26+
*/
27+
public NativeParquetWriter(String filePath, long schemaAddress) throws IOException {
28+
this.filePath = filePath;
29+
RustBridge.createWriter(filePath, schemaAddress);
30+
}
31+
32+
/**
33+
* Writes a batch to the Parquet file.
34+
* @param arrayAddress Arrow C Data Interface array pointer
35+
* @param schemaAddress Arrow C Data Interface schema pointer
36+
* @throws IOException if write fails
37+
*/
38+
public void write(long arrayAddress, long schemaAddress) throws IOException {
39+
RustBridge.write(filePath, arrayAddress, schemaAddress);
40+
}
41+
42+
/**
43+
* Flushes buffered data to disk.
44+
* @throws IOException if flush fails
45+
*/
46+
public void flush() throws IOException {
47+
RustBridge.flushToDisk(filePath);
48+
}
49+
50+
@Override
51+
public void close() {
52+
try {
53+
RustBridge.closeWriter(filePath);
54+
} catch (IOException e) {
55+
throw new RuntimeException("Failed to close Parquet writer for " + filePath, e);
56+
}
57+
}
58+
59+
public String getFilePath() {
60+
return filePath;
61+
}
62+
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class RustBridge {
3333
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
3434
public static native void closeWriter(String file) throws IOException;
3535
public static native void flushToDisk(String file) throws IOException;
36+
3637
public static native long getFilteredNativeBytesUsed(String pathPrefix);
3738

3839

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 67 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
package com.parquet.parquetdataformat.vsr;
1010

1111
import com.parquet.parquetdataformat.bridge.ArrowExport;
12-
import com.parquet.parquetdataformat.bridge.RustBridge;
12+
import com.parquet.parquetdataformat.bridge.NativeParquetWriter;
1313
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
1414
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
1515
import org.apache.arrow.vector.FieldVector;
@@ -20,9 +20,11 @@
2020
import org.opensearch.index.engine.exec.FlushIn;
2121
import org.opensearch.index.engine.exec.WriteResult;
2222

23+
import java.io.Closeable;
2324
import java.io.IOException;
2425
import java.util.HashMap;
2526
import java.util.Map;
27+
import java.util.concurrent.atomic.AtomicReference;
2628

2729
/**
2830
* Manages VectorSchemaRoot lifecycle with integrated memory management and native call wrappers.
@@ -32,18 +34,19 @@
3234
* <ul>
3335
* <li>{@link ManagedVSR} - Thread-safe VSR with state management</li>
3436
* <li>{@link VSRPool} - Resource pooling for VSRs</li>
35-
* <li>{@link RustBridge} - Direct JNI calls to Rust backend</li>
37+
* <li>{@link com.parquet.parquetdataformat.bridge.RustBridge} - Direct JNI calls to Rust backend</li>
3638
* </ul>
3739
*/
38-
public class VSRManager {
39-
40-
private static final Logger logger = LogManager.getLogger(VSRManager.class);
41-
42-
private ManagedVSR managedVSR;
40+
public class VSRManager implements Closeable {
41+
private final AtomicReference<ManagedVSR> managedVSR = new AtomicReference<>();
4342
private Map<String, FieldVector> fieldVectorMap;
4443
private final Schema schema;
4544
private final String fileName;
4645
private final VSRPool vsrPool;
46+
private NativeParquetWriter writer;
47+
48+
private static final Logger logger = LogManager.getLogger(VSRManager.class);
49+
4750

4851
public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPool) {
4952
this.fileName = fileName;
@@ -53,42 +56,42 @@ public VSRManager(String fileName, Schema schema, ArrowBufferPool arrowBufferPoo
5356
this.vsrPool = new VSRPool("pool-" + fileName, schema, arrowBufferPool);
5457

5558
// Get active VSR from pool
56-
this.managedVSR = vsrPool.getActiveVSR();
59+
this.managedVSR.set(vsrPool.getActiveVSR());
5760
initializeFieldVectorMap();
5861
// Initialize writer lazily to avoid crashes
5962
initializeWriter();
6063
}
6164

6265
private void initializeWriter() {
6366
try {
64-
// Export schema through managed VSR
65-
try (ArrowExport export = managedVSR.exportSchema()) {
66-
long schemaAddress = export.getSchemaAddress();
67-
68-
// Direct native call - RustBridge handles all validation
69-
RustBridge.createWriter(fileName, schemaAddress);
67+
try (ArrowExport export = managedVSR.get().exportSchema()) {
68+
writer = new NativeParquetWriter(fileName, export.getSchemaAddress());
7069
}
7170
} catch (Exception e) {
7271
throw new RuntimeException("Failed to initialize Parquet writer: " + e.getMessage(), e);
7372
}
7473
}
7574

7675
public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOException {
77-
// Ensure we have an active VSR (handle case where getActiveVSR() returns null)
78-
if (managedVSR == null) {
79-
managedVSR = vsrPool.getActiveVSR();
80-
if (managedVSR == null) {
81-
throw new IOException("No active VSR available");
76+
ManagedVSR currentVSR = managedVSR.updateAndGet(vsr -> {
77+
if (vsr == null) {
78+
ManagedVSR newVSR = vsrPool.getActiveVSR();
79+
if (newVSR != null) {
80+
reinitializeFieldVectorMap();
81+
}
82+
return newVSR;
8283
}
83-
reinitializeFieldVectorMap();
84-
}
84+
return vsr;
85+
});
8586

86-
// Ensure VSR is in ACTIVE state for modifications
87-
if (managedVSR.getState() != VSRState.ACTIVE) {
88-
throw new IOException("Cannot add document - VSR is not active: " + managedVSR.getState());
87+
if (currentVSR == null) {
88+
throw new IOException("No active VSR available");
89+
}
90+
if (currentVSR.getState() != VSRState.ACTIVE) {
91+
throw new IOException("Cannot add document - VSR is not active: " + currentVSR.getState());
8992
}
9093

91-
logger.debug("addToManagedVSR called for {}, current row count: {}", fileName, managedVSR.getRowCount());
94+
logger.debug("addToManagedVSR called for {}, current row count: {}", fileName, currentVSR.getRowCount());
9295

9396
try {
9497
// Since ParquetDocumentInput now works directly with ManagedVSR,
@@ -97,7 +100,7 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep
97100
// which will increment the row count.
98101
WriteResult result = document.addToWriter();
99102

100-
logger.debug("After adding document to {}, row count: {}", fileName, managedVSR.getRowCount());
103+
logger.debug("After adding document to {}, row count: {}", fileName, currentVSR.getRowCount());
101104

102105
// Check for VSR rotation AFTER successful document processing
103106
maybeRotateActiveVSR();
@@ -110,25 +113,26 @@ public WriteResult addToManagedVSR(ParquetDocumentInput document) throws IOExcep
110113
}
111114

112115
public String flush(FlushIn flushIn) throws IOException {
113-
logger.info("Flush called for {}, row count: {}", fileName, managedVSR.getRowCount());
116+
ManagedVSR currentVSR = managedVSR.get();
117+
logger.info("Flush called for {}, row count: {}", fileName, currentVSR.getRowCount());
114118
try {
115119
// Only flush if we have data
116-
if (managedVSR.getRowCount() == 0) {
120+
if (currentVSR.getRowCount() == 0) {
117121
logger.debug("No data to flush for {}, returning null", fileName);
118122
return null;
119123
}
120124

121125
// Transition VSR to FROZEN state before flushing
122-
managedVSR.setState(VSRState.FROZEN);
123-
logger.info("Flushing {} rows for {}", managedVSR.getRowCount(), fileName);
126+
currentVSR.setState(VSRState.FROZEN);
127+
logger.info("Flushing {} rows for {}", currentVSR.getRowCount(), fileName);
124128

125129
// Transition to FLUSHING state
126-
managedVSR.setState(VSRState.FLUSHING);
130+
currentVSR.setState(VSRState.FLUSHING);
127131

128-
// Direct native call - write the managed VSR data
129-
try (ArrowExport export = managedVSR.exportToArrow()) {
130-
RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
131-
RustBridge.closeWriter(fileName);
132+
// Write through native writer handle
133+
try (ArrowExport export = currentVSR.exportToArrow()) {
134+
writer.write(export.getArrayAddress(), export.getSchemaAddress());
135+
writer.close();
132136
}
133137
logger.info("Successfully flushed data for {}", fileName);
134138

@@ -139,20 +143,15 @@ public String flush(FlushIn flushIn) throws IOException {
139143
}
140144
}
141145

146+
@Override
142147
public void close() {
143148
try {
144-
// Direct native calls
145-
try {
146-
RustBridge.closeWriter(fileName);
147-
RustBridge.flushToDisk(fileName);
148-
} catch (IOException e) {
149-
logger.warn("Failed to close/flush writer for {}: {}", fileName, e.getMessage(), e);
149+
if (writer != null) {
150+
writer.flush();
151+
writer.close();
150152
}
151-
152-
// Close VSR Pool
153153
vsrPool.close();
154-
managedVSR = null;
155-
154+
managedVSR.set(null);
156155
} catch (Exception e) {
157156
logger.error("Error during close for {}: {}", fileName, e.getMessage(), e);
158157
}
@@ -184,7 +183,7 @@ public void maybeRotateActiveVSR() throws IOException {
184183
// Write the frozen VSR data immediately
185184
frozenVSR.setState(VSRState.FLUSHING);
186185
try (ArrowExport export = frozenVSR.exportToArrow()) {
187-
RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
186+
writer.write(export.getArrayAddress(), export.getSchemaAddress());
188187
}
189188

190189
logger.debug("Successfully wrote frozen VSR data for {}", fileName);
@@ -196,17 +195,19 @@ public void maybeRotateActiveVSR() throws IOException {
196195
logger.warn("Rotation occurred but no frozen VSR found for {}", fileName);
197196
}
198197

199-
// Update to new active VSR
200-
managedVSR = vsrPool.getActiveVSR();
201-
if (managedVSR == null) {
198+
// Update to new active VSR atomically with field vector map
199+
ManagedVSR oldVSR = managedVSR.get();
200+
ManagedVSR newVSR = vsrPool.getActiveVSR();
201+
if (newVSR == null) {
202202
throw new IOException("No active VSR available after rotation");
203203
}
204+
updateVSRAndReinitialize(oldVSR, newVSR);
204205

205206
// Reinitialize field vector map with new VSR
206207
reinitializeFieldVectorMap();
207208

208209
logger.debug("VSR rotation completed for {}, new active VSR: {}, row count: {}",
209-
fileName, managedVSR.getId(), managedVSR.getRowCount());
210+
fileName, newVSR.getId(), newVSR.getRowCount());
210211
}
211212
} catch (IOException e) {
212213
logger.error("Error during VSR rotation for {}: {}", fileName, e.getMessage(), e);
@@ -226,17 +227,24 @@ private void checkAndHandleVSRRotation() throws IOException {
226227
ManagedVSR currentActive = vsrPool.getActiveVSR();
227228

228229
// Check if we got a different VSR (rotation occurred)
229-
if (currentActive != managedVSR) {
230+
ManagedVSR oldVSR = managedVSR.get();
231+
if (currentActive != oldVSR) {
230232
logger.debug("VSR rotation detected for {}, updating references", fileName);
231233

232-
// Update the managed VSR reference
233-
managedVSR = currentActive;
234-
235-
// Reinitialize field vector map with new VSR
236-
reinitializeFieldVectorMap();
234+
// Update the managed VSR reference atomically with field vector map
235+
updateVSRAndReinitialize(oldVSR, currentActive);
237236

238237
// Note: Writer initialization is not needed per VSR as it's per file
239-
logger.debug("VSR rotation completed for {}, new row count: {}", fileName, managedVSR.getRowCount());
238+
logger.debug("VSR rotation completed for {}, new row count: {}", fileName, currentActive.getRowCount());
239+
}
240+
}
241+
242+
/**
243+
* Atomically updates managedVSR and reinitializes field vector map.
244+
*/
245+
private void updateVSRAndReinitialize(ManagedVSR oldVSR, ManagedVSR newVSR) {
246+
if (managedVSR.compareAndSet(oldVSR, newVSR)) {
247+
reinitializeFieldVectorMap();
240248
}
241249
}
242250

@@ -253,7 +261,7 @@ private void initializeFieldVectorMap() {
253261
fieldVectorMap = new HashMap<>();
254262
for (Field field : schema.getFields()) {
255263
String fieldName = field.getName();
256-
FieldVector fieldVector = managedVSR.getVector(fieldName);
264+
FieldVector fieldVector = managedVSR.get().getVector(fieldName);
257265
// Vector is already properly typed from ManagedVSR.getVector()
258266
fieldVectorMap.put(fieldName, fieldVector);
259267
}
@@ -265,6 +273,6 @@ private void initializeFieldVectorMap() {
265273
* @return The current managed VSR instance
266274
*/
267275
public ManagedVSR getActiveManagedVSR() {
268-
return managedVSR;
276+
return managedVSR.get();
269277
}
270278
}

0 commit comments

Comments (0)