Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 68 additions & 10 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ import {
type BatchStartInfo,
DuplicateBatchDetector,
ensureContentsDeserialized,
type IBatchCheckpoint,
OpCompressor,
OpDecompressor,
OpGroupingManager,
Expand Down Expand Up @@ -3542,29 +3541,29 @@ export class ContainerRuntime
* {@inheritDoc @fluidframework/runtime-definitions#IContainerRuntimeBase.orderSequentially}
*/
public orderSequentially<T>(callback: () => T): T {
let checkpoint: IBatchCheckpoint | undefined;
let stageControls: StageControlsInternal | undefined;
let checkpointCreated = false;
if (this.mc.config.getBoolean("Fluid.ContainerRuntime.EnableRollback") === true) {
if (!this.batchRunner.running && !this.inStagingMode) {
stageControls = this.enterStagingMode();
}
// Note: we are not touching any batches other than mainBatch here, for two reasons:
// Create a checkpoint to enable rollback if needed
// Note: we are only checkpointing the mainBatch for two reasons:
// 1. It would not help, as other batches are flushed independently from main batch.
// 2. There is no way to undo process of data store creation, blob creation, ID compressor ops, or other things tracked by other batches.
checkpoint = this.outbox.getBatchCheckpoints().mainBatch;
if (this.inStagingMode) {
this.stageControls?.checkpoint();
checkpointCreated = true;
}
}
const result = this.batchRunner.run(() => {
try {
return callback();
} catch (error) {
if (checkpoint) {
if (checkpointCreated) {
// This will throw and close the container if rollback fails
try {
checkpoint.rollback((message: LocalBatchMessage) =>
// These changes are staged since we entered staging mode above
this.rollbackStagedChange(message.runtimeOp, message.localOpMetadata),
);
this.updateDocumentDirtyState();
this.stageControls?.rollbackCheckpoint();
stageControls?.discardChanges();
stageControls = undefined;
} catch (error_) {
Expand Down Expand Up @@ -3657,6 +3656,10 @@ export class ContainerRuntime
}
};

// Track checkpoints for rollback support
// Each entry is the PendingStateManager (PSM) pending-message count recorded by `checkpoint`;
// `rollbackCheckpoint` pops the most recent entry and rolls back the messages added after it.
// We flush the outbox on each checkpoint, so we only need to track PSM message count
const checkpointStack: number[] = [];

const stageControls: StageControlsInternal = {
discardChanges: () =>
exitStagingMode(() => {
Expand All @@ -3677,6 +3680,61 @@ export class ContainerRuntime
});
});
},
checkpoint: () => {
	// Move everything from the outbox into the PendingStateManager (PSM) first, so that a
	// checkpoint can be described by the PSM message count alone.
	// NOTE: this.flush() can't be used here, because orderSequentially uses StagingMode and in the rollback case we'd hit assert 0x24c
	this.outbox.flush();

	const pendingCount = this.pendingStateManager.pendingMessagesCount;
	const previousCheckpoint = checkpointStack[checkpointStack.length - 1];

	// An empty checkpoint has nothing that could be rolled back, so it is not recorded.
	const hasMessages = pendingCount > 0;
	// A count equal to the latest checkpoint means nothing changed since it was taken.
	const isDuplicate = pendingCount === previousCheckpoint;

	if (hasMessages && !isDuplicate) {
		checkpointStack.push(pendingCount);
	}
},
/** Number of checkpoints currently held on the checkpoint stack. */
get checkpointCount(): number {
	const { length: recorded } = checkpointStack;
	return recorded;
},
rollbackCheckpoint: () => {
	// With no recorded checkpoint there is nothing to roll back to.
	if (checkpointStack.length === 0) {
		return;
	}
	const savedCount = checkpointStack.pop();
	assert(savedCount !== undefined, "Checkpoint should be defined");

	// Roll back to the checkpoint by popping and rolling back the messages added since it was taken.
	try {
		// Everything must be in the PSM before counting what has to be undone.
		// NOTE: this.flush() can't be used here, because orderSequentially uses StagingMode and in the rollback case we'd hit assert 0x24c
		this.outbox.flush();

		// Messages present now but not at checkpoint time are the ones to undo.
		const surplus = this.pendingStateManager.pendingMessagesCount - savedCount;
		if (surplus > 0) {
			this.pendingStateManager.popStagedMessagesUpToCount(surplus, (staged) => {
				this.rollbackStagedChange(staged.runtimeOp, staged.localOpMetadata);
			});
		}

		this.updateDocumentDirtyState();
	} catch (error) {
		// A failed rollback is not recoverable: close the container with the wrapped error and rethrow it.
		const rollbackError = wrapError(
			error,
			(message) =>
				DataProcessingError.create(
					`RollbackError: ${message}`,
					"checkpointRollback",
					undefined,
				) as DataProcessingError,
		);
		this.closeFn(rollbackError);
		throw rollbackError;
	}
},
};

this.stageControls = stageControls;
Expand Down
26 changes: 26 additions & 0 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,32 @@ export class PendingStateManager implements IDisposable {
0xb89 /* Shouldn't be any more staged messages */,
);
}

/**
 * Removes up to `count` staged messages from the back of the pending queue, newest first (LIFO).
 * Intended for checkpoint rollback, where only the messages added since the checkpoint are undone.
 *
 * Stops early when the queue is empty or when the message at the back is not staged.
 *
 * @param count - Maximum number of staged messages to remove.
 * @param callback - Invoked for each removed message that carries a typical runtime op;
 * removed messages without one still count toward `count` but are not passed to the callback.
 */
public popStagedMessagesUpToCount(
	count: number,
	callback: (
		stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage },
	) => void,
): void {
	for (let remaining = count; !this.pendingMessages.isEmpty() && remaining > 0; remaining--) {
		const tail = this.pendingMessages.peekBack();
		if (tail?.batchInfo.staged !== true) {
			// no more staged messages
			return;
		}

		this.pendingMessages.pop();
		if (hasTypicalRuntimeOp(tail)) {
			callback(tail);
		}
	}
}
}

/**
Expand Down
224 changes: 224 additions & 0 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4440,6 +4440,230 @@ describe("Runtime", () => {

assert.equal(containerRuntime.isDirty, false, "Runtime should not be dirty anymore");
});

describe("Checkpoints", () => {
it("can create and track checkpoints", () => {
	stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	// A fresh staging session has no checkpoints recorded.
	assert.equal(staging.checkpointCount, 0, "Should start with 0 checkpoints");

	// Every checkpoint is preceded by an op, because empty checkpoints are not recorded.
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 1, "Should have 1 checkpoint");

	submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("op-2"));
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 2, "Should have 2 checkpoints");

	staging.discardChanges();
});
it("rollbackCheckpoint rolls back changes made after checkpoint", () => {
	const collectionStub = stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	// Op 1 is staged before the checkpoint, so it must survive the rollback.
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("before-checkpoint"));
	staging.checkpoint();

	// Ops 2 and 3 are staged after the checkpoint; they are the only ones to be undone.
	submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("after-checkpoint-1"));
	submitDataStoreOp(containerRuntime, "3", genTestDataStoreMessage("after-checkpoint-2"));

	staging.rollbackCheckpoint();

	// Rollback runs newest-first (LIFO): op 3, then op 2.
	const undone = collectionStub.rollbackDataStoreOp.getCalls();
	assert.equal(undone.length, 2, "Should rollback 2 ops");
	assert.equal(undone[0].args[0].address, "3", "Should rollback op 3 first");
	assert.equal(undone[1].args[0].address, "2", "Should rollback op 2 second");

	// Only op 1 is left to be submitted on commit.
	staging.commitChanges();
	assert.equal(submittedOps.length, 1, "Should only submit op 1");
});

it("rollbackCheckpoint works with flushed messages", () => {
	const collectionStub = stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	// Baseline: one op followed by a checkpoint.
	submitDataStoreOp(containerRuntime, "0", genTestDataStoreMessage("op-0"));
	staging.checkpoint();

	// Op 1 is explicitly flushed after being submitted (moves it to PSM) ...
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	containerRuntime.flush();

	// ... while op 2 is left unflushed in the outbox.
	submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("op-2"));

	// Rolling back must undo everything after the checkpoint, flushed or not.
	staging.rollbackCheckpoint();

	const undoneCount = collectionStub.rollbackDataStoreOp.getCalls().length;
	assert.equal(undoneCount, 2, "Should rollback both ops");

	staging.discardChanges();
});
it("multiple checkpoints work correctly (nested)", () => {
	const collectionStub = stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();
	// Total number of ops rolled back so far.
	const rollbackCount = (): number => collectionStub.rollbackDataStoreOp.getCalls().length;

	// Three ops, each followed by its own checkpoint (checkpoints 1, 2 and 3).
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	staging.checkpoint();
	submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("op-2"));
	staging.checkpoint();
	submitDataStoreOp(containerRuntime, "3", genTestDataStoreMessage("op-3"));
	staging.checkpoint();

	// A fourth op with no checkpoint after it.
	submitDataStoreOp(containerRuntime, "4", genTestDataStoreMessage("op-4"));

	// Rolling back to checkpoint 3 undoes only op 4.
	staging.rollbackCheckpoint();
	assert.equal(rollbackCount(), 1, "Should rollback 1 op");

	// Rolling back to checkpoint 2 additionally undoes op 3.
	staging.rollbackCheckpoint();
	assert.equal(rollbackCount(), 2, "Should rollback 2 ops total");

	// Ops 1 and 2 remain and are submitted on commit.
	staging.commitChanges();
	assert.equal(submittedOps.length, 2, "Should submit 2 ops");
});

it("discarding checkpoint with no new changes is safe", () => {
	stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	staging.checkpoint();

	// Nothing was staged after the checkpoint, so the rollback has nothing to undo and must not throw.
	const rollback = (): void => staging.rollbackCheckpoint();
	assert.doesNotThrow(rollback);

	// The op staged before the checkpoint is still there to be committed.
	staging.commitChanges();
	assert.equal(submittedOps.length, 1, "Should submit the op before checkpoint");
});

it("cannot discard checkpoint when none exist", () => {
	const controls = containerRuntime.enterStagingMode();
	assert.equal(controls.checkpointCount, 0, "Should start with 0 checkpoints");

	// With no checkpoint recorded, rollbackCheckpoint is expected to be a no-op: it should not throw ...
	assert.doesNotThrow(
		() => controls.rollbackCheckpoint(),
		"Should not throw when no checkpoint exists",
	);

	// ... and it should leave the checkpoint stack untouched.
	assert.equal(
		controls.checkpointCount,
		0,
		"Should still have 0 checkpoints after a no-op rollback",
	);

	controls.discardChanges();
});

it("does not create empty checkpoints", () => {
	stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	// No messages exist yet, so this checkpoint request must be ignored.
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 0, "Should not create empty checkpoint");

	// Once a message exists, the checkpoint is recorded.
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 1, "Should create checkpoint with message");

	staging.discardChanges();
});
it("checkpoint count decreases after rollbackCheckpoint", () => {
	stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	// Each checkpoint is preceded by a real message so that it is actually recorded.
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	staging.checkpoint();
	submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("op-2"));
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 2, "Should have 2 checkpoints");

	// Every rollback removes exactly one checkpoint from the stack.
	staging.rollbackCheckpoint();
	assert.equal(staging.checkpointCount, 1, "Should have 1 checkpoint after discard");

	staging.rollbackCheckpoint();
	assert.equal(staging.checkpointCount, 0, "Should have 0 checkpoints after second discard");

	staging.discardChanges();
});
it("does not create duplicate checkpoints when no changes made", () => {
	stubChannelCollection(containerRuntime);
	const staging = containerRuntime.enterStagingMode();

	// First checkpoint, taken after one message.
	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 1, "Should have 1 checkpoint");

	// A second request with no new messages in between must not add another entry.
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 1, "Should still have 1 checkpoint, not create duplicate");

	// After a new message, a checkpoint is recorded again.
	submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("op-2"));
	staging.checkpoint();
	assert.equal(staging.checkpointCount, 2, "Should have 2 checkpoints now");

	staging.discardChanges();
});
it("checkpoint flushes outbox", () => {
	stubChannelCollection(containerRuntime);
	const controls = containerRuntime.enterStagingMode();

	submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1"));

	// Before checkpoint, op is in outbox
	assert.equal(submittedOps.length, 0, "Op should not be submitted yet");
	assert.equal(controls.checkpointCount, 0, "Should have no checkpoint before checkpoint()");

	controls.checkpoint();

	// After checkpoint, op should be flushed to PSM (but not submitted to service).
	// checkpoint() only records a checkpoint when the PSM message count is non-zero, so a
	// recorded checkpoint verifies indirectly that the op left the outbox and reached the PSM.
	assert.equal(
		controls.checkpointCount,
		1,
		"Checkpoint should be recorded once the op is flushed to PSM",
	);
	assert.equal(submittedOps.length, 0, "Op still shouldn't be submitted to service");

	controls.commitChanges();
	assert.equal(submittedOps.length, 1, "Op should be submitted after commit");
});
});
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,11 @@ export type PackagePath = readonly string[];

// @alpha @sealed @legacy
export interface StageControlsAlpha {
// Records a checkpoint that a later rollbackCheckpoint call can roll back to.
// The implementation in this change skips the request when there are no pending messages
// or when nothing changed since the previous checkpoint.
readonly checkpoint: () => void;
// Number of checkpoints currently recorded.
readonly checkpointCount: number;
// NOTE(review): presumably exits staging mode and submits the staged changes — confirm against the implementation.
readonly commitChanges: () => void;
// NOTE(review): presumably exits staging mode and rolls back all staged changes — confirm against the implementation.
readonly discardChanges: () => void;
// Rolls back the changes made since the most recent checkpoint and removes that checkpoint;
// does nothing when no checkpoint is recorded.
readonly rollbackCheckpoint: () => void;
}

// @beta @legacy (undocumented)
Expand Down
Loading
Loading