Skip to content

Commit 4c13cf5

Browse files
committed
Removing atomic handling for ManagedVSR and cleaning up state transitions
Signed-off-by: Raghuvansh Raj <[email protected]>
1 parent ad3dd8b commit 4c13cf5

File tree

4 files changed

+98
-181
lines changed

4 files changed

+98
-181
lines changed
Lines changed: 88 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package com.parquet.parquetdataformat.vsr;
22

33
import com.parquet.parquetdataformat.bridge.ArrowExport;
4-
import org.apache.arrow.memory.ArrowBuf;
54
import org.apache.arrow.memory.BufferAllocator;
6-
import org.apache.arrow.memory.RootAllocator;
7-
import org.apache.arrow.vector.BigIntVector;
85
import org.apache.arrow.vector.VectorSchemaRoot;
96
import org.apache.arrow.vector.FieldVector;
107
import org.apache.arrow.c.ArrowArray;
@@ -13,15 +10,12 @@
1310
import org.apache.logging.log4j.LogManager;
1411
import org.apache.logging.log4j.Logger;
1512

16-
import java.util.concurrent.atomic.AtomicReference;
17-
import java.util.concurrent.locks.ReadWriteLock;
18-
import java.util.concurrent.locks.ReentrantReadWriteLock;
19-
2013
import static org.apache.arrow.vector.BitVectorHelper.byteIndex;
14+
import org.apache.arrow.vector.types.pojo.Schema;
2115

2216
/**
2317
* Managed wrapper around VectorSchemaRoot that handles state transitions
24-
* and provides thread-safe access for the ACTIVE/FROZEN lifecycle.
18+
* for the ACTIVE/FROZEN/CLOSED lifecycle with controlled access methods.
2519
*/
2620
public class ManagedVSR implements AutoCloseable {
2721

@@ -30,43 +24,23 @@ public class ManagedVSR implements AutoCloseable {
3024
private final String id;
3125
private final VectorSchemaRoot vsr;
3226
private final BufferAllocator allocator;
33-
private final AtomicReference<VSRState> state;
34-
private final ReadWriteLock lock;
35-
private final long createdTime;
27+
private VSRState state;
3628

3729

38-
public ManagedVSR(String id, VectorSchemaRoot vsr, BufferAllocator allocator) {
30+
public ManagedVSR(String id, Schema schema, BufferAllocator allocator) {
3931
this.id = id;
40-
this.vsr = vsr;
32+
this.vsr = VectorSchemaRoot.create(schema, allocator);
4133
this.allocator = allocator;
42-
this.state = new AtomicReference<>(VSRState.ACTIVE);
43-
this.lock = new ReentrantReadWriteLock();
44-
this.createdTime = System.currentTimeMillis();
45-
}
46-
47-
/**
48-
* Gets the underlying VectorSchemaRoot.
49-
* Should only be used when holding appropriate locks.
50-
*
51-
* @return VectorSchemaRoot instance
52-
*/
53-
public VectorSchemaRoot getVSR() {
54-
return vsr;
34+
this.state = VSRState.ACTIVE;
5535
}
5636

5737
/**
5838
* Gets the current row count in this VSR.
59-
* Thread-safe read operation.
6039
*
6140
* @return Number of rows currently in the VSR
6241
*/
6342
public int getRowCount() {
64-
lock.readLock().lock();
65-
try {
66-
return vsr.getRowCount();
67-
} finally {
68-
lock.readLock().unlock();
69-
}
43+
return vsr.getRowCount();
7044
}
7145

7246
/**
@@ -77,94 +51,109 @@ public int getRowCount() {
7751
* @throws IllegalStateException if VSR is not active or is immutable
7852
*/
7953
public void setRowCount(int rowCount) {
80-
lock.writeLock().lock();
81-
try {
82-
if (state.get() != VSRState.ACTIVE) {
83-
throw new IllegalStateException("Cannot modify VSR in state: " + state.get());
84-
}
85-
vsr.setRowCount(rowCount);
86-
} finally {
87-
lock.writeLock().unlock();
54+
if (state != VSRState.ACTIVE) {
55+
throw new IllegalStateException("Cannot modify VSR in state: " + state);
8856
}
57+
vsr.setRowCount(rowCount);
8958
}
9059

9160
/**
9261
* Gets a field vector by name.
93-
* Thread-safe read operation.
9462
*
9563
* @param fieldName Name of the field
9664
* @return FieldVector for the field, or null if not found
9765
*/
9866
public FieldVector getVector(String fieldName) {
99-
lock.readLock().lock();
100-
try {
101-
return vsr.getVector(fieldName);
102-
} finally {
103-
lock.readLock().unlock();
104-
}
67+
return vsr.getVector(fieldName);
10568
}
10669

10770
/**
10871
* Changes the state of this VSR.
10972
* Records the new state and logs the transition at debug level.
73+
* This method is private to ensure controlled state transitions.
11074
*
11175
* @param newState New state to transition to
11276
*/
113-
public void setState(VSRState newState) {
114-
VSRState oldState = state.getAndSet(newState);
77+
private void setState(VSRState newState) {
78+
VSRState oldState = state;
79+
state = newState;
11580

11681
logger.debug("State transition: {} -> {} for VSR {}", oldState, newState, id);
11782
}
11883

84+
/**
85+
* Transitions the VSR from ACTIVE to FROZEN state.
86+
* This is the only way to freeze a VSR.
87+
*
88+
* @throws IllegalStateException if VSR is not in ACTIVE state
89+
*/
90+
public void moveToFrozen() {
91+
if (state != VSRState.ACTIVE) {
92+
throw new IllegalStateException(String.format(
93+
"Cannot freeze VSR %s: expected ACTIVE state but was %s", id, state));
94+
}
95+
setState(VSRState.FROZEN);
96+
}
97+
98+
/**
99+
* Transitions the VSR from FROZEN to CLOSED state.
100+
* This method is private and only called by close().
101+
*
102+
* @throws IllegalStateException if VSR is not in FROZEN state
103+
*/
104+
private void moveToClosed() {
105+
if (state != VSRState.FROZEN) {
106+
throw new IllegalStateException(String.format(
107+
"Cannot close VSR %s: expected FROZEN state but was %s", id, state));
108+
}
109+
setState(VSRState.CLOSED);
110+
111+
// Clean up resources
112+
if (vsr != null) {
113+
vsr.close();
114+
}
115+
if (allocator != null) {
116+
allocator.close();
117+
}
118+
}
119+
119120
/**
120121
* Gets the current state of this VSR.
121122
*
122123
* @return Current VSRState
123124
*/
124125
public VSRState getState() {
125-
return state.get();
126+
return state;
126127
}
127128

128129
/**
129130
* Exports this VSR to Arrow C Data Interface for Rust handoff.
130-
* Only allowed when VSR is FROZEN or FLUSHING.
131+
* Only allowed when VSR is FROZEN.
131132
*
132133
* @return ArrowExport containing ArrowArray and ArrowSchema
133134
* @throws IllegalStateException if VSR is not in correct state
134135
*/
135136
public ArrowExport exportToArrow() {
136-
VSRState currentState = state.get();
137-
if (currentState != VSRState.FROZEN &&
138-
currentState != VSRState.FLUSHING) {
139-
throw new IllegalStateException("Cannot export VSR in state: " + currentState);
137+
if (state != VSRState.FROZEN) {
138+
throw new IllegalStateException("Cannot export VSR in state: " + state + ". VSR must be FROZEN to export.");
140139
}
141140

142-
lock.readLock().lock();
143-
try {
144-
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
145-
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
141+
ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
142+
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
146143

147-
// Export the VectorSchemaRoot to C Data Interface
148-
Data.exportVectorSchemaRoot(allocator, vsr, null, arrowArray, arrowSchema);
144+
// Export the VectorSchemaRoot to C Data Interface
145+
Data.exportVectorSchemaRoot(allocator, vsr, null, arrowArray, arrowSchema);
149146

150-
return new ArrowExport(arrowArray, arrowSchema);
151-
} finally {
152-
lock.readLock().unlock();
153-
}
147+
return new ArrowExport(arrowArray, arrowSchema);
154148
}
155149

156150
public ArrowExport exportSchema() {
157-
lock.readLock().lock();
158-
try {
159-
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
151+
ArrowSchema arrowSchema = ArrowSchema.allocateNew(allocator);
160152

161-
// Export the VectorSchemaRoot to C Data Interface
162-
Data.exportSchema(allocator, vsr.getSchema(), null, arrowSchema);
153+
// Export only the schema (no array data) to the C Data Interface
154+
Data.exportSchema(allocator, vsr.getSchema(), null, arrowSchema);
163155

164-
return new ArrowExport(null, arrowSchema);
165-
} finally {
166-
lock.readLock().unlock();
167-
}
156+
return new ArrowExport(null, arrowSchema);
168157
}
169158

170159
/**
@@ -173,8 +162,7 @@ public ArrowExport exportSchema() {
173162
* @return true if VSR cannot be modified
174163
*/
175164
public boolean isImmutable() {
176-
VSRState currentState = state.get();
177-
return currentState != VSRState.ACTIVE;
165+
return state != VSRState.ACTIVE;
178166
}
179167

180168

@@ -187,15 +175,6 @@ public String getId() {
187175
return id;
188176
}
189177

190-
/**
191-
* Gets the creation timestamp.
192-
*
193-
* @return Creation time in milliseconds
194-
*/
195-
public long getCreatedTime() {
196-
return createdTime;
197-
}
198-
199178
/**
200179
* Gets the associated BufferAllocator.
201180
*
@@ -207,25 +186,38 @@ public BufferAllocator getAllocator() {
207186

208187
/**
209188
* Closes this VSR and releases all resources.
189+
* This is the only way to transition a VSR to CLOSED state.
190+
* VSR must be in FROZEN state before it can be closed.
191+
*
192+
* @throws IllegalStateException if VSR is in ACTIVE state (must freeze first)
210193
*/
211194
@Override
212195
public void close() {
213-
lock.writeLock().lock();
214-
try {
215-
if (state.get() != VSRState.CLOSED) {
216-
state.set(VSRState.CLOSED);
217-
vsr.close();
218-
allocator.close();
219-
}
220-
} finally {
221-
lock.writeLock().unlock();
196+
// If already CLOSED, do nothing (idempotent)
197+
if (state == VSRState.CLOSED) {
198+
return;
199+
}
200+
201+
// If ACTIVE, must freeze first
202+
if (state == VSRState.ACTIVE) {
203+
throw new IllegalStateException(String.format(
204+
"Cannot close VSR %s: VSR is still ACTIVE. Must freeze VSR before closing.", id));
205+
}
206+
207+
// If FROZEN, transition to CLOSED
208+
if (state == VSRState.FROZEN) {
209+
moveToClosed();
210+
} else {
211+
// This should be unreachable with the current set of states; kept as a defensive check
212+
throw new IllegalStateException(String.format(
213+
"Cannot close VSR %s: unexpected state %s", id, state));
222214
}
223215
}
224216

225217

226218
@Override
227219
public String toString() {
228220
return String.format("ManagedVSR{id='%s', state=%s, rows=%d, immutable=%s}",
229-
id, state.get(), getRowCount(), isImmutable());
221+
id, state, getRowCount(), isImmutable());
230222
}
231223
}

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/vsr/VSRManager.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,9 @@ public String flush(FlushIn flushIn) throws IOException {
119119
}
120120

121121
// Transition VSR to FROZEN state before flushing
122-
managedVSR.setState(VSRState.FROZEN);
122+
managedVSR.moveToFrozen();
123123
logger.info("Flushing {} rows for {}", managedVSR.getRowCount(), fileName);
124124

125-
// Transition to FLUSHING state
126-
managedVSR.setState(VSRState.FLUSHING);
127-
128125
// Direct native call - write the managed VSR data
129126
try (ArrowExport export = managedVSR.exportToArrow()) {
130127
RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
@@ -182,7 +179,6 @@ public void maybeRotateActiveVSR() throws IOException {
182179
frozenVSR.getId(), frozenVSR.getRowCount(), fileName);
183180

184181
// Write the frozen VSR data immediately
185-
frozenVSR.setState(VSRState.FLUSHING);
186182
try (ArrowExport export = frozenVSR.exportToArrow()) {
187183
RustBridge.write(fileName, export.getArrayAddress(), export.getSchemaAddress());
188184
}

0 commit comments

Comments
 (0)