diff --git a/.claude/skills/trigger-pipelines-for-copilot-pr/SKILL.md b/.claude/skills/trigger-pipelines-for-copilot-pr/SKILL.md index 641b8c6b2196..661bc175d394 100644 --- a/.claude/skills/trigger-pipelines-for-copilot-pr/SKILL.md +++ b/.claude/skills/trigger-pipelines-for-copilot-pr/SKILL.md @@ -26,7 +26,8 @@ Second comment: Posting those comments will trigger all our pipelines, which is necessary for PRs that are created by Copilot. To post the comments first check if the GitHub CLI is available, -and if so use `gh pr comment --repo microsoft/FluidFramework --body ""`. +and if so use `MSYS_NO_PATHCONV=1 gh pr comment --repo microsoft/FluidFramework --body ""`. +Note: `MSYS_NO_PATHCONV=1` is required on Windows (Git Bash) to prevent `/azp` from being expanded to `C:/Program Files/Git/azp`. If `gh` is not available but `$GITHUB_TOKEN` is, you can try the GitHub REST API directly, e.g.: ``` diff --git a/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md b/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md index 8ede8c8fa7d8..679a701f0ca6 100644 --- a/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md +++ b/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md @@ -46,6 +46,7 @@ export interface ContainerRuntimeOptions { readonly gcOptions: IGCRuntimeOptions; readonly loadSequenceNumberVerification: "close" | "log" | "bypass"; readonly maxBatchSizeInBytes: number; + readonly stagingModeAutoFlushThreshold: number; // (undocumented) readonly summaryOptions: ISummaryRuntimeOptions; } diff --git a/packages/runtime/container-runtime/src/containerCompatibility.ts b/packages/runtime/container-runtime/src/containerCompatibility.ts index 291106a0ea56..0924b1a0adfc 100644 --- a/packages/runtime/container-runtime/src/containerCompatibility.ts +++ b/packages/runtime/container-runtime/src/containerCompatibility.ts @@ -43,6 +43,7 @@ export type 
RuntimeOptionsAffectingDocSchema = Omit< | "maxBatchSizeInBytes" | "loadSequenceNumberVerification" | "summaryOptions" + | "stagingModeAutoFlushThreshold" | "disableSchemaUpgrade" >; diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 36f9319d1298..407d93166c52 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -242,6 +242,7 @@ import { DuplicateBatchDetector, ensureContentsDeserialized, type IBatchCheckpoint, + largeBatchThreshold, OpCompressor, OpDecompressor, OpGroupingManager, @@ -257,6 +258,7 @@ import { type IPendingLocalState, PendingStateManager, type PendingBatchResubmitMetadata, + type IPendingMessage, } from "./pendingStateManager.js"; import { BatchRunCounter, RunCounter } from "./runCounter.js"; import { @@ -475,6 +477,18 @@ export interface ContainerRuntimeOptions { */ readonly createBlobPayloadPending: true | undefined; + /** + * Controls automatic batch flushing during staging mode. + * Normal turn-based/async flush scheduling is suppressed while in staging mode + * until the accumulated batch reaches this many ops, at which point the batch + * is flushed. Incoming ops always break the current batch regardless of this setting. + * + * Set to Infinity to only break batches on system events (incoming ops). + * + * @defaultValue `largeBatchThreshold` (currently 1000) + */ + readonly stagingModeAutoFlushThreshold: number; + /** * When this property is set to true, the runtime will never send DocumentSchemaChange ops * and will throw an error if any incoming DocumentSchemaChange ops are received. @@ -613,6 +627,16 @@ const defaultMaxBatchSizeInBytes = 700 * 1024; const defaultChunkSizeInBytes = 204800; +/** + * Default maximum ops per staging-mode batch before automatic flush scheduling resumes. 
+ * + * Chosen based on production telemetry: copy-paste operations routinely produce batches + * of 1000+ ops (435K instances over 30 days), and receivers on modern Fluid versions + * handle them without issues. Uses {@link largeBatchThreshold} to stay aligned with + * the existing "large batch" telemetry threshold ({@link OpGroupingManager}). + */ +const defaultStagingModeAutoFlushThreshold = largeBatchThreshold; + /** * The default time to wait for pending ops to be processed during summarization */ @@ -967,6 +991,7 @@ export class ContainerRuntime loadSequenceNumberVerification: "close", maxBatchSizeInBytes: defaultMaxBatchSizeInBytes, chunkSizeInBytes: defaultChunkSizeInBytes, + stagingModeAutoFlushThreshold: defaultStagingModeAutoFlushThreshold, disableSchemaUpgrade: false, }; @@ -993,6 +1018,7 @@ export class ContainerRuntime ? disabledCompressionConfig : defaultConfigs.compressionOptions, createBlobPayloadPending = defaultConfigs.createBlobPayloadPending, + stagingModeAutoFlushThreshold = defaultConfigs.stagingModeAutoFlushThreshold, disableSchemaUpgrade = defaultConfigs.disableSchemaUpgrade, }: IContainerRuntimeOptionsInternal = runtimeOptions; @@ -1223,6 +1249,7 @@ export class ContainerRuntime enableGroupedBatching, explicitSchemaControl, createBlobPayloadPending, + stagingModeAutoFlushThreshold, disableSchemaUpgrade, }; @@ -1402,6 +1429,7 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; + private readonly stagingModeAutoFlushThreshold: number; /** * BatchId tracking is needed whenever there's a possibility of a "forked Container", * where the same local state is pending in two different running Containers, each of @@ -1834,6 +1862,10 @@ export class ContainerRuntime this.closeFn(error); throw error; } + this.stagingModeAutoFlushThreshold = + this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ?? + runtimeOptions.stagingModeAutoFlushThreshold ?? 
+ defaultStagingModeAutoFlushThreshold; this.batchIdTrackingEnabled = this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ?? this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ?? @@ -3613,20 +3645,39 @@ export class ContainerRuntime // since we mark whole batches as "staged" or not to indicate whether to submit them. this.flush(); - const exitStagingMode = (discardOrCommit: () => void): void => { + const exitStagingMode = ( + discardOrCommit: () => IPendingMessage["batchInfo"][], + exitMethod: "commit" | "discard", + ): void => { try { - // Final flush of any last staged changes - // NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c - this.outbox.flush(); - - this.stageControls = undefined; - - // During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops). - // Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode. - this.submitIdAllocationOpIfNeeded({ staged: false }); - discardOrCommit(); - - this.channelCollection.notifyStagingMode(false); + PerformanceEvent.timedExec( + this.mc.logger, + { + eventName: `ExitStagingMode_${exitMethod}`, + }, + (event) => { + // Final flush of any last staged changes + // NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c + this.outbox.flush(); + + this.stageControls = undefined; + + // During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops). + // Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode. 
+ this.submitIdAllocationOpIfNeeded({ staged: false }); + const batchInfos = discardOrCommit(); + event.reportProgress({ + details: { + autoFlushThreshold: this.stagingModeAutoFlushThreshold, + batches: batchInfos.length, + batchesAtOrOverThreshold: batchInfos.filter( + (b) => b.length >= this.stagingModeAutoFlushThreshold, + ).length, + }, + }); + this.channelCollection.notifyStagingMode(false); + }, + ); } catch (error) { const normalizedError = normalizeError(error); this.closeFn(normalizedError); @@ -3638,21 +3689,24 @@ export class ContainerRuntime discardChanges: () => exitStagingMode(() => { // Pop all staged batches from the PSM and roll them back in LIFO order - this.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => { - this.rollbackStagedChange(runtimeOp, localOpMetadata); - }); + const batchInfos = this.pendingStateManager.popStagedBatches( + ({ runtimeOp, localOpMetadata }) => { + this.rollbackStagedChange(runtimeOp, localOpMetadata); + }, + ); this.updateDocumentDirtyState(); - }), + return batchInfos; + }, "discard"), commitChanges: (options) => { const { squash } = { ...defaultStagingCommitOptions, ...options }; exitStagingMode(() => { // Replay all staged batches in typical FIFO order. // We'll be out of staging mode so they'll be sent to the service finally. - this.pendingStateManager.replayPendingStates({ + return this.pendingStateManager.replayPendingStates({ committingStagedBatches: true, squash, }); - }); + }, "commit"); }, }; @@ -4783,6 +4837,20 @@ export class ContainerRuntime } private scheduleFlush(): void { + // During staging mode, suppress automatic flush scheduling until the main batch + // reaches or exceeds the threshold. + // Incoming ops still break the batch via direct this.flush() calls elsewhere + // (deltaManager "op" handler, process(), connection changes, getPendingLocalState, + // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check. 
+ // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects + // sequence number changes and throws if unexpected changes are detected. + if ( + this.inStagingMode && + this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold + ) { + return; + } + if (this.flushScheduled) { return; } diff --git a/packages/runtime/container-runtime/src/opLifecycle/index.ts b/packages/runtime/container-runtime/src/opLifecycle/index.ts index 051612b5245a..d898f99f84d9 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/index.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/index.ts @@ -45,6 +45,7 @@ export { } from "./remoteMessageProcessor.js"; export { type EmptyGroupedBatch, + largeBatchThreshold, OpGroupingManager, type OpGroupingManagerConfig, isGroupedBatch, diff --git a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts index 3a8d6a857d5d..1a1bb70ee02d 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts @@ -17,6 +17,13 @@ import type { OutboundSingletonBatch, } from "./definitions.js"; +/** + * The number of ops in a batch above which the batch is considered "large" + * for telemetry purposes. Used by both {@link OpGroupingManager} (GroupLargeBatch event) + * and as the default staging-mode auto-flush threshold. + */ +export const largeBatchThreshold = 1000; + /** * Grouping makes assumptions about the shape of message contents. This interface codifies those assumptions, but does not validate them. */ @@ -123,7 +130,10 @@ export class OpGroupingManager { return batch as OutboundSingletonBatch; } - if (batch.messages.length >= 1000) { + // Use > (not >=) so that batches flushed exactly at the staging-mode + // auto-flush threshold (which defaults to largeBatchThreshold) don't + // trigger this event. 
Only genuinely oversized batches are logged. + if (batch.messages.length > largeBatchThreshold) { this.logger.sendTelemetryEvent({ eventName: "GroupLargeBatch", length: batch.messages.length, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index a240aeb2fc14..d5e24970b11b 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -431,7 +431,7 @@ export class PendingStateManager implements IDisposable { clientId !== undefined, 0xa33 /* clientId (from stateHandler) could only be undefined if we've never connected, but we have a CSN so we know that's not the case */, ); - + const batchInfo = { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }; for (const message of batch) { const { runtimeOp, @@ -446,8 +446,7 @@ export class PendingStateManager implements IDisposable { runtimeOp, localOpMetadata, opMetadata, - // Note: We only will read this off the first message, but put it on all for simplicity - batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }, + batchInfo, }; this.pendingMessages.push(pendingMessage); } @@ -751,8 +750,12 @@ export class PendingStateManager implements IDisposable { * Called when the Container's connection state changes. If the Container gets connected, it replays all the pending * states in its queue. This includes triggering resubmission of unacked ops. * ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer) + * + * @returns The unique batch infos for all batches that were replayed. 
*/ - public replayPendingStates(options?: ReplayPendingStateOptions): void { + public replayPendingStates( + options?: ReplayPendingStateOptions, + ): IPendingMessage["batchInfo"][] { const { committingStagedBatches, squash } = { ...defaultReplayPendingStatesOptions, ...options, @@ -779,6 +782,7 @@ export class PendingStateManager implements IDisposable { const initialPendingMessagesCount = this.pendingMessages.length; let remainingPendingMessagesCount = this.pendingMessages.length; + const replayedBatchSet = new Set<IPendingMessage["batchInfo"]>(); let seenStagedBatch = false; @@ -816,6 +820,7 @@ export class PendingStateManager implements IDisposable { if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) { // Resubmit no messages, with the batchId. Will result in another empty batch marker. this.stateHandler.reSubmitBatch([], { batchId, staged, squash }); + replayedBatchSet.add(pendingMessage.batchInfo); continue; } @@ -842,6 +847,7 @@ export class PendingStateManager implements IDisposable { ], { batchId, staged, squash }, ); + replayedBatchSet.add(pendingMessage.batchInfo); continue; } // else: batchMetadataFlag === true (It's a typical multi-message batch) @@ -881,6 +887,7 @@ export class PendingStateManager implements IDisposable { } this.stateHandler.reSubmitBatch(batch, { batchId, staged, squash }); + replayedBatchSet.add(pendingMessage.batchInfo); } if (!committingStagedBatches) { @@ -898,6 +905,8 @@ export class PendingStateManager implements IDisposable { clientId: this.stateHandler.clientId(), }); } + + return [...replayedBatchSet]; } /** @@ -908,11 +917,13 @@ export class PendingStateManager implements IDisposable { // callback will only be given staged messages with a valid runtime op (i.e.
not empty batch and not an initial message with only serialized content) stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage }, ) => void, - ): void { + ): IPendingMessage["batchInfo"][] { + const batchSet = new Set<IPendingMessage["batchInfo"]>(); while (!this.pendingMessages.isEmpty()) { const stagedMessage = this.pendingMessages.peekBack(); if (stagedMessage?.batchInfo.staged === true) { this.pendingMessages.pop(); + batchSet.add(stagedMessage.batchInfo); if (hasTypicalRuntimeOp(stagedMessage)) { callback(stagedMessage); @@ -925,6 +936,7 @@ export class PendingStateManager implements IDisposable { this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true), 0xb89 /* Shouldn't be any more staged messages */, ); + return [...batchSet]; } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 729a0d088226..14f0f6b00620 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1170,7 +1170,7 @@ describe("Runtime", () => { const getMockPendingStateManager = (): PendingStateManager => { let pendingMessages = 0; return { - replayPendingStates: () => {}, + replayPendingStates: () => [], hasPendingMessages: (): boolean => pendingMessages > 0, hasPendingUserChanges: (): boolean => pendingMessages > 0, processInboundMessages: (inbound: InboundMessageResult, _local: boolean) => { @@ -1795,6 +1795,7 @@ describe("Runtime", () => { enableGroupedBatching: true, // Redundant, but makes the JSON.stringify yield the same result as the logs explicitSchemaControl: false, createBlobPayloadPending: undefined, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, } as const satisfies ContainerRuntimeOptionsInternal; const mergedRuntimeOptions = { ...defaultRuntimeOptions, ...runtimeOptions } as const; @@ -3754,6 +3755,7 @@ describe("Runtime", () => {
enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -3814,6 +3816,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: false, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -3853,6 +3856,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -3892,6 +3896,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -3930,6 +3935,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -3976,6 +3982,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: "on", enableGroupedBatching: false, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -4037,6 +4044,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, // idCompressor is undefined, since that represents a logical state (off) enableGroupedBatching: true, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -4078,6 +4086,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + stagingModeAutoFlushThreshold: 1000, disableSchemaUpgrade: false, }; @@ -4367,6 +4376,566 @@ describe("Runtime", () => { assert.equal(containerRuntime.isDirty, false, "Runtime should not be dirty anymore"); }); + + describe("stagingModeAutoFlushThreshold", () => { + let runtimeWithThreshold: 
ContainerRuntime_WithPrivates; + let mockContext: Partial<IContainerContext>; + + async function createRuntimeWithThreshold( + threshold: number, + ): Promise<ContainerRuntime_WithPrivates> { + mockContext = getMockContext() as IContainerContext; + return (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: threshold, + // Disable grouped batching so each op is individually submitted to the wire, + // making it easier to verify op counts. + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + } + + afterEach(() => { + runtimeWithThreshold?.dispose(); + runtimeWithThreshold = undefined as unknown as ContainerRuntime_WithPrivates; + }); + + it("ops accumulate under threshold during staging mode", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(10); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + // Submit 5 ops across multiple turns — under the threshold of 10 + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("op3")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "4", genTestDataStoreMessage("op4")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "5", genTestDataStoreMessage("op5")); + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 0, + "No ops should be submitted while under threshold in staging mode", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as
any).outbox.mainBatchMessageCount, + 5, + "All 5 ops should be in the outbox", + ); + + controls.commitChanges(); + assert.equal( + submittedOps.length, + 5, + "All 5 ops should be submitted after commitChanges", + ); + }); + + it("ops flush when threshold is reached", async () => { + const logger = new MockLogger(); + const threshold = 3; + mockContext = getMockContext({ logger }) as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: threshold, + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + // Submit 3 ops — exactly at the threshold + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal(submittedOps.length, 0, "Under threshold, no flush yet"); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox", + ); + + submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("op3")); + // The 3rd op reaches the threshold, so scheduleFlush falls through to normal scheduling + await Promise.resolve(); + + // Ops are not submitted to the wire during staging mode, but the outbox should be + // emptied (ops moved from outbox into PendingStateManager as a staged batch). 
+ assert.equal( + submittedOps.length, + 0, + "Ops should not be submitted to wire while in staging mode", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty after threshold flush", + ); + + // Exit staging mode and verify the ExitStagingMode_commit performance event is emitted + controls.commitChanges(); + logger.assertMatchAny([ + { + eventName: "ContainerRuntime:ExitStagingMode_commit_end", + category: "performance", + }, + ]); + }); + + it("incoming ops break the batch regardless of threshold", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal(submittedOps.length, 0, "No ops submitted yet"); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox", + ); + + // Simulate an incoming op — bumps lastSequenceNumber and emits "op" + // The deltaManager "op" handler calls this.flush() directly, + // which moves pending ops from outbox into PSM (as staged batches). + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); + + // Ops are not submitted to the wire during staging mode, but the incoming op + // should have flushed the outbox into PendingStateManager as a staged batch.
+ assert.equal( + submittedOps.length, + 0, + "Ops should not be submitted to wire while in staging mode", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty after incoming op flush", + ); + }); + + it("exit staging mode flushes remaining ops", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("op3")); + assert.equal(submittedOps.length, 0, "No ops submitted while staging"); + + controls.commitChanges(); + + assert(submittedOps.length > 0, "Ops should be submitted after commitChanges"); + }); + + it("has no effect outside staging mode", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + // Submit ops without entering staging mode + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + + // Normal turn-based flush should still happen + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 1, + "Op should flush normally when not in staging mode", + ); + }); + + it("default threshold suppresses turn-based flushing during staging mode", async () => { + // Create runtime WITHOUT explicit stagingModeAutoFlushThreshold — uses the default (1000) + mockContext = getMockContext() as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: 
false, + runtimeOptions: {}, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + // Submit a few ops across turns — well under the default threshold + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 0, + "Default threshold should suppress turn-based flushing during staging mode", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "Both ops should still be in the outbox (unflushed)", + ); + + controls.commitChanges(); + }); + + it("discardChanges flushes outbox before rollback", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + const channelCollectionStub = stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp( + runtimeWithThreshold, + "1", + genTestDataStoreMessage("op1"), + "META1", + ); + submitDataStoreOp( + runtimeWithThreshold, + "2", + genTestDataStoreMessage("op2"), + "META2", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox before discard", + ); + + controls.discardChanges(); + + // Outbox should have been drained before rollback + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox 
should be empty after discard", + ); + // Staged ops should be rolled back (LIFO order) + assert.equal( + channelCollectionStub.rollbackDataStoreOp.callCount, + 2, + "Both ops should be rolled back", + ); + // Nothing sent to wire + assert.equal( + submittedOps.length, + 0, + "No ops should be submitted to wire after discard", + ); + }); + + it("config override takes precedence over runtime option", async () => { + const configThreshold = 5; + const runtimeOptionThreshold = 50; + mockContext = getMockContext({ + settings: { + "Fluid.ContainerRuntime.StagingModeAutoFlushThreshold": configThreshold, + }, + }) as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: runtimeOptionThreshold, + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).stagingModeAutoFlushThreshold, + configThreshold, + "Config override threshold (5) should win over runtime option (50)", + ); + }); + + it("runtime option takes precedence over default", async () => { + const runtimeOptionThreshold = 5; + runtimeWithThreshold = await createRuntimeWithThreshold(runtimeOptionThreshold); + + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).stagingModeAutoFlushThreshold, + runtimeOptionThreshold, + "Runtime option (5) should win over default (1000)", + ); + }); + + it("incoming non-runtime op breaks batch during staging mode", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 
0; + + runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox", + ); + + // Simulate an incoming non-runtime op (e.g. a ClientJoin message) + // The deltaManager "op" handler calls this.flush() directly. + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); + + assert.equal(submittedOps.length, 0, "No ops sent to wire during staging mode"); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty — non-runtime op should break the batch", + ); + }); + + it("IdAllocation + reconnect while in staging mode does not hit coherency check", async () => { + // This is the highest-risk scenario from Mark's test plan. + // The fix in b4e1fd1dd25 added scheduleFlush() after submitIdAllocationOpIfNeeded + // during replayPendingStates. With threshold suppression, that scheduleFlush() + // returns early if in staging mode and under threshold. But the "op" handler + // calls flush() directly, so the IdAllocation op still gets flushed before + // new ops with different refSeqs are submitted.
+ + // Start disconnected so we can trigger IdAllocation on reconnect + mockContext = getMockContext({ connected: false }) as IContainerContext; + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: Infinity, + enableRuntimeIdCompressor: "on", + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + // Generate 1st compressed ID while disconnected — queues an IdAllocation op + runtimeWithThreshold.idCompressor?.generateCompressedId(); + + // Reconnect — replayPendingStates submits IdAllocation op + calls scheduleFlush() + // scheduleFlush() returns early due to staging mode threshold suppression + changeConnectionState(runtimeWithThreshold, true, mockClientId); + + // Simulate an incoming non-runtime op arriving (bumps refSeq) + // The "op" handler calls this.flush() directly, draining the IdAllocation op + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); + + // Generate 2nd compressed ID — its IdAllocation op has a new refSeq + const id2 = runtimeWithThreshold.idCompressor?.generateCompressedId(); + + // This would throw outboxSequenceNumberCoherencyCheck if the first IdAllocation + // op wasn't flushed before the refSeq changed. 
+ assert.doesNotThrow( + () => + submitDataStoreOp( + runtimeWithThreshold, + "someDS", + genTestDataStoreMessage({ id: id2 }), + ), + "Should not throw coherency check — IdAllocation op should have been flushed by the 'op' handler", + ); + }); + }); + + it("enterStagingMode flushes any pending outbox contents as non-staged", async () => { + const context = getMockContext() as IContainerContext; + const runtime = (await ContainerRuntime.loadRuntime2({ + context, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtime); + submittedOps.length = 0; + + // Submit ops before entering staging mode (not yet flushed — still in outbox) + submitDataStoreOp(runtime, "1", genTestDataStoreMessage("pre1")); + submitDataStoreOp(runtime, "2", genTestDataStoreMessage("pre2")); + assert.equal(submittedOps.length, 0, "Ops not yet flushed"); + + // Enter staging mode — should flush pre-staging ops as non-staged + const controls = runtime.enterStagingMode(); + assert.equal( + submittedOps.length, + 2, + "Pre-staging ops should be flushed on enterStagingMode", + ); + + // Submit more ops while in staging mode + submitDataStoreOp(runtime, "3", genTestDataStoreMessage("staged1")); + runtime.flush(); + assert.equal(submittedOps.length, 2, "Staged ops should NOT be submitted to wire"); + + controls.commitChanges(); + assert.equal(submittedOps.length, 3, "All ops should be submitted after commit"); + runtime.dispose(); + }); + + it("reconnect breaks batch during staging mode", async () => { + const context = getMockContext() as IContainerContext; + const runtime = (await ContainerRuntime.loadRuntime2({ + context, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as 
ContainerRuntime_WithPrivates; + stubChannelCollection(runtime); + submittedOps.length = 0; + + runtime.enterStagingMode(); + + // Submit ops while connected (sitting in outbox under threshold) + submitDataStoreOp(runtime, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtime, "2", genTestDataStoreMessage("op2")); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtime as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox before disconnect", + ); + + // Disconnect + changeConnectionState(runtime, false, "disconnectedClientId"); + + // Reconnect — triggers flush and replayPendingStates + changeConnectionState(runtime, true, mockClientId); + + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtime as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be drained after reconnect", + ); + runtime.dispose(); + }); + + it("reconnect resubmits pre-staged batches", async () => { + const context = getMockContext() as IContainerContext; + const runtime = (await ContainerRuntime.loadRuntime2({ + context, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtime); + submittedOps.length = 0; + + // Submit ops BEFORE entering staging mode + submitDataStoreOp(runtime, "pre1", genTestDataStoreMessage("pre-staging-op")); + await Promise.resolve(); + assert.equal(submittedOps.length, 1, "Pre-staging op should be submitted"); + + // Enter staging mode and submit more ops + const controls = runtime.enterStagingMode(); + submitDataStoreOp( + runtime, + "staged1", + genTestDataStoreMessage("staged-op"), + "STAGED_META", + ); + runtime.flush(); + assert.equal(submittedOps.length, 1, "Staged op should not be submitted to 
wire"); + + // Disconnect + changeConnectionState(runtime, false, "disconnectedClientId"); + submittedOps.length = 0; + + // Reconnect — replayPendingStates resubmits all pending ops + changeConnectionState(runtime, true, mockClientId); + await Promise.resolve(); + + // The pre-staging op should be resubmitted to the wire. + // The staged op stays in the PendingStateManager (PSM) as staged (not sent to wire). + assert( + submittedOps.length > 0, + "Pre-staging op should be resubmitted after reconnect", + ); + const opsAfterReconnect = submittedOps.length; + + // Verify we can still commit the staged changes + controls.commitChanges(); + assert( + submittedOps.length > opsAfterReconnect, + "Staged op should be submitted after commitChanges", + ); + runtime.dispose(); + }); }); }); }); diff --git a/packages/service-clients/azure-client/src/test/AzureClient.spec.ts b/packages/service-clients/azure-client/src/test/AzureClient.spec.ts index 67e90a040afc..20875f56d72d 100644 --- a/packages/service-clients/azure-client/src/test/AzureClient.spec.ts +++ b/packages/service-clients/azure-client/src/test/AzureClient.spec.ts @@ -416,6 +416,7 @@ for (const compatibilityMode of ["1", "2"] as const) { explicitSchemaControl: false, createBlobPayloadPending: undefined, disableSchemaUpgrade: false, + stagingModeAutoFlushThreshold: 1000, } as const satisfies ContainerRuntimeOptionsInternal; const expectedRuntimeOptions2 = { summaryOptions: {}, @@ -433,6 +434,7 @@ for (const compatibilityMode of ["1", "2"] as const) { explicitSchemaControl: true, createBlobPayloadPending: undefined, disableSchemaUpgrade: false, + stagingModeAutoFlushThreshold: 1000, } as const satisfies ContainerRuntimeOptionsInternal; const expectedRuntimeOptions = diff --git a/packages/test/test-service-load/src/optionsMatrix.ts b/packages/test/test-service-load/src/optionsMatrix.ts index 15e6c8039f98..12eaae8287f7 100644 --- a/packages/test/test-service-load/src/optionsMatrix.ts +++ b/packages/test/test-service-load/src/optionsMatrix.ts @@ 
-119,6 +119,7 @@ export function generateRuntimeOptions( createBlobPayloadPending: [true, undefined], explicitSchemaControl: [true, false], disableSchemaUpgrade: [false], + stagingModeAutoFlushThreshold: [undefined], }; const pairwiseOptions = generatePairwiseOptions(