Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 20 additions & 65 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ import {
type BatchStartInfo,
DuplicateBatchDetector,
ensureContentsDeserialized,
type IBatchCheckpoint,
OpCompressor,
OpDecompressor,
OpGroupingManager,
Expand All @@ -252,6 +251,7 @@ import {
RemoteMessageProcessor,
type OutboundBatch,
type BatchResubmitInfo,
type IBatchCheckpoint,
} from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
import {
Expand All @@ -267,6 +267,7 @@ import {
validateLoaderCompatibility,
} from "./runtimeLayerCompatState.js";
import { SignalTelemetryManager } from "./signalTelemetryProcessing.js";
import { StagingModeManager } from "./stagingModeManager.js";
// These types are imported as types here because they are present in summaryDelayLoadedModule, which is loaded dynamically when required.
import {
aliasBlobName,
Expand Down Expand Up @@ -560,8 +561,6 @@ export const defaultRuntimeHeaderData: Required<RuntimeHeaderData> = {
allowTombstone: false,
};

const defaultStagingCommitOptions = { squash: false };

/**
* @deprecated
* Untagged logger is unsupported going forward. There are old loaders with old ContainerContexts that only
Expand Down Expand Up @@ -1464,6 +1463,7 @@ export class ContainerRuntime
private readonly duplicateBatchDetector: DuplicateBatchDetector | undefined;
private readonly outbox: Outbox;
private readonly garbageCollector: IGarbageCollector;
private readonly stagingModeManager: StagingModeManager;

private readonly channelCollection: ChannelCollection;
private readonly remoteMessageProcessor: RemoteMessageProcessor;
Expand Down Expand Up @@ -1615,8 +1615,8 @@ export class ContainerRuntime
logger: this.baseLogger,
namespace: "ContainerRuntime",
properties: {
all: {
inStagingMode: this.inStagingMode,
error: {
inStagingMode: () => this.inStagingMode,
},
},
});
Expand Down Expand Up @@ -2037,6 +2037,17 @@ export class ContainerRuntime
opReentrancy: () => this.dataModelChangeRunner.running,
});

// Initialize the staging mode manager with all its dependencies
this.stagingModeManager = new StagingModeManager({
pendingStateManager: this.pendingStateManager,
outbox: this.outbox,
getChannelCollection: () => this.channelCollection,
submitIdAllocationOpIfNeeded: this.submitIdAllocationOpIfNeeded.bind(this),
rollbackStagedChange: this.rollbackStagedChange.bind(this),
updateDocumentDirtyState: this.updateDocumentDirtyState.bind(this),
closeFn: this.closeFn,
});

this._quorum = quorum;
this._quorum.on("removeMember", (clientId: string) => {
this.remoteMessageProcessor.clearPartialMessagesFor(clientId);
Expand Down Expand Up @@ -3607,15 +3618,13 @@ export class ContainerRuntime
return result;
}

private stageControls: StageControlsInternal | undefined;

/**
* If true, the ContainerRuntime is not submitting any new ops to the ordering service.
* Ops submitted to the ContainerRuntime while in Staging Mode will be queued in the PendingStateManager,
* either to be discarded or committed later (via the Stage Controls returned from enterStagingMode).
*/
public get inStagingMode(): boolean {
return this.stageControls !== undefined;
return this.stagingModeManager?.inStagingMode ?? false;
}

/**
Expand All @@ -3625,64 +3634,12 @@ export class ContainerRuntime
* @returns Controls for exiting Staging Mode.
*/
public enterStagingMode = (): StageControlsInternal => {
if (this.stageControls !== undefined) {
throw new UsageError("Already in staging mode");
}
if (this.attachState === AttachState.Detached) {
throw new UsageError("Cannot enter staging mode while Detached");
}

// Make sure Outbox is empty before entering staging mode,
// since we mark whole batches as "staged" or not to indicate whether to submit them.
this.flush();

const exitStagingMode = (discardOrCommit: () => void): void => {
try {
// Final flush of any last staged changes
// NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c
this.outbox.flush();

this.stageControls = undefined;

// During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops).
// Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode.
this.submitIdAllocationOpIfNeeded({ staged: false });
discardOrCommit();

this.channelCollection.notifyStagingMode(false);
} catch (error) {
const normalizedError = normalizeError(error);
this.closeFn(normalizedError);
throw normalizedError;
}
};

const stageControls: StageControlsInternal = {
discardChanges: () =>
exitStagingMode(() => {
// Pop all staged batches from the PSM and roll them back in LIFO order
this.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => {
this.rollbackStagedChange(runtimeOp, localOpMetadata);
});
this.updateDocumentDirtyState();
}),
commitChanges: (options) => {
const { squash } = { ...defaultStagingCommitOptions, ...options };
exitStagingMode(() => {
// Replay all staged batches in typical FIFO order.
// We'll be out of staging mode so they'll be sent to the service finally.
this.pendingStateManager.replayPendingStates({
committingStagedBatches: true,
squash,
});
});
},
};

this.stageControls = stageControls;
this.channelCollection.notifyStagingMode(true);

return this.stageControls;
// Delegate to the staging mode manager
return this.stagingModeManager.enterStagingMode(() => this.flush());
};

/**
Expand Down Expand Up @@ -4883,9 +4840,7 @@ export class ContainerRuntime
);

const resubmitInfo = {
// Only include Batch ID if "Offline Load" feature is enabled
// It's only needed to identify batches across container forks arising from misuse of offline load.
batchId: this.batchIdTrackingEnabled ? batchId : undefined,
batchId,
staged,
};

Expand Down
51 changes: 39 additions & 12 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -897,30 +897,57 @@ export class PendingStateManager implements IDisposable {
}

/**
* Pops all staged batches, invoking the callback on each constituent op in order (LIFO)
* Pops staged messages from the back, invoking the callback on each in LIFO order.
* Used for checkpoint rollback where we want to rollback to a specific batch, or for discarding all staged changes.
*
* @param callback - Called for each popped message that has a typical runtime op
* @param afterBatchId - Optional batch ID to rollback to (this batch is kept, everything after it is removed). If undefined, removes all staged messages.
*/
public popStagedBatches(
callback: (
// callback will only be given staged messages with a valid runtime op (i.e. not empty batch and not an initial message with only serialized content)
stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage },
) => void,
afterBatchId?: string,
): void {
while (!this.pendingMessages.isEmpty()) {
const stagedMessage = this.pendingMessages.peekBack();
if (stagedMessage?.batchInfo.staged === true) {
this.pendingMessages.pop();

if (hasTypicalRuntimeOp(stagedMessage)) {
callback(stagedMessage);
// Stop if we've reached the checkpoint batch ID
if (afterBatchId !== undefined && stagedMessage !== undefined) {
const messageBatchId = getEffectiveBatchId(stagedMessage);
if (messageBatchId === afterBatchId) {
break;
}
} else {
break; // no more staged messages
}

// Stop if we hit a non-staged message
if (stagedMessage?.batchInfo.staged !== true) {
break;
}

// Pop the message
this.pendingMessages.pop();

if (hasTypicalRuntimeOp(stagedMessage)) {
callback(stagedMessage);
}
}
assert(
this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true),
0xb89 /* Shouldn't be any more staged messages */,
);

// Verify no staged messages remain (when afterBatchId is undefined, we should have removed all)
if (afterBatchId === undefined) {
assert(
this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true),
0xb89 /* Shouldn't be any more staged messages */,
);
}
}

/**
* Gets a reference to the last pending message in the queue, or undefined if the queue is empty.
* This reference can be used later to determine if any messages have been added since.
*/
public getLastPendingMessage(): IPendingMessage | undefined {
return this.pendingMessages.peekBack();
}
}

Expand Down
Loading
Loading