diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f495c7681fff..3e78b64c0cb1 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -243,7 +243,6 @@ import { type BatchStartInfo, DuplicateBatchDetector, ensureContentsDeserialized, - type IBatchCheckpoint, OpCompressor, OpDecompressor, OpGroupingManager, @@ -252,6 +251,7 @@ import { RemoteMessageProcessor, type OutboundBatch, type BatchResubmitInfo, + type IBatchCheckpoint, } from "./opLifecycle/index.js"; import { pkgVersion } from "./packageVersion.js"; import { @@ -267,6 +267,7 @@ import { validateLoaderCompatibility, } from "./runtimeLayerCompatState.js"; import { SignalTelemetryManager } from "./signalTelemetryProcessing.js"; +import { StagingModeManager } from "./stagingModeManager.js"; // These types are imported as types here because they are present in summaryDelayLoadedModule, which is loaded dynamically when required. import { aliasBlobName, @@ -560,8 +561,6 @@ export const defaultRuntimeHeaderData: Required = { allowTombstone: false, }; -const defaultStagingCommitOptions = { squash: false }; - /** * @deprecated * Untagged logger is unsupported going forward. There are old loaders with old ContainerContexts that only @@ -1464,6 +1463,7 @@ export class ContainerRuntime private readonly duplicateBatchDetector: DuplicateBatchDetector | undefined; private readonly outbox: Outbox; private readonly garbageCollector: IGarbageCollector; + private readonly stagingModeManager: StagingModeManager; private readonly channelCollection: ChannelCollection; private readonly remoteMessageProcessor: RemoteMessageProcessor; @@ -1615,8 +1615,8 @@ export class ContainerRuntime logger: this.baseLogger, namespace: "ContainerRuntime", properties: { - all: { - inStagingMode: this.inStagingMode, + error: { + inStagingMode: () => this.inStagingMode, }, }, }); @@ -2037,6 +2037,17 @@ export class ContainerRuntime opReentrancy: () => this.dataModelChangeRunner.running, }); + // Initialize the staging mode manager with all its dependencies + this.stagingModeManager = new StagingModeManager({ + pendingStateManager: this.pendingStateManager, + outbox: this.outbox, + getChannelCollection: () => this.channelCollection, + submitIdAllocationOpIfNeeded: this.submitIdAllocationOpIfNeeded.bind(this), + rollbackStagedChange: this.rollbackStagedChange.bind(this), + updateDocumentDirtyState: this.updateDocumentDirtyState.bind(this), + closeFn: this.closeFn, + }); + this._quorum = quorum; this._quorum.on("removeMember", (clientId: string) => { this.remoteMessageProcessor.clearPartialMessagesFor(clientId); @@ -3607,15 +3618,13 @@ export class ContainerRuntime return result; } - private stageControls: StageControlsInternal | undefined; - /** * If true, the ContainerRuntime is not submitting any new ops to the ordering service. * Ops submitted to the ContainerRuntime while in Staging Mode will be queued in the PendingStateManager, * either to be discarded or committed later (via the Stage Controls returned from enterStagingMode). */ public get inStagingMode(): boolean { - return this.stageControls !== undefined; + return this.stagingModeManager?.inStagingMode ?? false; } /** @@ -3625,64 +3634,12 @@ export class ContainerRuntime * @returns Controls for exiting Staging Mode. */ public enterStagingMode = (): StageControlsInternal => { - if (this.stageControls !== undefined) { - throw new UsageError("Already in staging mode"); - } if (this.attachState === AttachState.Detached) { throw new UsageError("Cannot enter staging mode while Detached"); } - // Make sure Outbox is empty before entering staging mode, - // since we mark whole batches as "staged" or not to indicate whether to submit them. - this.flush(); - - const exitStagingMode = (discardOrCommit: () => void): void => { - try { - // Final flush of any last staged changes - // NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c - this.outbox.flush(); - - this.stageControls = undefined; - - // During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops). - // Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode. - this.submitIdAllocationOpIfNeeded({ staged: false }); - discardOrCommit(); - - this.channelCollection.notifyStagingMode(false); - } catch (error) { - const normalizedError = normalizeError(error); - this.closeFn(normalizedError); - throw normalizedError; - } - }; - - const stageControls: StageControlsInternal = { - discardChanges: () => - exitStagingMode(() => { - // Pop all staged batches from the PSM and roll them back in LIFO order - this.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => { - this.rollbackStagedChange(runtimeOp, localOpMetadata); - }); - this.updateDocumentDirtyState(); - }), - commitChanges: (options) => { - const { squash } = { ...defaultStagingCommitOptions, ...options }; - exitStagingMode(() => { - // Replay all staged batches in typical FIFO order. - // We'll be out of staging mode so they'll be sent to the service finally. - this.pendingStateManager.replayPendingStates({ - committingStagedBatches: true, - squash, - }); - }); - }, - }; - - this.stageControls = stageControls; - this.channelCollection.notifyStagingMode(true); - - return this.stageControls; + // Delegate to the staging mode manager + return this.stagingModeManager.enterStagingMode(() => this.flush()); }; /** @@ -4883,9 +4840,7 @@ export class ContainerRuntime ); const resubmitInfo = { - // Only include Batch ID if "Offline Load" feature is enabled - // It's only needed to identify batches across container forks arising from misuse of offline load. - batchId: this.batchIdTrackingEnabled ? batchId : undefined, + batchId, staged, }; diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index f1edd45d9284..58cc979564db 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -897,30 +897,57 @@ export class PendingStateManager implements IDisposable { } /** - * Pops all staged batches, invoking the callback on each constituent op in order (LIFO) + * Pops staged messages from the back, invoking the callback on each in LIFO order. + * Used for checkpoint rollback where we want to rollback to a specific batch, or for discarding all staged changes. + * + * @param callback - Called for each popped message that has a typical runtime op + * @param afterBatchId - Optional batch ID to rollback to (this batch is kept, everything after it is removed). If undefined, removes all staged messages. */ public popStagedBatches( callback: ( - // callback will only be given staged messages with a valid runtime op (i.e. not empty batch and not an initial message with only serialized content) stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage }, ) => void, + afterBatchId?: string, ): void { while (!this.pendingMessages.isEmpty()) { const stagedMessage = this.pendingMessages.peekBack(); - if (stagedMessage?.batchInfo.staged === true) { - this.pendingMessages.pop(); - if (hasTypicalRuntimeOp(stagedMessage)) { - callback(stagedMessage); + // Stop if we've reached the checkpoint batch ID + if (afterBatchId !== undefined && stagedMessage !== undefined) { + const messageBatchId = getEffectiveBatchId(stagedMessage); + if (messageBatchId === afterBatchId) { + break; } - } else { - break; // no more staged messages + } + + // Stop if we hit a non-staged message + if (stagedMessage?.batchInfo.staged !== true) { + break; + } + + // Pop the message + this.pendingMessages.pop(); + + if (hasTypicalRuntimeOp(stagedMessage)) { + callback(stagedMessage); } } - assert( - this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true), - 0xb89 /* Shouldn't be any more staged messages */, - ); + + // Verify no staged messages remain (when afterBatchId is undefined, we should have removed all) + if (afterBatchId === undefined) { + assert( + this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true), + 0xb89 /* Shouldn't be any more staged messages */, + ); + } + } + + /** + * Gets a reference to the last pending message in the queue, or undefined if the queue is empty. + * This reference can be used later to determine if any messages have been added since. + */ + public getLastPendingMessage(): IPendingMessage | undefined { + return this.pendingMessages.peekBack(); } } diff --git a/packages/runtime/container-runtime/src/stagingModeManager.ts b/packages/runtime/container-runtime/src/stagingModeManager.ts new file mode 100644 index 000000000000..f0846e31c1f6 --- /dev/null +++ b/packages/runtime/container-runtime/src/stagingModeManager.ts @@ -0,0 +1,242 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import type { ICriticalContainerError } from "@fluidframework/container-definitions/internal"; +import { + DataProcessingError, + LoggingError, + UsageError, + normalizeError, + wrapError, +} from "@fluidframework/telemetry-utils/internal"; +import type { + StageCheckpointAlpha, + StageControlsInternal, +} from "@fluidframework/runtime-definitions/internal"; +import { DoublyLinkedList } from "@fluidframework/core-utils/internal"; +import type { PendingStateManager } from "./pendingStateManager.js"; +import { getEffectiveBatchId, type Outbox } from "./opLifecycle/index.js"; +import type { ChannelCollection } from "./channelCollection.js"; +import type { LocalContainerRuntimeMessage } from "./messageTypes.js"; + +/** + * Default options for committing staged changes. + */ +const defaultStagingCommitOptions = { + squash: false, +} as const; + +/** + * Dependencies needed by the StagingModeManager to operate. + * Uses Pick to extract only the methods we actually need, making testing easier. + */ +export interface StagingModeDependencies { + readonly pendingStateManager: Pick< + PendingStateManager, + "popStagedBatches" | "replayPendingStates" | "getLastPendingMessage" + >; + readonly outbox: Pick; + readonly getChannelCollection: () => Pick; + readonly submitIdAllocationOpIfNeeded: (options: { staged: boolean }) => void; + readonly rollbackStagedChange: ( + runtimeOp: LocalContainerRuntimeMessage, + localOpMetadata: unknown, + ) => void; + readonly updateDocumentDirtyState: () => void; + readonly closeFn: (error?: ICriticalContainerError) => void; +} + +/** + * Manages staging mode state and checkpoint creation for the ContainerRuntime. + * Staging mode allows ops to be queued locally before being committed or discarded. + */ +export class StagingModeManager { + private stageControls: StageControlsInternal | undefined; + + constructor(private readonly dependencies: StagingModeDependencies) {} + + /** + * Whether the container is currently in staging mode. + */ + public get inStagingMode(): boolean { + return this.stageControls !== undefined; + } + + /** + * Enter staging mode, queuing ops locally instead of sending to the ordering service. + * + * @param flushFn - Function to flush the outbox before entering staging mode + * @returns Controls for managing staged changes + * @throws UsageError if already in staging mode + */ + public enterStagingMode(flushFn: () => void): StageControlsInternal { + if (this.stageControls !== undefined) { + throw new UsageError("Already in staging mode"); + } + + // Make sure Outbox is empty before entering staging mode + flushFn(); + + // Track checkpoints for rollback support + // Each checkpoint stores the batch ID of the last batch at that point + const checkpointList = new DoublyLinkedList(); + + const exitStagingMode = (discardOrCommit: () => void): void => { + try { + // Final flush of any last staged changes + this.dependencies.outbox.flush(); + + this.stageControls = undefined; + + // Invalidate all remaining checkpoints by removing them from the list + while (checkpointList.first !== undefined) { + checkpointList.first.remove(); + } + + // Submit any ID allocation ops that were deferred during staging mode + this.dependencies.submitIdAllocationOpIfNeeded({ staged: false }); + discardOrCommit(); + + this.dependencies.getChannelCollection().notifyStagingMode(false); + } catch (error) { + const normalizedError = normalizeError(error); + this.dependencies.closeFn(normalizedError); + throw normalizedError; + } + }; + + const stageControls: StageControlsInternal = { + discardChanges: () => + exitStagingMode(() => { + // Pop all staged batches from the PSM and roll them back in LIFO order + this.dependencies.pendingStateManager.popStagedBatches( + ({ runtimeOp, localOpMetadata }) => { + this.dependencies.rollbackStagedChange(runtimeOp, localOpMetadata); + }, + ); + this.dependencies.updateDocumentDirtyState(); + }), + commitChanges: (options) => { + const { squash } = { ...defaultStagingCommitOptions, ...options }; + exitStagingMode(() => { + // Replay all staged batches in typical FIFO order + this.dependencies.pendingStateManager.replayPendingStates({ + committingStagedBatches: true, + squash, + }); + }); + }, + checkpoint: () => { + // Flush outbox to ensure all messages are in PSM + this.dependencies.outbox.flush(); + + // Get reference to the last pending message (or undefined if none) + const lastMessage = this.dependencies.pendingStateManager.getLastPendingMessage(); + + return this.createCheckpoint( + checkpointList, + lastMessage?.batchInfo.staged === true + ? getEffectiveBatchId(lastMessage) + : undefined, + ); + }, + }; + + this.stageControls = stageControls; + this.dependencies.getChannelCollection().notifyStagingMode(true); + + return this.stageControls; + } + + /** + * Create a checkpoint that can be rolled back to later. + * + * @param checkpointList - List tracking all active checkpoints + * @param batchId - Batch ID of the last batch at checkpoint time (or undefined if no messages yet) + * @returns Checkpoint object with rollback and dispose capabilities + */ + private createCheckpoint( + checkpointList: DoublyLinkedList, + batchId: string | undefined, + ): StageCheckpointAlpha { + // Add checkpoint to the list and store the node + // We store the batch ID of the last batch at this checkpoint + const { last: checkpointNode } = checkpointList.push(batchId); + + // Capture dependencies for use in checkpoint methods + const deps = this.dependencies; + + // Create the checkpoint object + const checkpoint: StageCheckpointAlpha = { + rollback: () => { + // Check if this checkpoint is still in the list + if (checkpointNode.list === undefined) { + throw new LoggingError("Cannot rollback an invalid checkpoint"); + } + + // Invalidate all checkpoints created after this one + while (checkpointNode.next !== undefined) { + checkpointNode.next.remove(); + } + + // Remove this checkpoint itself + checkpointNode.remove(); + + try { + // Flush the outbox to ensure all messages are in PSM before rolling back + deps.outbox.flush(); + + // Rollback all messages added after the checkpoint batch ID + // This uses batch ID comparison - stable across resubmit! + deps.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => { + deps.rollbackStagedChange(runtimeOp, localOpMetadata); + }, batchId); + + deps.updateDocumentDirtyState(); + } catch (error) { + const error2 = wrapError(error, (message) => { + return DataProcessingError.create( + `RollbackError: ${message}`, + "checkpointRollback", + undefined, + ) as DataProcessingError; + }); + deps.closeFn(error2); + throw error2; + } + }, + dispose: () => { + // Check if this checkpoint is still in the list + if (checkpointNode.list === undefined) { + throw new LoggingError("Cannot dispose an invalid checkpoint"); + } + // Remove only this checkpoint from the list + // Other checkpoints (before and after) remain valid + checkpointNode.remove(); + }, + get isValid(): boolean { + // Checkpoint is valid if it's still in the list + return checkpointNode.list !== undefined; + }, + get hasChangesSince(): boolean { + // Check if there are unflushed messages in the outbox + if (deps.outbox.mainBatchMessageCount !== 0) { + return true; + } + + // Check if any messages have been added since the checkpoint + // by comparing the current last batch ID with the checkpoint's batch ID + const currentLastMessage = deps.pendingStateManager.getLastPendingMessage(); + const currentLastBatchId = + currentLastMessage?.batchInfo.staged === true + ? getEffectiveBatchId(currentLastMessage) + : undefined; + return currentLastBatchId !== batchId; + }, + }; + + return checkpoint; + } +} diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 16dad4546815..fef9c81e2b0e 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -231,6 +231,83 @@ function assertSignalContentIsAString( assert(typeof content === "string", "Signal content expected to be a string"); } +/** + * Helper function to validate that a batchId matches the unsent format (ends with "_[-1]") + */ +function assertBatchIdMatchesUnsentFormat(batchId: string | undefined, message: string): void { + assert(batchIdMatchesUnsentFormat(batchId), message); +} + +/** + * Helper function to validate that a batchId matches the unsent format with length check + */ +function batchIdMatchesUnsentFormat(batchId?: string): boolean { + return ( + typeof batchId === "string" && + batchId.length === "00000000-0000-0000-0000-000000000000_[-1]".length && + batchId.endsWith("_[-1]") + ); +} + +/** + * Validates the structure of a batch in the submitted ops metadata array. + * A batch should have: + * - Start marker (batch: true) with batchId + * - Middle messages with undefined metadata + * - End marker (batch: false) without batchId + * + * @param submittedOpsMetadata - Array of submitted op metadata + * @param startIndex - Index where the batch starts + * @param batchSize - Total size of the batch (including start and end markers) + */ +function assertBatchStructure( + submittedOpsMetadata: unknown[], + startIndex: number, + batchSize: number, +): void { + assert(batchSize >= 1, `Batch size must be at least 1, got ${batchSize}`); + + const endIndex = startIndex + batchSize - 1; + + // Validate start marker + const startMetadata = submittedOpsMetadata[startIndex]; + assert.strictEqual( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (startMetadata as any)?.batch, + true, + `Message at index ${startIndex} should be batch start`, + ); + assertBatchIdMatchesUnsentFormat( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (startMetadata as any)?.batchId, + `Batch start at index ${startIndex} should have unsent batchId format`, + ); + + // Validate middle messages (if any) + for (let i = startIndex + 1; i < endIndex; i++) { + assert.strictEqual( + submittedOpsMetadata[i], + undefined, + `Message at index ${i} (middle of batch) should not hold batch info`, + ); + } + + // Validate end marker + const endMetadata = submittedOpsMetadata[endIndex]; + assert.strictEqual( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (endMetadata as any)?.batch, + false, + `Message at index ${endIndex} should be batch end`, + ); + assert.strictEqual( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (endMetadata as any)?.batchId, + undefined, + `Batch end at index ${endIndex} should not have batchId`, + ); +} + describe("Runtime", () => { const configProvider = (settings: Record): IConfigProviderBase => ({ getRawConfig: (name: string): ConfigTypes => settings[name], @@ -483,14 +560,6 @@ describe("Runtime", () => { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access assert.strictEqual(submittedOps[1].contents.address, "2"); - function batchIdMatchesUnsentFormat(batchId?: string): boolean { - return ( - batchId !== undefined && - batchId.length === "00000000-0000-0000-0000-000000000000_[-1]".length && - batchId.endsWith("_[-1]") - ); - } - if (enableBatchIdTracking === true) { assert( batchIdMatchesUnsentFormat( @@ -504,16 +573,26 @@ describe("Runtime", () => { // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access submittedOps[1].metadata?.batchId as string | undefined, ), - "expected unsent batchId format (0)", + "expected unsent batchId format (1)", ); } else { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert(submittedOps[0].metadata?.batchId === undefined, "Expected no batchId (0)"); - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - assert(submittedOps[1].metadata?.batchId === undefined, "Expected no batchId (1)"); + // batchId is now always added, even when enableBatchIdTracking is undefined + assert( + batchIdMatchesUnsentFormat( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + submittedOps[0].metadata?.batchId as string | undefined, + ), + "expected unsent batchId format (0)", + ); + assert( + batchIdMatchesUnsentFormat( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + submittedOps[1].metadata?.batchId as string | undefined, + ), + "expected unsent batchId format (1)", + ); } }); - // NOTE: This test is examining a case that only occurs with an old Loader that doesn't tell ContainerRuntime when processing system ops. // In other words, when the MockDeltaManager bumps its lastSequenceNumber, ContainerRuntime.process would be called in the current code, but not with legacy loader. for (const skipSafetyFlushDuringProcessStack of [true, undefined]) { @@ -818,23 +897,7 @@ describe("Runtime", () => { (containerRuntime as any).flush(); assert.strictEqual(submittedOpsMetadata.length, 3, "3 messages should be sent"); - assert.strictEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - submittedOpsMetadata[0].batch, - true, - "first message should be the batch start", - ); - assert.strictEqual( - submittedOpsMetadata[1], - undefined, - "second message should not hold batch info", - ); - assert.strictEqual( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - submittedOpsMetadata[2].batch, - false, - "third message should be the batch end", - ); + assertBatchStructure(submittedOpsMetadata, 0, 3); }); it("Resubmitting batch preserves original batches", async () => { @@ -864,25 +927,13 @@ describe("Runtime", () => { assert.strictEqual(submittedOpsMetadata.length, 6, "6 messages should be sent"); - const expectedBatchMetadata = [ - { batch: true }, - undefined, - { batch: false }, - { batch: true }, - undefined, - { batch: false }, - ]; - - assert.deepStrictEqual( - submittedOpsMetadata, - expectedBatchMetadata, - "batch metadata does not match", - ); + // Verify batch structure - each batch has 3 messages + assertBatchStructure(submittedOpsMetadata, 0, 3); // First batch: messages 0-2 + assertBatchStructure(submittedOpsMetadata, 3, 3); // Second batch: messages 3-5 }); }); } }); - describe("orderSequentially with rollback", () => { for (const flushMode of [ FlushMode.TurnBased, @@ -4440,6 +4491,87 @@ describe("Runtime", () => { assert.equal(containerRuntime.isDirty, false, "Runtime should not be dirty anymore"); }); + + describe("Checkpoint integration", () => { + it("checkpoint.rollback() rolls back changes made after checkpoint", () => { + const channelCollectionStub = stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + submitDataStoreOp(containerRuntime, "1", genTestDataStoreMessage("op-1")); + const cp = controls.checkpoint(); + + submitDataStoreOp(containerRuntime, "2", genTestDataStoreMessage("op-2")); + submitDataStoreOp(containerRuntime, "3", genTestDataStoreMessage("op-3")); + + cp.rollback(); + + // Should have rolled back ops 2 and 3 in LIFO order + const rollbackCalls = channelCollectionStub.rollbackDataStoreOp.getCalls(); + assert.equal(rollbackCalls.length, 2, "Should rollback 2 ops"); + assert.equal(rollbackCalls[0].args[0].address, "3", "Should rollback op 3 first"); + assert.equal(rollbackCalls[1].args[0].address, "2", "Should rollback op 2 second"); + + controls.commitChanges(); + assert.equal(submittedOps.length, 1, "Should only submit op 1"); + }); + + it("example: async batch processing with checkpoint objects", () => { + stubChannelCollection(containerRuntime); + const controls = containerRuntime.enterStagingMode(); + + // Simulate async processing with checkpoints + const checkpoints: { + id: number; + checkpoint: ReturnType; + }[] = []; + + // Process records + for (let i = 1; i <= 5; i++) { + submitDataStoreOp( + containerRuntime, + i.toString(), + genTestDataStoreMessage(`op-${i}`), + ); + const cp = controls.checkpoint(); + checkpoints.push({ id: i, checkpoint: cp }); + } + + // All checkpoints should be valid + assert.equal( + checkpoints.every((c) => c.checkpoint.isValid), + true, + "All should be valid", + ); + + // Simulate async decision to rollback batch 3 + const batch3 = checkpoints.find((c) => c.id === 3); + assert(batch3 !== undefined); + batch3.checkpoint.rollback(); + + // Batches 3, 4, 5 should be invalid (rolled back) + assert.equal(checkpoints[2].checkpoint.isValid, false, "Batch 3 should be invalid"); + assert.equal(checkpoints[3].checkpoint.isValid, false, "Batch 4 should be invalid"); + assert.equal(checkpoints[4].checkpoint.isValid, false, "Batch 5 should be invalid"); + + // Batches 1, 2 should still be valid + assert.equal(checkpoints[0].checkpoint.isValid, true, "Batch 1 should be valid"); + assert.equal(checkpoints[1].checkpoint.isValid, true, "Batch 2 should be valid"); + + // Verify batches 1 and 2 still have changes since their checkpoints + assert.equal( + checkpoints[0].checkpoint.hasChangesSince, + true, + "Batch 1 should have changes (op 2 and 3)", + ); + assert.equal( + checkpoints[1].checkpoint.hasChangesSince, + true, + "Batch 2 should have changes (op 3)", + ); + controls.commitChanges(); + assert.equal(submittedOps.length, 3, "Should submit ops 1, 2, and 3"); + }); + }); }); }); }); diff --git a/packages/runtime/container-runtime/src/test/stagingModeManager.spec.ts b/packages/runtime/container-runtime/src/test/stagingModeManager.spec.ts new file mode 100644 index 000000000000..4eb879294a34 --- /dev/null +++ b/packages/runtime/container-runtime/src/test/stagingModeManager.spec.ts @@ -0,0 +1,464 @@ +/*! + * Copyright (c) Microsoft Corporation and contributors. All rights reserved. + * Licensed under the MIT License. + */ + +import { strict as assert } from "node:assert"; +import Sinon from "sinon"; +import { StagingModeManager, type StagingModeDependencies } from "../stagingModeManager.js"; +import type { LocalContainerRuntimeMessage } from "../messageTypes.js"; + +describe("StagingModeManager", () => { + let sandbox: Sinon.SinonSandbox; + let dependencies: StagingModeDependencies; + let mockMainBatchMessageCount: { value: number }; + let mockLastStagedMessage: { value: object | undefined }; + let mockChannelCollection: { notifyStagingMode: Sinon.SinonStub }; + + beforeEach(() => { + sandbox = Sinon.createSandbox(); + + // Use objects to allow mutation of readonly properties in tests + mockMainBatchMessageCount = { value: 0 }; + mockLastStagedMessage = { value: undefined }; + mockChannelCollection = { notifyStagingMode: sandbox.stub() }; + + // Create minimal mocks for each dependency using Pick types + dependencies = { + pendingStateManager: { + popStagedBatches: sandbox.stub(), + replayPendingStates: sandbox.stub(), + getLastPendingMessage: sandbox.stub().callsFake(() => mockLastStagedMessage.value), + } as unknown as StagingModeDependencies["pendingStateManager"], + outbox: { + flush: sandbox.stub(), + get mainBatchMessageCount() { + return mockMainBatchMessageCount.value; + }, + } as unknown as StagingModeDependencies["outbox"], + getChannelCollection: () => mockChannelCollection, + submitIdAllocationOpIfNeeded: sandbox.stub(), + rollbackStagedChange: sandbox.stub(), + updateDocumentDirtyState: sandbox.stub(), + closeFn: sandbox.stub(), + }; + }); + + afterEach(() => { + sandbox.restore(); + }); + + describe("inStagingMode", () => { + it("should return false initially", () => { + const manager = new StagingModeManager(dependencies); + assert.equal(manager.inStagingMode, false); + }); + + it("should return true after entering staging mode", () => { + const manager = new StagingModeManager(dependencies); + manager.enterStagingMode(() => {}); + assert.equal(manager.inStagingMode, true); + }); + + it("should return false after exiting staging mode", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.discardChanges(); + assert.equal(manager.inStagingMode, false); + }); + }); + + describe("enterStagingMode", () => { + it("should throw if already in staging mode", () => { + const manager = new StagingModeManager(dependencies); + manager.enterStagingMode(() => {}); + assert.throws(() => manager.enterStagingMode(() => {}), /Already in staging mode/); + }); + + it("should call flush function", () => { + const flushFn = sandbox.stub(); + const manager = new StagingModeManager(dependencies); + manager.enterStagingMode(flushFn); + assert(flushFn.calledOnce, "Flush function should be called once"); + }); + + it("should call notifyStagingMode(true)", () => { + const manager = new StagingModeManager(dependencies); + manager.enterStagingMode(() => {}); + assert( + mockChannelCollection.notifyStagingMode.calledOnceWithExactly(true), + "notifyStagingMode should be called with true", + ); + }); + + it("should return stage controls", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + assert.notEqual(controls.discardChanges, undefined, "Should have discardChanges"); + assert.notEqual(controls.commitChanges, undefined, "Should have commitChanges"); + assert.notEqual(controls.checkpoint, undefined, "Should have checkpoint"); + }); + }); + + describe("discardChanges", () => { + it("should flush outbox", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.discardChanges(); + assert( + (dependencies.outbox.flush as Sinon.SinonStub).called, + "Outbox flush should be called", + ); + }); + + it("should pop staged batches and call rollback", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + + // Simulate popStagedBatches calling the callback + (dependencies.pendingStateManager.popStagedBatches as Sinon.SinonStub).callsFake( + ( + callback: (args: { + runtimeOp: LocalContainerRuntimeMessage; + localOpMetadata: unknown; + }) => void, + ) => { + const mockOp: LocalContainerRuntimeMessage = { + type: "test", + contents: {}, + } as unknown as LocalContainerRuntimeMessage; + callback({ + runtimeOp: mockOp, + localOpMetadata: "meta", + }); + }, + ); + + controls.discardChanges(); + + assert( + (dependencies.pendingStateManager.popStagedBatches as Sinon.SinonStub).calledOnce, + "popStagedBatches should be called", + ); + assert( + (dependencies.rollbackStagedChange as Sinon.SinonStub).calledOnce, + "rollbackStagedChange should be called", + ); + }); + + it("should call updateDocumentDirtyState", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.discardChanges(); + assert( + (dependencies.updateDocumentDirtyState as Sinon.SinonStub).calledOnce, + "updateDocumentDirtyState should be called", + ); + }); + + it("should call notifyStagingMode(false)", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.discardChanges(); + assert( + mockChannelCollection.notifyStagingMode.calledWith(false), + "notifyStagingMode should be called with false", + ); + }); + + it("should call submitIdAllocationOpIfNeeded with staged: false", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.discardChanges(); + assert( + (dependencies.submitIdAllocationOpIfNeeded as Sinon.SinonStub).calledWith({ + staged: false, + }), + "submitIdAllocationOpIfNeeded should be called with staged: false", + ); + }); + }); + + describe("commitChanges", () => { + it("should flush outbox", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.commitChanges(); + assert( + (dependencies.outbox.flush as Sinon.SinonStub).called, + "Outbox flush should be called", + ); + }); + + it("should replay pending states", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.commitChanges(); + assert( + (dependencies.pendingStateManager.replayPendingStates as Sinon.SinonStub).calledWith({ + committingStagedBatches: true, + squash: false, + }), + "replayPendingStates should be called with correct options", + ); + }); + + it("should replay pending states with squash option", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.commitChanges({ squash: true }); + assert( + (dependencies.pendingStateManager.replayPendingStates as Sinon.SinonStub).calledWith({ + committingStagedBatches: true, + squash: true, + }), + "replayPendingStates should be called with squash: true", + ); + }); + + it("should call notifyStagingMode(false)", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.commitChanges(); + assert( + mockChannelCollection.notifyStagingMode.calledWith(false), + "notifyStagingMode should be called with false", + ); + }); + }); + + describe("checkpoint", () => { + it("should flush outbox when creating checkpoint", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + controls.checkpoint(); + assert( + (dependencies.outbox.flush as Sinon.SinonStub).called, + "Outbox flush should be called", + ); + }); + + it("should return a checkpoint object with required methods", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + assert.notEqual(checkpoint.rollback, undefined, "Should have rollback method"); + assert.notEqual(checkpoint.dispose, undefined, "Should have dispose method"); + assert.equal(typeof checkpoint.isValid, "boolean", "Should have isValid property"); + assert.equal( + typeof checkpoint.hasChangesSince, + "boolean", + "Should have hasChangesSince property", + ); + }); + + it("checkpoint.isValid should be true initially", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + assert.equal(checkpoint.isValid, true); + }); + + it("checkpoint.isValid should be false after rollback", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + checkpoint.rollback(); + assert.equal(checkpoint.isValid, false); + }); + + it("checkpoint.hasChangesSince should be false when no changes", () => { + const msg1 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch1" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 1, length: 1 }, + }; + mockLastStagedMessage.value = msg1; + + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + + // Last message is still the same + assert.equal(checkpoint.hasChangesSince, false); + }); + + it("checkpoint.hasChangesSince should be true when messages added", () => { + const msg1 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch1" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 1, length: 1 }, + }; + mockLastStagedMessage.value = msg1; + + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + + // Simulate messages being added after checkpoint + const msg2 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch2" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 2, length: 1 }, + }; + mockLastStagedMessage.value = msg2; + + assert.equal(checkpoint.hasChangesSince, true); + }); + + it("checkpoint.hasChangesSince should be true when outbox has messages", () => { + mockMainBatchMessageCount.value = 0; + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + mockMainBatchMessageCount.value = 2; + assert.equal(checkpoint.hasChangesSince, true); + }); + + it("checkpoint.rollback should throw when invalid", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + checkpoint.rollback(); + assert.throws(() => checkpoint.rollback(), /Cannot rollback an invalid checkpoint/); + }); + + it("checkpoint.dispose should throw when invalid", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + checkpoint.dispose(); + assert.throws(() => checkpoint.dispose(), /Cannot dispose an invalid checkpoint/); + }); + + it("checkpoint.rollback should call popStagedMessagesAfter with correct reference", () => { + const msg1 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch1" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 1, length: 1 }, + }; + mockLastStagedMessage.value = msg1; + + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + + // Simulate messages being added after checkpoint + const msg2 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch2" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 2, length: 1 }, + }; + mockLastStagedMessage.value = msg2; + + checkpoint.rollback(); + assert( + (dependencies.pendingStateManager.popStagedBatches as Sinon.SinonStub).calledWith( + Sinon.match.func, + "batch1", + ), + "popStagedBatches should be called with callback and checkpoint batch ID", + ); + }); + it("multiple checkpoints should work independently", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + + const msg1 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch1" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 1, length: 1 }, + }; + mockLastStagedMessage.value = msg1; + const checkpoint1 = controls.checkpoint(); + + const msg2 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch2" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 2, length: 1 }, + }; + mockLastStagedMessage.value = msg2; + const checkpoint2 = controls.checkpoint(); + + assert.equal(checkpoint1.isValid, true); + assert.equal(checkpoint2.isValid, true); + + checkpoint2.rollback(); + assert.equal(checkpoint1.isValid, true, "checkpoint1 should still be valid"); + assert.equal(checkpoint2.isValid, false, "checkpoint2 should be invalid"); + }); + + it("rolling back earlier checkpoint should invalidate later ones", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + + const msg1 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch1" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 1, length: 1 }, + }; + mockLastStagedMessage.value = msg1; + const checkpoint1 = controls.checkpoint(); + + const msg2 = { + localOpMetadata: {}, + opMetadata: { batchId: "batch2" }, + batchInfo: { staged: true, clientId: "client1", batchStartCsn: 2, length: 1 }, + }; + mockLastStagedMessage.value = msg2; + const checkpoint2 = controls.checkpoint(); + + checkpoint1.rollback(); + assert.equal(checkpoint1.isValid, false); + assert.equal(checkpoint2.isValid, false, "checkpoint2 should also be invalidated"); + }); + + it("exiting staging mode should invalidate all checkpoints", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + + const checkpoint1 = controls.checkpoint(); + const checkpoint2 = controls.checkpoint(); + + controls.discardChanges(); + + assert.equal(checkpoint1.isValid, false); + assert.equal(checkpoint2.isValid, false); + }); + }); + + describe("error handling", () => { + it("should call closeFn when exitStagingMode throws", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + + // Make popStagedBatches throw + (dependencies.pendingStateManager.popStagedBatches as Sinon.SinonStub).throws( + new Error("Test error"), + ); + + assert.throws(() => controls.discardChanges()); + assert( + (dependencies.closeFn as Sinon.SinonStub).called, + "closeFn should be called on error", + ); + }); + + it("should call closeFn when checkpoint rollback throws", () => { + const manager = new StagingModeManager(dependencies); + const controls = manager.enterStagingMode(() => {}); + const checkpoint = controls.checkpoint(); + + // Make popStagedBatches throw + (dependencies.pendingStateManager.popStagedBatches as Sinon.SinonStub).throws( + new Error("Test error"), + ); + + assert.throws(() => checkpoint.rollback()); + assert( + (dependencies.closeFn as Sinon.SinonStub).called, + "closeFn should be called on error", + ); + }); + }); +}); diff --git a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md index 70a7792f9f48..3a569060d711 100644 --- a/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md +++ b/packages/runtime/runtime-definitions/api-report/runtime-definitions.legacy.alpha.api.md @@ -421,8 +421,17 @@ export interface OpAttributionKey { // @beta @legacy export type PackagePath = readonly string[]; +// @alpha @sealed @legacy +export interface StageCheckpointAlpha { + dispose(): void; + readonly hasChangesSince: boolean; + readonly isValid: boolean; + rollback(): void; +} + // @alpha @sealed @legacy export interface StageControlsAlpha { + readonly checkpoint: () => StageCheckpointAlpha; readonly commitChanges: () => void; readonly discardChanges: () => void; } diff --git a/packages/runtime/runtime-definitions/src/index.ts b/packages/runtime/runtime-definitions/src/index.ts index 4a21e634679f..7ccffe1fc349 100644 --- a/packages/runtime/runtime-definitions/src/index.ts +++ b/packages/runtime/runtime-definitions/src/index.ts @@ -95,6 +95,7 @@ export type { MinimumVersionForCollab } from "./compatibilityDefinitions.js"; export { type ContainerRuntimeBaseAlpha, type StageControlsAlpha, + type StageCheckpointAlpha, type CommitStagedChangesOptionsInternal, type IContainerRuntimeBaseInternal, type StageControlsInternal, diff --git a/packages/runtime/runtime-definitions/src/stagingMode.ts b/packages/runtime/runtime-definitions/src/stagingMode.ts index ee02df6d594a..bb839e04af1e 100644 --- a/packages/runtime/runtime-definitions/src/stagingMode.ts +++ b/packages/runtime/runtime-definitions/src/stagingMode.ts @@ -44,24 +44,145 @@ export interface StageControlsInternal extends StageControlsAlpha { } /** - * Controls for managing staged changes in alpha staging mode. + * Represents a checkpoint that can be rolled back to. * - * Provides methods to either commit or discard changes made while in staging mode. + * Created by {@link StageControlsAlpha.checkpoint}. Checkpoints allow you to undo changes + * made after the checkpoint was created while preserving earlier changes. + * + * @legacy @alpha + * @sealed + */ +export interface StageCheckpointAlpha { + /** + * Roll back all changes to this checkpoint. + * + * Undoes all operations made after this checkpoint was created and invalidates this checkpoint + * and any checkpoints created after it. + * + * @throws Error if this checkpoint is no longer valid (e.g., invalidated by rolling back to an earlier checkpoint). + * + * @example + * ```typescript + * const checkpoint = controls.checkpoint(); + * this.map.set("key", "value"); + * checkpoint.rollback(); // Undoes the map.set + * ``` + */ + rollback(): void; + + /** + * Remove this checkpoint without rolling back changes. + * + * Useful when you no longer need a checkpoint but want to keep the changes made after it. + * Only this specific checkpoint becomes invalid - later checkpoints remain valid. + * + * @throws Error if this checkpoint is no longer valid. + * + * @example + * ```typescript + * const cp1 = controls.checkpoint(); + * this.map.set("key", "value"); + * const cp2 = controls.checkpoint(); + * + * cp1.dispose(); // cp2 remains valid + * cp2.rollback(); // Still works, undoes both changes + * ``` + */ + dispose(): void; + + /** + * Whether this checkpoint is still valid and can be rolled back to. + * + * A checkpoint becomes invalid when you roll back to an earlier checkpoint or call `dispose()` on it. + * + * @example + * ```typescript + * const cp1 = controls.checkpoint(); + * const cp2 = controls.checkpoint(); + * + * cp1.rollback(); // Invalidates cp2 + * console.log(cp2.isValid); // false + * ``` + */ + readonly isValid: boolean; + + /** + * Whether any changes have been made since this checkpoint was created. + * + * @example + * ```typescript + * const checkpoint = controls.checkpoint(); + * console.log(checkpoint.hasChangesSince); // false + * this.map.set("key", "value"); + * console.log(checkpoint.hasChangesSince); // true + * ``` + */ + readonly hasChangesSince: boolean; +} + +/** + * Controls for managing staged changes in staging mode. + * + * Staging mode lets you make changes locally before committing or discarding them. + * You can create checkpoints to rollback specific changes. + * + * @example Async validation with checkpoints + * ```typescript + * class DraftFormEditor { + * private controls = this.runtime.enterStagingMode(); + * + * async updateField(name: string, value: string) { + * this.map.set(name, value); + * const checkpoint = this.controls.checkpoint(); + * + * try { + * await this.validateWithServer(name, value); + * } catch (error) { + * checkpoint.rollback(); // Rolls back only this field + * throw error; + * } + * } + * + * save() { + * this.controls.commitChanges(); + * } + * + * cancel() { + * this.controls.discardChanges(); + * } + * } + * ``` * * @legacy @alpha * @sealed */ export interface StageControlsAlpha { /** - * Exit staging mode and commit to any changes made while in staging mode. - * This will cause them to be sent to the ordering service, and subsequent changes - * made by this container will additionally flow freely to the ordering service. + * Exit staging mode and send all changes to the service. */ readonly commitChanges: () => void; + /** - * Exit staging mode and discard any changes made while in staging mode. + * Exit staging mode and undo all changes. */ readonly discardChanges: () => void; + + /** + * Create a checkpoint you can rollback to later. + * + * Returns a {@link StageCheckpointAlpha} object. The checkpoint remains valid until you + * roll back to it or to an earlier checkpoint. + * + * @returns A checkpoint object that can be used to rollback to this point. + * + * @example + * ```typescript + * const checkpoint = controls.checkpoint(); + * this.map.set("key", "value"); + * checkpoint.rollback(); // Undoes the map.set + * ``` + */ + readonly checkpoint: () => StageCheckpointAlpha; } /** @@ -77,19 +198,48 @@ export interface IContainerRuntimeBaseInternal extends ContainerRuntimeBaseAlpha } /** - * Alpha interface for container runtime base supporting staging mode. + * Alpha interface for container runtime with staging mode support. * * @legacy @alpha * @sealed */ export interface ContainerRuntimeBaseAlpha extends IContainerRuntimeBase { /** - * Enters staging mode, allowing changes to be staged before being committed or discarded. - * @returns Controls for committing or discarding staged changes. + * Enter staging mode to queue changes locally before committing or discarding them. + * + * @returns Controls for managing staged changes. See {@link StageControlsAlpha}. + * + * @example + * ```typescript + * class DraftFormEditor { + * private controls = this.runtime.enterStagingMode(); + * + * async updateField(name: string, value: string) { + * this.map.set(name, value); + * const checkpoint = this.controls.checkpoint(); + * + * try { + * await this.validateWithServer(name, value); + * } catch (error) { + * checkpoint.rollback(); + * throw error; + * } + * } + * + * save() { + * this.controls.commitChanges(); + * } + * + * dispose() { + * this.controls.discardChanges(); + * } + * } + * ``` */ enterStagingMode(): StageControlsAlpha; + /** - * Indicates whether the container is currently in staging mode. + * Whether the container is currently in staging mode. */ readonly inStagingMode: boolean; } diff --git a/packages/test/local-server-stress-tests/package.json b/packages/test/local-server-stress-tests/package.json index 4296d638782c..02066d322c8b 100644 --- a/packages/test/local-server-stress-tests/package.json +++ b/packages/test/local-server-stress-tests/package.json @@ -113,7 +113,8 @@ "@fluidframework/matrix#build:test", "@fluidframework/tree#build:test", "@fluidframework/task-manager#build:test", - "@fluidframework/legacy-dds#build:test" + "@fluidframework/legacy-dds#build:test", + "@fluidframework/counter#build:test" ] } }, diff --git a/packages/test/local-server-stress-tests/src/baseModel.ts b/packages/test/local-server-stress-tests/src/baseModel.ts index 27d70c88fcc4..16e053e2f251 100644 --- a/packages/test/local-server-stress-tests/src/baseModel.ts +++ b/packages/test/local-server-stress-tests/src/baseModel.ts @@ -61,6 +61,10 @@ const orderSequentiallyReducer = async ( export const reducer = combineReducersAsync({ enterStagingMode: async (state, op) => state.client.entryPoint.enterStagingMode(), exitStagingMode: async (state, op) => state.client.entryPoint.exitStagingMode(op.commit), + stagingModeCreateCheckpoint: async (state, op) => + state.client.entryPoint.createCheckpoint(op.tag), + stagingModeRollbackToCheckpoint: async (state, op) => + state.client.entryPoint.rollbackToCheckpoint(op.tag), createDataStore: async (state, op) => state.datastore.createDataStore(op.tag, op.asChild), createChannel: async (state, op) => { state.datastore.createChannel(op.tag, op.channelType); @@ -123,9 +127,26 @@ export function makeGenerator( commit: random.bool(), }), 25, - (state) => - state.client.entryPoint.inStagingMode() && - state.client.container.attachState !== AttachState.Detached, + (state) => state.client.entryPoint.inStagingMode(), + ], + [ + async (state) => ({ + type: "stagingModeCreateCheckpoint", + tag: state.tag("checkpoint"), + }), + 5, + (state) => state.client.entryPoint.inStagingMode(), + ], + [ + async (state) => { + const validTags = state.client.entryPoint.getValidCheckpointTags(); + return { + type: "stagingModeRollbackToCheckpoint", + tag: state.random.pick(validTags), + }; + }, + 10, + (state) => state.client.entryPoint.getValidCheckpointTags().length > 0, ], [DDSModelOpGenerator, 100], [ diff --git a/packages/test/local-server-stress-tests/src/localServerStressHarness.ts b/packages/test/local-server-stress-tests/src/localServerStressHarness.ts index 5014750f06ba..5bcada057738 100644 --- a/packages/test/local-server-stress-tests/src/localServerStressHarness.ts +++ b/packages/test/local-server-stress-tests/src/localServerStressHarness.ts @@ -76,9 +76,11 @@ import { } from "./stressDataObject.js"; import { makeUnreachableCodePathProxy } from "./utils.js"; +export type Tagged = `${T}-${number}`; + export interface Client { container: ContainerAlpha; - tag: `client-${number}`; + tag: Tagged<"client">; entryPoint: DefaultStressDataObject; } @@ -87,7 +89,7 @@ export interface Client { */ export interface LocalServerStressState extends BaseFuzzTestState { localDeltaConnectionServer: ILocalDeltaConnectionServer; - pendingLocalStateStore: PendingLocalStateStore<`client-${number}` | undefined>; + pendingLocalStateStore: PendingLocalStateStore | undefined>; codeLoader: ICodeDetailsLoader; validationClient: Client; random: IRandom; @@ -96,16 +98,16 @@ export interface LocalServerStressState extends BaseFuzzTestState { datastore: StressDataObject; channel: IChannel; seed: number; - tag(prefix: T): `${T}-${number}`; + tag(prefix: T): Tagged; } /** * @internal */ interface SelectedClientSpec { - clientTag: `client-${number}`; - datastoreTag: `datastore-${number}`; - channelTag: `channel-${number}`; + clientTag: Tagged<"client">; + datastoreTag: Tagged<"datastore">; + channelTag: Tagged<"channel">; } /** @@ -120,8 +122,8 @@ interface Attach { */ interface AddClient { type: "addClient"; - clientTag: `client-${number}`; - fromClientTag: `client-${number}` | undefined; + clientTag: Tagged<"client">; + fromClientTag: Tagged<"client"> | undefined; } /** @@ -129,7 +131,7 @@ interface AddClient { */ interface RemoveClient { type: "removeClient"; - clientTag: `client-${number}`; + clientTag: Tagged<"client">; } /** @@ -733,7 +735,7 @@ function mixinClientSelection( ...baseOp, clientTag: client.tag, datastoreTag: entry.tag, - channelTag: channel.id as `channel-${number}`, + channelTag: channel.id as Tagged<"channel">, } satisfies SelectedClientSpec); }; }; @@ -817,7 +819,7 @@ async function createDetachedClient( localDeltaConnectionServer: ILocalDeltaConnectionServer, codeLoader: ICodeDetailsLoader, codeDetails: IFluidCodeDetails, - tag: `client-${number}`, + tag: Tagged<"client">, seed: number, options: LocalServerStressOptions, ): Promise { @@ -849,7 +851,7 @@ async function createDetachedClient( async function loadClient( localDeltaConnectionServer: ILocalDeltaConnectionServer, codeLoader: ICodeDetailsLoader, - tag: `client-${number}`, + tag: Tagged<"client">, url: string, seed: number, options: LocalServerStressOptions, diff --git a/packages/test/local-server-stress-tests/src/stressDataObject.ts b/packages/test/local-server-stress-tests/src/stressDataObject.ts index eea42f444622..935f52960faf 100644 --- a/packages/test/local-server-stress-tests/src/stressDataObject.ts +++ b/packages/test/local-server-stress-tests/src/stressDataObject.ts @@ -27,7 +27,10 @@ import type { IChannel } from "@fluidframework/datastore-definitions/internal"; // eslint-disable-next-line import-x/no-internal-modules import { modifyClusterSize } from "@fluidframework/id-compressor/internal/test-utils"; import { ISharedMap, SharedMap } from "@fluidframework/map/internal"; -import { type StageControlsAlpha } from "@fluidframework/runtime-definitions/internal"; +import { + type StageCheckpointAlpha, + type StageControlsAlpha, +} from "@fluidframework/runtime-definitions/internal"; import { RuntimeHeaders, toFluidHandleInternal, @@ -37,21 +40,22 @@ import { timeoutAwait } from "@fluidframework/test-utils/internal"; import { ddsModelMap } from "./ddsModels.js"; import { makeUnreachableCodePathProxy } from "./utils.js"; +import type { Tagged } from "./localServerStressHarness.js"; export interface UploadBlob { type: "uploadBlob"; - tag: `blob-${number}`; + tag: Tagged<"blob">; } export interface CreateDataStore { type: "createDataStore"; asChild: boolean; - tag: `datastore-${number}`; + tag: Tagged<"datastore">; } export interface CreateChannel { type: "createChannel"; channelType: string; - tag: `channel-${number}`; + tag: Tagged<"channel">; } export interface EnterStagingMode { @@ -62,12 +66,24 @@ export interface ExitStagingMode { commit: boolean; } +export interface StagingModeCreateCheckpoint { + type: "stagingModeCreateCheckpoint"; + tag: Tagged<"checkpoint">; +} + +export interface StagingModeRollbackToCheckpoint { + type: "stagingModeRollbackToCheckpoint"; + tag: Tagged<"checkpoint">; +} + export type StressDataObjectOperations = | UploadBlob | CreateDataStore | CreateChannel | EnterStagingMode - | ExitStagingMode; + | ExitStagingMode + | StagingModeCreateCheckpoint + | StagingModeRollbackToCheckpoint; export class StressDataObject extends DataObject { public static readonly factory: DataObjectFactory = new DataObjectFactory({ @@ -143,7 +159,7 @@ export class StressDataObject extends DataObject { return this.runtime.attachState !== AttachState.Detached; } - public async uploadBlob(tag: `blob-${number}`, contents: string) { + public async uploadBlob(tag: Tagged<"blob">, contents: string) { const handle = await this.runtime.uploadBlob(stringToBuffer(contents, "utf-8")); this.defaultStressObject.registerLocallyCreatedObject({ type: "newBlob", @@ -152,12 +168,12 @@ export class StressDataObject extends DataObject { }); } - public createChannel(tag: `channel-${number}`, type: string) { + public createChannel(tag: Tagged<"channel">, type: string) { this.runtime.createChannel(tag, type); this.channelNameMap.set(tag, type); } - public async createDataStore(tag: `datastore-${number}`, asChild: boolean) { + public async createDataStore(tag: Tagged<"datastore">, asChild: boolean) { const dataStore = await this.context.containerRuntime.createDataStore( asChild ? [...this.context.packagePath, StressDataObject.factory.type] @@ -184,10 +200,10 @@ export class StressDataObject extends DataObject { } export type ContainerObjects = - | { type: "newBlob"; handle: IFluidHandle; tag: `blob-${number}` } + | { type: "newBlob"; handle: IFluidHandle; tag: Tagged<"blob"> } | { type: "stressDataObject"; - tag: `datastore-${number}`; + tag: Tagged<"datastore">; handle: IFluidHandle; stressDataObject: StressDataObject; }; @@ -204,9 +220,8 @@ export class DefaultStressDataObject extends StressDataObject { * will also be in these the containerObjectMap, but are not necessarily usable * as they could be detached, in which can only this instance can access them. */ - private readonly _locallyCreatedObjects: ContainerObjects[] = []; public async getContainerObjects(): Promise[]> { - const containerObjects: Readonly[] = [...this._locallyCreatedObjects]; + const containerObjects: Readonly[] = []; const containerRuntime = // eslint-disable-next-line import-x/no-deprecated this.context.containerRuntime as IContainerRuntimeWithResolveHandle_Deprecated; for (const [url, entry] of this.containerObjectMap as any as [ @@ -296,10 +311,10 @@ export class DefaultStressDataObject extends StressDataObject { this.containerObjectMap.set(handle.absolutePath, { tag: obj.tag, type: obj.type }); } } - this._locallyCreatedObjects.push(obj); } private stageControls: StageControlsAlpha | undefined; + private readonly checkpoints = new Map, StageCheckpointAlpha>(); private readonly containerRuntimeExp = asLegacyAlpha(this.context.containerRuntime); public enterStagingMode() { assert( @@ -325,6 +340,56 @@ export class DefaultStressDataObject extends StressDataObject { this.stageControls.discardChanges(); } this.stageControls = undefined; + this.checkpoints.clear(); + } + + public createCheckpoint(tag: Tagged<"checkpoint">) { + assert(this.stageControls !== undefined, "must have staging mode controls to checkpoint"); + const checkpoint = this.stageControls.checkpoint(); + this.checkpoints.set(tag, checkpoint); + } + + public rollbackToCheckpoint(tag: Tagged<"checkpoint">) { + assert( + this.stageControls !== undefined, + "must have staging mode controls to rollback checkpoint", + ); + const checkpoint = this.checkpoints.get(tag); + assert(checkpoint !== undefined, `checkpoint ${tag} not found`); + checkpoint.rollback(); + // Remove this checkpoint and all checkpoints created after it (they're now invalid) + const checkpointsToRemove: Tagged<"checkpoint">[] = []; + for (const [key, cp] of this.checkpoints.entries()) { + if (cp.isValid === false) { + checkpointsToRemove.push(key); + } + } + for (const key of checkpointsToRemove) { + this.checkpoints.delete(key); + } + } + + public getValidCheckpointTags(): Tagged<"checkpoint">[] { + const validTags: Tagged<"checkpoint">[] = []; + for (const [tag, checkpoint] of this.checkpoints.entries()) { + if (checkpoint.isValid === true) { + validTags.push(tag); + } + } + return validTags; + } + + public hasChangesSinceCheckpoint(): boolean { + if (this.stageControls === undefined || this.checkpoints.size === 0) { + return false; + } + // Check if any valid checkpoint has changes since it was created + for (const checkpoint of this.checkpoints.values()) { + if (checkpoint.isValid === true && checkpoint.hasChangesSince === true) { + return true; + } + } + return false; } } diff --git a/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts b/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts index 9d6a0b99fd53..e9a7b2aabd81 100644 --- a/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts +++ b/packages/test/local-server-stress-tests/src/test/localServerStress.spec.ts @@ -38,11 +38,10 @@ describe("Local Server Stress", () => { "Fluid.Container.enableOfflineFull": true, "Fluid.ContainerRuntime.EnableRollback": true, }, - // skipMinimization: true, + skipMinimization: true, // Use skip, replay, and only properties to control which seeds run. skip: [ - 11, // container closes with 0xc3d, and then test fails due to closed container - 173, // 0xc3d + 19, // Task queues are not the same size ], }); }); diff --git a/packages/test/local-server-tests/src/test/stagingMode.spec.ts b/packages/test/local-server-tests/src/test/stagingMode.spec.ts index 1abf1620b9be..8dfbb4acb886 100644 --- a/packages/test/local-server-tests/src/test/stagingMode.spec.ts +++ b/packages/test/local-server-tests/src/test/stagingMode.spec.ts @@ -680,4 +680,447 @@ describe("Staging Mode", () => { "Should not be able to set an alias in staging mode", ); }); + + describe("Checkpoints", () => { + it("can create and rollback to checkpoint", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + // Make initial changes + clients.original.dataObject.makeEdit("before-checkpoint"); + + // Create checkpoint + const checkpoint = stagingControls.checkpoint(); + assert.equal( + checkpoint.hasChangesSince, + false, + "Should have no changes since checkpoint", + ); + + // Make changes after checkpoint + clients.original.dataObject.makeEdit("after-checkpoint"); + + assert.equal( + hasEdit(clients.original, "before-checkpoint"), + true, + "Should have before-checkpoint edit", + ); + assert.equal( + hasEdit(clients.original, "after-checkpoint"), + true, + "Should have after-checkpoint edit", + ); + + // Rollback to checkpoint + checkpoint.rollback(); + assert.equal( + hasEdit(clients.original, "before-checkpoint"), + true, + "Should still have before-checkpoint edit", + ); + assert.equal( + hasEdit(clients.original, "after-checkpoint"), + false, + "Should not have after-checkpoint edit after rollback", + ); + + // Commit the remaining changes + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + assert.equal( + hasEdit(clients.loaded, "before-checkpoint"), + true, + "Loaded client should have before-checkpoint edit", + ); + assert.equal( + hasEdit(clients.loaded, "after-checkpoint"), + false, + "Loaded client should not have after-checkpoint edit", + ); + }); + + it("supports nested checkpoints", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("edit-1"); + stagingControls.checkpoint(); // cp1 - not used for rollback + + clients.original.dataObject.makeEdit("edit-2"); + const cp2 = stagingControls.checkpoint(); + + clients.original.dataObject.makeEdit("edit-3"); + const cp3 = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("edit-4"); + + assert.equal(cp3.hasChangesSince, true, "Should have changes since last checkpoint"); + + // Rollback checkpoint 3 (removes edit-4) + cp3.rollback(); + assert.equal( + hasEdit(clients.original, "edit-4"), + false, + "Should not have edit-4 after first rollback", + ); + + // Rollback checkpoint 2 (removes edit-3) + cp2.rollback(); + assert.equal( + hasEdit(clients.original, "edit-3"), + false, + "Should not have edit-3 after second rollback", + ); + + // Commit remaining changes (edit-1 and edit-2) + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + assert.equal(hasEdit(clients.loaded, "edit-1"), true, "Should have edit-1"); + assert.equal(hasEdit(clients.loaded, "edit-2"), true, "Should have edit-2"); + assert.equal(hasEdit(clients.loaded, "edit-3"), false, "Should not have edit-3"); + assert.equal(hasEdit(clients.loaded, "edit-4"), false, "Should not have edit-4"); + }); + it("checkpoint with DDS creation and rollback", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("before-dds"); + const checkpoint = stagingControls.checkpoint(); + + clients.original.dataObject.addDDS("checkpoint-dds"); + clients.original.dataObject.makeEdit("after-dds"); + + // Rollback should remove both the DDS and the edit + checkpoint.rollback(); + assert.equal( + hasEdit(clients.original, "before-dds"), + true, + "Should have before-dds edit", + ); + assert.equal( + hasEdit(clients.original, "checkpoint-dds"), + false, + "Should not have checkpoint-dds after rollback", + ); + assert.equal( + hasEdit(clients.original, "after-dds"), + false, + "Should not have after-dds edit after rollback", + ); + + stagingControls.commitChanges(); + await waitForSave(clients); + + await assertDeepConsistent(clients, "states should match after commit"); + }); + it("checkpoints work with remote changes", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("local-1"); + const checkpoint = stagingControls.checkpoint(); + + // Remote client makes changes + clients.loaded.dataObject.makeEdit("remote-1"); + await waitForSave([clients.loaded]); + await catchUp(clients, await waitForSave([clients.loaded])); + + clients.original.dataObject.makeEdit("local-2"); + + // Rollback local changes after checkpoint + checkpoint.rollback(); + assert.equal(hasEdit(clients.original, "local-1"), true, "Should have local-1"); + assert.equal(hasEdit(clients.original, "remote-1"), true, "Should have remote-1"); + assert.equal(hasEdit(clients.original, "local-2"), false, "Should not have local-2"); + + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + assert.equal(hasEdit(clients.loaded, "local-1"), true, "Should have local-1"); + assert.equal(hasEdit(clients.loaded, "remote-1"), true, "Should have remote-1"); + assert.equal(hasEdit(clients.loaded, "local-2"), false, "Should not have local-2"); + }); + + it("checkpoint throws when rolled back to an invalid checkpoint", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + const cp1 = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("edit-1"); + const cp2 = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("edit-2"); + + cp1.rollback(); // This rolls back to cp1 and invalidates cp2 + + assert.throws( + () => cp2.rollback(), + "Should throw when rolling back to invalid checkpoint", + ); + + assert.equal(hasEdit(clients.original, "edit-1"), false, "edit-1 should be rolled back"); + assert.equal(hasEdit(clients.original, "edit-2"), false, "edit-2 should be rolled back"); + + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + }); + + it("does not create empty checkpoints", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + // Create checkpoint without any changes + const emptyCheckpoint = stagingControls.checkpoint(); + assert.equal( + emptyCheckpoint.hasChangesSince, + false, + "Empty checkpoint should show no changes", + ); + + // Make a change and create checkpoint + clients.original.dataObject.makeEdit("edit-1"); + const checkpoint = stagingControls.checkpoint(); + assert.equal( + checkpoint.hasChangesSince, + false, + "Should have checkpoint with no new changes", + ); + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + }); + + it("does not create duplicate checkpoints", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("edit-1"); + const cp1 = stagingControls.checkpoint(); + assert.equal(cp1.hasChangesSince, false, "Should have checkpoint after edit-1"); + + // Create another checkpoint without new changes + const cp2 = stagingControls.checkpoint(); + assert.equal(cp2.hasChangesSince, false, "Second checkpoint should show no new changes"); + + // Make another change and create checkpoint + clients.original.dataObject.makeEdit("edit-2"); + const cp3 = stagingControls.checkpoint(); + assert.equal(cp3.hasChangesSince, false, "Should have checkpoint after edit-2"); + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + }); + it("checkpoints work while disconnected", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("before-disconnect"); + const checkpoint = stagingControls.checkpoint(); + + await ensureDisconnected(clients.original); + + clients.original.dataObject.makeEdit("while-disconnected"); + + // Rollback while disconnected + checkpoint.rollback(); + assert.equal( + hasEdit(clients.original, "before-disconnect"), + true, + "Should have before-disconnect edit", + ); + assert.equal( + hasEdit(clients.original, "while-disconnected"), + false, + "Should not have while-disconnected edit after rollback", + ); + + await ensureConnected(clients.original); + + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + assert.equal(hasEdit(clients.loaded, "before-disconnect"), true, "Should have edit"); + assert.equal( + hasEdit(clients.loaded, "while-disconnected"), + false, + "Should not have edit", + ); + }); + + it("checkpoints survive disconnect/reconnect and resubmit", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("before-checkpoint"); + const checkpoint = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("after-checkpoint"); + + // Disconnect and reconnect to trigger resubmit of ops + await ensureDisconnected(clients.original); + await ensureConnected(clients.original); + + // Make another edit after reconnect + clients.original.dataObject.makeEdit("after-reconnect"); + + // Checkpoint should still be valid and rollback should work + assert.equal(checkpoint.isValid, true, "Checkpoint should remain valid after resubmit"); + checkpoint.rollback(); + + assert.equal( + hasEdit(clients.original, "before-checkpoint"), + true, + "Should have edit before checkpoint", + ); + assert.equal( + hasEdit(clients.original, "after-checkpoint"), + false, + "Should not have edit after checkpoint (rolled back)", + ); + assert.equal( + hasEdit(clients.original, "after-reconnect"), + false, + "Should not have edit after reconnect (rolled back)", + ); + + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + assert.equal( + hasEdit(clients.loaded, "before-checkpoint"), + true, + "Should have edit before checkpoint", + ); + assert.equal( + hasEdit(clients.loaded, "after-checkpoint"), + false, + "Should not have edit after checkpoint", + ); + assert.equal( + hasEdit(clients.loaded, "after-reconnect"), + false, + "Should not have edit after reconnect", + ); + }); + + it("checkpoint.isValid reflects validity state", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + const cp1 = stagingControls.checkpoint(); + const cp2 = stagingControls.checkpoint(); + const cp3 = stagingControls.checkpoint(); + + assert.equal(cp1.isValid, true, "cp1 should be valid"); + assert.equal(cp2.isValid, true, "cp2 should be valid"); + assert.equal(cp3.isValid, true, "cp3 should be valid"); + + // Rolling back to cp1 invalidates cp2 and cp3 + cp1.rollback(); + assert.equal(cp1.isValid, false, "cp1 should be invalid after rollback"); + assert.equal(cp2.isValid, false, "cp2 should be invalid after rollback to cp1"); + assert.equal(cp3.isValid, false, "cp3 should be invalid after rollback to cp1"); + + stagingControls.commitChanges(); + }); + + it("checkpoint.dispose() invalidates only that checkpoint", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + clients.original.dataObject.makeEdit("edit-1"); + const cp1 = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("edit-2"); + const cp2 = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("edit-3"); + const cp3 = stagingControls.checkpoint(); + clients.original.dataObject.makeEdit("edit-4"); + + // Dispose cp2 - cp1 and cp3 should remain valid + cp2.dispose(); + assert.equal(cp1.isValid, true, "cp1 should remain valid"); + assert.equal(cp2.isValid, false, "cp2 should be invalid after dispose"); + assert.equal(cp3.isValid, true, "cp3 should remain valid"); + + // cp3 can still be rolled back (removes edit-4) + cp3.rollback(); + assert.equal(hasEdit(clients.original, "edit-4"), false, "Should not have edit-4"); + assert.equal(hasEdit(clients.original, "edit-3"), true, "Should still have edit-3"); + + // cp1 can still be rolled back (removes edit-2 and edit-3) + cp1.rollback(); + assert.equal(hasEdit(clients.original, "edit-3"), false, "Should not have edit-3"); + assert.equal(hasEdit(clients.original, "edit-2"), false, "Should not have edit-2"); + assert.equal(hasEdit(clients.original, "edit-1"), true, "Should still have edit-1"); + + stagingControls.commitChanges(); + await waitForSave(clients); + + assertConsistent(clients, "states should match after commit"); + }); + + it("checkpoint.dispose() throws on invalid checkpoint", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + const cp1 = stagingControls.checkpoint(); + const cp2 = stagingControls.checkpoint(); + + cp1.rollback(); // Invalidates cp2 + + assert.throws(() => cp2.dispose(), "Should throw when disposing invalid checkpoint"); + + stagingControls.commitChanges(); + }); + + it("checkpoint.rollback() throws on invalid checkpoint after dispose", async () => { + const deltaConnectionServer = LocalDeltaConnectionServer.create(); + const clients = await createClients(deltaConnectionServer); + + const stagingControls = clients.original.dataObject.enterStagingMode(); + + const checkpoint = stagingControls.checkpoint(); + checkpoint.dispose(); + + assert.equal(checkpoint.isValid, false, "Checkpoint should be invalid after dispose"); + assert.throws( + () => checkpoint.rollback(), + "Should throw when rolling back disposed checkpoint", + ); + + stagingControls.commitChanges(); + }); + }); });