88 changes: 88 additions & 0 deletions doc/adr/0009-getobject-action-field.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# 9. GetObject action field for reliable state synchronization

Date: 2026-02-26

## Status

Accepted (supersedes ADR 8)

## Context

The diff-based GetObject protocol tracks state on both sides: the sender maintains a `remoteObjects` map recording what the receiver last successfully received, and uses this as the baseline for computing diffs on subsequent transfers. This tracking is updated optimistically: the sender assumes the receiver consumed the data successfully once streaming completes.

When the receiver fails mid-deserialization (e.g., `ClassCastException` from an invalid AST node), the two sides go out of sync: the sender thinks the receiver has version N, but the receiver discarded it.
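
The drift can be reproduced with two plain maps. The following is a minimal sketch only: `DesyncDemo`, its fields, and the use of strings as stand-ins for trees are hypothetical and not part of the codebase.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model: object versions are strings, keyed by object ID.
class DesyncDemo {
    // The sender's belief about what the receiver holds.
    final Map<String, String> senderRemoteObjects = new HashMap<>();
    // What the receiver actually holds.
    final Map<String, String> receiverObjects = new HashMap<>();

    // The sender streams `version` and records it optimistically.
    // The receiver stores it only if deserialization succeeds.
    void transfer(String id, String version, boolean receiverFails) {
        senderRemoteObjects.put(id, version); // optimistic, unconfirmed update
        if (!receiverFails) {
            receiverObjects.put(id, version);
        }
        // On failure the receiver discards the data and keeps its old state.
    }

    // True when the sender's belief matches the receiver's actual state.
    boolean inSync(String id) {
        return Objects.equals(senderRemoteObjects.get(id), receiverObjects.get(id));
    }
}
```

After one successful and one failed transfer for the same ID, `inSync` returns `false`: the sender would diff the next version against a baseline the receiver never stored.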

### The Print problem

This manifests concretely with `Print`. After a composite recipe runs, Java computes diffs by printing both the `before` and `after` trees. For RPC-based languages (Python, JavaScript), printing works as follows:

1. Java sends a Print RPC to the remote (Python)
2. Python's `handle_print` calls `get_object_from_java(tree_id)`, sending GetObject back to Java
3. Java's `GetObject.Handler` computes a diff against `remoteObjects[id]` (its belief of what Python has) and streams the result

If a prior Visit failed in the *reverse* direction (Java requesting a modified tree from Python), the cleanup at `RewriteRpc.getObject()` only removes Java's **requester-side** `remoteObjects` entry. Java's **handler-side** `remoteObjects` (used by `GetObject.Handler` when Python requests from Java) may still reflect a state that Python no longer has. The subsequent Print-triggered GetObject computes a diff against the wrong baseline, producing corrupt data or errors.

### Fundamental issue: unilateral state updates

The root cause is that `remoteObjects` is updated unilaterally by the sender without confirmation from the receiver. If the receiver fails to deserialize, the sender has no way to learn this, so the stale state persists and affects all subsequent operations in either direction.

## Decision

Add an `action` field to the GetObject request. This nullable string field allows the receiver to send corrective actions back to the handler. When null, the request is a normal data-transfer request.

### The `revert` action

When the receiver fails to deserialize a transferred object, it sends a GetObject request with `action: "revert"`. The handler:

1. Restores `remoteObjects[id]` to the pre-transfer value (stored in an `actionBaseline` map at transfer start)
2. Restores `localObjects[id]` to the same pre-transfer value, which ensures the failed modification is discarded rather than retried with the same broken diff
3. Cancels any in-progress batch send for that ID

This reverts both sides to a consistent, known-good state. The receiver also restores its own `remoteObjects[id]` to the value it held before the transfer (or removes the entry if there was none), so both sides agree on the baseline for the next transfer.
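
The three handler steps above can be sketched as follows. This is a simplified, single-threaded illustration rather than the actual handler: the class `RevertSketch` and the methods `beginTransfer` and `completeTransfer` are hypothetical names, and only the map names are taken from this ADR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the handler's bookkeeping.
class RevertSketch {
    final Map<String, Object> remoteObjects = new HashMap<>();
    final Map<String, Object> localObjects = new HashMap<>();
    // HashMap so that a null baseline ("receiver had nothing") can be stored.
    final Map<String, Object> actionBaseline = new HashMap<>();
    final Map<String, AtomicBoolean> inProgress = new HashMap<>();

    // Transfer start: remember what the receiver held before.
    AtomicBoolean beginTransfer(String id) {
        actionBaseline.put(id, remoteObjects.get(id));
        AtomicBoolean cancelled = new AtomicBoolean(false);
        inProgress.put(id, cancelled);
        return cancelled;
    }

    // Streaming finished: update optimistically unless the send was cancelled.
    void completeTransfer(String id, AtomicBoolean cancelled) {
        inProgress.remove(id, cancelled);
        if (!cancelled.get()) {
            remoteObjects.put(id, localObjects.get(id));
        }
    }

    // The "revert" action: cancel any in-progress send, then restore both maps.
    void revert(String id) {
        AtomicBoolean stale = inProgress.remove(id);
        if (stale != null) {
            stale.set(true);
        }
        if (actionBaseline.containsKey(id)) {
            Object before = actionBaseline.remove(id);
            if (before != null) {
                remoteObjects.put(id, before);
                localObjects.put(id, before);
            } else {
                remoteObjects.remove(id);
                localObjects.remove(id);
            }
        }
    }
}
```

The `containsKey` check distinguishes "no transfer recorded" from "transfer recorded with a null baseline"; in the second case both entries are removed instead of restored.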

### Optimistic updates with rollback

Unlike a deferred-commit (ACK/NACK) approach, `remoteObjects` is updated optimistically when streaming completes, so no extra round-trip is needed on the success path. The `actionBaseline` map stores the pre-transfer value so that `revert` can roll it back on the failure path.

### Extensibility

The `action` field is designed to support future corrective actions beyond `revert`:

- `"clear"` / `"remove"`: tell the handler to drop all tracking for this ID (e.g., when the caller knows the object is no longer needed)
- `"abort"`: cancel an in-progress batched transfer mid-stream
- `"reset"`: force a full re-serialization
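
A nullable string keeps dispatch simple. The sketch below is illustrative only: `ActionDispatchSketch` and its counters are hypothetical, and it assumes that action values the handler does not recognize are answered with an empty list instead of an error.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical dispatcher showing how a nullable action field can be routed.
class ActionDispatchSketch {
    int transfers = 0;
    int reverts = 0;

    List<String> handle(String id, String action) {
        if (action == null) {
            // No action: a normal data-transfer request.
            transfers++;
            return Collections.singletonList("data:" + id);
        }
        switch (action) {
            case "revert":
                reverts++;
                break;
            default:
                // Unrecognized (possibly future) action: ignored in this sketch.
                break;
        }
        // Action requests carry no payload back to the caller.
        return Collections.emptyList();
    }
}
```

A future action would add one more `case` without touching the shape of the request.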

### Protocol flow

**Success path** (no extra round-trip):
1. Handler streams batches, optimistically updates `remoteObjects[id] = after`
2. Receiver processes batches, updates its own `remoteObjects` and `localObjects`
3. Done; no confirmation needed

**Failure path** (one extra round-trip):
1. Handler streams batches, optimistically updates `remoteObjects[id] = after`
2. Receiver fails to deserialize
3. Receiver sends `GetObject(id, sourceFileType, action="revert")`
4. Handler restores `remoteObjects[id]` and `localObjects[id]` from `actionBaseline`
5. Handler returns empty list
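
Both paths can be traced end to end with a small in-process simulation. This is a sketch under simplifying assumptions: a single batch per object, strings in place of trees, and both sides living in one hypothetical class `FlowSketch`. The `wire` list records each request so the extra round-trip is visible.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical simulation of handler and receiver in one process.
class FlowSketch {
    final Map<String, String> handlerLocal = new HashMap<>();
    final Map<String, String> handlerRemote = new HashMap<>();
    final Map<String, String> baseline = new HashMap<>();
    final Map<String, String> receiverRemote = new HashMap<>();
    final List<String> wire = new ArrayList<>(); // log of requests sent

    static void restore(Map<String, String> map, String id, String value) {
        if (value != null) {
            map.put(id, value);
        } else {
            map.remove(id);
        }
    }

    // Handler side: a normal transfer, or the "revert" action.
    String handlerGetObject(String id, String action) {
        wire.add(action == null ? "GetObject(" + id + ")" : "GetObject(" + id + ", " + action + ")");
        if ("revert".equals(action)) {
            if (baseline.containsKey(id)) {
                String before = baseline.remove(id);
                restore(handlerRemote, id, before);
                restore(handlerLocal, id, before);
            }
            return null; // stands in for the empty list
        }
        String after = handlerLocal.get(id);
        baseline.put(id, handlerRemote.get(id)); // save pre-transfer value
        handlerRemote.put(id, after);            // optimistic update
        return after;
    }

    // Receiver side: on failure, send revert, restore own tracking, rethrow.
    String receiverGetObject(String id, Function<String, String> deserialize) {
        String before = receiverRemote.get(id);
        String payload = handlerGetObject(id, null);
        try {
            String value = deserialize.apply(payload);
            receiverRemote.put(id, value);
            return value;
        } catch (RuntimeException e) {
            handlerGetObject(id, "revert");
            restore(receiverRemote, id, before);
            throw e;
        }
    }
}
```

On the success path `wire` gains one entry per transfer. On the failure path it gains two, and all three maps for that ID end up holding the pre-transfer value again.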

### Relationship to ADR 8 (`reset` flag)

The `reset` flag from ADR 8 is removed. The `revert` action makes it unnecessary: instead of the receiver hinting "I lost sync" on its *next* request, it explicitly tells the handler to roll back immediately after failure.

## Consequences

**Positive:**
- No extra round-trip on the success path (unlike an ACK-based approach)
- On failure, reverts both `remoteObjects` and `localObjects` to a consistent pre-transfer state, preventing cascading errors
- Fixes the Print problem: the handler's `remoteObjects` is rolled back before any Print-triggered GetObject can observe stale state
- Extensible: the `action` field can carry future corrective actions without adding further fields to the GetObject request
- Works for all GetObject consumers (Visit, Print, Generate) in both directions

**Negative:**
- Handler must store pre-transfer baselines (`actionBaseline` map) for potential rollback: one extra object reference per transferred object ID, retained until the next transfer for that ID
- Reverting `localObjects` means the handler discards its local modification on failure, which is a deliberate policy choice: if the receiver can't deserialize it, retrying would just fail again

**Trade-offs:**
- The `actionBaseline` entries persist until overwritten by the next transfer for the same ID, rather than being cleaned up immediately on success. The memory cost is bounded by the number of active object IDs and is comparable to `remoteObjects` itself
- The inline-Visit optimization (bundling tree data with Visit request/response to eliminate GetObject round-trips) remains a complementary performance improvement that could be pursued independently
4 changes: 4 additions & 0 deletions doc/adr/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@
* [2. Naming recipes](0002-recipe-naming.md)
* [3. OSS contributor's guidelines](0003-oss-contributors.md)
* [4. Library migration recipe conventions](0004-library-migration-conventions.md)
* [5. Parser and LST conventions](0005-parser-lst-conventions.md)
* [6. Recipe marketplace CSV format](0006-recipe-marketplace-csv-format.md)
* [7. JavaScript templating engine enhancements](0007-javascript-templating-enhancements.md)
* [9. GetObject action field for reliable state synchronization](0009-getobject-action-field.md)
33 changes: 26 additions & 7 deletions rewrite-core/src/main/java/org/openrewrite/rpc/RewriteRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -469,22 +469,41 @@ public <T> T getObject(String id, @Nullable String sourceFileType) {

RpcReceiveQueue q = new RpcReceiveQueue(
remoteRefs,
() -> send("GetObject", new GetObject(id, sourceFileType), GetObjectResponse.class),
() -> send("GetObject", new GetObject(id, sourceFileType, null), GetObjectResponse.class),
sourceFileType,
log.get()
);
Object remoteObject;
try {
remoteObject = q.receive(before, null);
if (q.take().getState() != END_OF_OBJECT) {
throw new IllegalStateException("Expected END_OF_OBJECT");
}
} catch (Exception e) {
// Reset our tracking of the remote state so the next interaction
// forces a full object sync (ADD) instead of a delta (CHANGE).
remoteObjects.remove(id);
// Tell the handler to revert both remoteObjects and localObjects
// to the pre-transfer state
try {
send("GetObject", new GetObject(id, sourceFileType, "revert"), GetObjectResponse.class);
} catch (Exception revertError) {
PrintStream logFile = log.get();
if (logFile != null) {
revertError.printStackTrace(logFile);
}
}
// Revert our tracking to match the handler's reverted state.
// The handler restored remoteObjects[id] to the pre-transfer
// value, so the requester must do the same to stay in sync.
if (before != null) {
remoteObjects.put(id, before);
} else {
remoteObjects.remove(id);
}
// Back out refs registered during this failed receive
for (Integer refId : q.getNewRefIds()) {
remoteRefs.remove(refId);
}
throw e;
}
if (q.take().getState() != END_OF_OBJECT) {
throw new IllegalStateException("Expected END_OF_OBJECT");
}

//noinspection ConstantValue
if (remoteObject != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class RpcReceiveQueue {
private final Supplier<List<RpcObjectData>> pull;
private final @Nullable String sourceFileType;
private final @Nullable PrintStream log;
private final List<Integer> newRefIds = new ArrayList<>();

public RpcReceiveQueue(Map<Integer, Object> refs, Supplier<List<RpcObjectData>> pull,
@Nullable String sourceFileType, @Nullable PrintStream log) {
Expand All @@ -44,6 +45,13 @@ public RpcReceiveQueue(Map<Integer, Object> refs, Supplier<List<RpcObjectData>>
this.pull = pull;
}

/**
* @return the ref IDs that were newly registered during this receive.
*/
public List<Integer> getNewRefIds() {
return newRefIds;
}

public RpcObjectData take() {
if (batch.isEmpty()) {
List<RpcObjectData> data = pull.get();
Expand Down Expand Up @@ -123,6 +131,7 @@ public <T> T receive(@Nullable T before, @Nullable UnaryOperator<T> onChange) {
// immutable updates because of its cyclic nature, the before instance will ultimately
// be the same as the after instance below.
refs.put(ref, before);
newRefIds.add(ref);
}
}
// Intentional fall-through...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class RpcSendQueue {
private final IdentityHashMap<Object, Integer> refs;
private final @Nullable String sourceFileType;
private final boolean trace;
private final List<Object> newRefObjects = new ArrayList<>();

private @Nullable Object before;

Expand All @@ -46,6 +47,13 @@ public RpcSendQueue(int batchSize, ThrowingConsumer<List<RpcObjectData>> drain,
this.trace = trace;
}

/**
* @return the objects that were newly registered as refs during this send.
*/
public List<Object> getNewRefObjects() {
return newRefObjects;
}

public void put(RpcObjectData rpcObjectData) {
batch.add(rpcObjectData);
if (batch.size() == batchSize) {
Expand Down Expand Up @@ -176,6 +184,7 @@ private void add(Object after, @Nullable Runnable onChange) {
}
ref = refs.size() + 1;
refs.put(afterVal, ref);
newRefObjects.add(afterVal);
}
RpcCodec<Object> afterCodec = RpcCodec.forInstance(afterVal, sourceFileType);
put(new RpcObjectData(ADD, getValueType(afterVal),
Expand Down
113 changes: 99 additions & 14 deletions rewrite-core/src/main/java/org/openrewrite/rpc/request/GetObject.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@
import org.openrewrite.rpc.RpcSendQueue;

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static java.util.Collections.emptyList;
import static org.openrewrite.rpc.RpcObjectData.State.DELETE;
import static org.openrewrite.rpc.RpcObjectData.State.END_OF_OBJECT;

Expand All @@ -42,6 +41,20 @@ public class GetObject implements RpcRequest {
@Nullable
String sourceFileType;

/**
* An action for the handler to perform instead of a normal data transfer.
* When null, this is a normal data-transfer request.
* <p>
* Supported actions:
* <ul>
* <li>"revert": sent by the receiver after a deserialization failure.
* The handler reverts both {@code remoteObjects} and {@code localObjects}
* for this ID to the pre-transfer state.</li>
* </ul>
*/
@Nullable
String action;

@RequiredArgsConstructor
public static class Handler extends JsonRpcMethod<GetObject> {
private static final ExecutorService forkJoin = ForkJoinPool.commonPool();
Expand All @@ -59,37 +72,107 @@ public static class Handler extends JsonRpcMethod<GetObject> {
private final AtomicReference<PrintStream> log;
private final Supplier<Boolean> traceGetObject;

private final Map<String, BlockingQueue<List<RpcObjectData>>> inProgressGetRpcObjects = new ConcurrentHashMap<>();
@RequiredArgsConstructor
private static class InProgressSend {
final BlockingQueue<List<RpcObjectData>> queue;
final @Nullable Object before;
final AtomicBoolean cancelled;
}

private final Map<String, InProgressSend> inProgressGetRpcObjects = new ConcurrentHashMap<>();

/**
* Stores the pre-transfer {@code remoteObjects} value for each in-flight
* or recently completed transfer. Used by the "revert" action to restore
* both {@code remoteObjects} and {@code localObjects} to the state before
* the transfer started.
*/
private final Map<String, @Nullable Object> actionBaseline = new HashMap<>();

/**
* Tracks objects newly registered as refs during each in-flight transfer.
* On revert or sender-side failure, these are removed from {@code localRefs}.
* On success (all data consumed), the entry is cleared.
*/
private final Map<String, List<Object>> pendingNewRefs = new ConcurrentHashMap<>();

@Override
protected List<RpcObjectData> handle(GetObject request) throws Exception {
String action = request.getAction();
if (action != null) {
if ("revert".equals(action)) {
String id = request.getId();
InProgressSend stale = inProgressGetRpcObjects.remove(id);
if (stale != null) {
stale.cancelled.set(true);
}
if (actionBaseline.containsKey(id)) {
Object before = actionBaseline.remove(id);
if (before != null) {
remoteObjects.put(id, before);
localObjects.put(id, before);
} else {
remoteObjects.remove(id);
localObjects.remove(id);
}
}
}
return emptyList();
}

Object after = localObjects.get(request.getId());

if (after == null) {
// Clean up any stale in-progress send for this ID
InProgressSend stale = inProgressGetRpcObjects.remove(request.getId());
if (stale != null) {
stale.cancelled.set(true);
}

List<RpcObjectData> deleted = new ArrayList<>(2);
deleted.add(new RpcObjectData(DELETE, null, null, null, traceGetObject.get()));
deleted.add(new RpcObjectData(END_OF_OBJECT, null, null, null, traceGetObject.get()));
return deleted;
}

BlockingQueue<List<RpcObjectData>> q = inProgressGetRpcObjects.computeIfAbsent(request.getId(), id -> {
Object currentBefore = remoteObjects.get(request.getId());

InProgressSend inProgress = inProgressGetRpcObjects.computeIfAbsent(request.getId(), id -> {
// Save the pre-transfer baseline for potential revert
actionBaseline.put(id, currentBefore);

BlockingQueue<List<RpcObjectData>> batch = new ArrayBlockingQueue<>(1);
Object before = remoteObjects.get(id);
AtomicBoolean cancelled = new AtomicBoolean(false);

RpcSendQueue sendQueue = new RpcSendQueue(batchSize.get(), batch::put, localRefs, request.getSourceFileType(), traceGetObject.get());
forkJoin.submit(() -> {
try {
sendQueue.send(after, before, null);
sendQueue.send(after, currentBefore, null);

// Track newly registered refs for potential revert
if (!sendQueue.getNewRefObjects().isEmpty()) {
pendingNewRefs.put(id, sendQueue.getNewRefObjects());
}

// All the data has been sent, and the remote should have received
// the full tree, so update our understanding of the remote state
// of this tree.
remoteObjects.put(id, after);
// Optimistically update remoteObjects; the receiver is
// expected to send action="revert" if deserialization fails,
// which will roll this back.
if (!cancelled.get()) {
remoteObjects.put(id, after);
}
} catch (Throwable t) {
// Reset our tracking of the remote state so the next interaction
// forces a full object sync (ADD) instead of a delta (CHANGE)
// against the stale, partially-sent baseline.
remoteObjects.remove(id);
// Remove the baseline so a subsequent "revert" from the
// receiver doesn't restore the entry we just removed.
actionBaseline.remove(id);
// Back out refs registered during this failed send
for (Object obj : sendQueue.getNewRefObjects()) {
localRefs.remove(obj);
}
pendingNewRefs.remove(id);
PrintStream logFile = log.get();
//noinspection ConstantValue
if (logFile != null) {
Expand All @@ -101,12 +184,14 @@ protected List<RpcObjectData> handle(GetObject request) throws Exception {
}
return 0;
});
return batch;
return new InProgressSend(batch, currentBefore, cancelled);
});

List<RpcObjectData> batch = q.take();
List<RpcObjectData> batch = inProgress.queue.take();
if (batch.get(batch.size() - 1).getState() == END_OF_OBJECT) {
inProgressGetRpcObjects.remove(request.getId());
// Transfer completed successfully; refs are committed
pendingNewRefs.remove(request.getId());
}

return batch;