From b0f47818b1b5f06da0577865cd6f87910040b4dc Mon Sep 17 00:00:00 2001 From: anthony-murphy-agent Date: Thu, 26 Feb 2026 16:17:37 -0800 Subject: [PATCH 01/21] Add stagingModeMaxBatchOps to control batch boundaries during staging mode During staging mode, the runtime flushes ops into separate staged batches at every JS turn boundary. This means consumers like Word that want to accumulate ops across many turns into fewer, larger batches get fragmented results. Add a `stagingModeMaxBatchOps` option to `ContainerRuntimeOptionsInternal` that suppresses automatic (turn-based/async) flush scheduling during staging mode until the accumulated batch reaches the specified op count. Incoming ops still break the current batch regardless (they change the reference sequence number, so the runtime flushes via direct flush() calls that bypass scheduleFlush()). Default: 1000 ops. This was chosen based on production telemetry analysis: - Copy-paste operations routinely produce batches of 1000+ ops (435K instances of >=1000 ops observed over 30 days via GroupLargeBatch telemetry) - All are non-reentrant single-turn batches from normal user actions - Receivers on modern Fluid versions (2.74+) handle these without jank (p99 processing duration ~5ms for typical batches) - 1000 matches the existing "large batch" telemetry threshold in OpGroupingManager - The threshold only affects cross-turn accumulation; single-turn operations (like paste) are unaffected since all ops are submitted synchronously Consumers can override: set to Infinity to only break batches on system events, or to a lower value for tighter batch control. 
Co-Authored-By: anthony-murphy Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/containerCompatibility.ts | 1 + .../container-runtime/src/containerRuntime.ts | 41 ++++ .../src/test/containerRuntime.spec.ts | 181 ++++++++++++++++++ 3 files changed, 223 insertions(+) diff --git a/packages/runtime/container-runtime/src/containerCompatibility.ts b/packages/runtime/container-runtime/src/containerCompatibility.ts index 638f5ae5e8b4..4590eeb1ace7 100644 --- a/packages/runtime/container-runtime/src/containerCompatibility.ts +++ b/packages/runtime/container-runtime/src/containerCompatibility.ts @@ -43,6 +43,7 @@ export type RuntimeOptionsAffectingDocSchema = Omit< | "maxBatchSizeInBytes" | "loadSequenceNumberVerification" | "summaryOptions" + | "stagingModeMaxBatchOps" >; /** diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index f56287ea9668..bf47b4f4acf3 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -513,6 +513,18 @@ export interface ContainerRuntimeOptionsInternal extends ContainerRuntimeOptions * In that case, batched messages will be sent individually (but still all at the same time). */ readonly enableGroupedBatching: boolean; + + /** + * Controls automatic batch flushing during staging mode. + * Normal turn-based/async flush scheduling is suppressed while in staging mode + * until the accumulated batch reaches this many ops, at which point the batch + * is flushed. Incoming ops always break the current batch regardless of this setting. + * + * Set to Infinity to only break batches on system events (incoming ops). 
+ * + * @defaultValue 1000 + */ + readonly stagingModeMaxBatchOps?: number; } /** @@ -608,6 +620,16 @@ const defaultMaxBatchSizeInBytes = 700 * 1024; const defaultChunkSizeInBytes = 204800; +/** + * Default maximum ops per staging-mode batch before automatic flush scheduling resumes. + * + * Chosen based on production telemetry: copy-paste operations routinely produce batches + * of 1000+ ops (435K instances over 30 days), and receivers on modern Fluid versions + * handle them without issues. 1000 also matches the existing "large batch" telemetry + * threshold ({@link OpGroupingManager}). + */ +const defaultStagingModeMaxBatchOps = 1000; + /** * The default time to wait for pending ops to be processed during summarization */ @@ -987,6 +1009,7 @@ export class ContainerRuntime ? disabledCompressionConfig : defaultConfigs.compressionOptions, createBlobPayloadPending = defaultConfigs.createBlobPayloadPending, + stagingModeMaxBatchOps, }: IContainerRuntimeOptionsInternal = runtimeOptions; // If explicitSchemaControl is off, ensure that options which require explicitSchemaControl are not enabled. @@ -1215,6 +1238,7 @@ export class ContainerRuntime enableGroupedBatching, explicitSchemaControl, createBlobPayloadPending, + stagingModeMaxBatchOps, }; validateMinimumVersionForCollab(updatedMinVersionForCollab); @@ -1393,6 +1417,7 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; + private readonly stagingModeMaxBatchOps: number | undefined; /** * BatchId tracking is needed whenever there's a possibility of a "forked Container", * where the same local state is pending in two different running Containers, each of @@ -1849,6 +1874,7 @@ export class ContainerRuntime } else { this._flushMode = runtimeOptions.flushMode; } + this.stagingModeMaxBatchOps = runtimeOptions.stagingModeMaxBatchOps; this.batchIdTrackingEnabled = this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ?? 
this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ?? @@ -4809,6 +4835,21 @@ export class ContainerRuntime } private scheduleFlush(): void { + // During staging mode with a batch threshold, suppress automatic flush scheduling. + // Only flush when the main batch exceeds the threshold. + // Incoming ops still break the batch via direct this.flush() calls elsewhere + // (deltaManager "op" handler, process(), connection changes, getPendingLocalState, + // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check. + // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects + // sequence number changes and forces a flush as a safety net. + if ( + this.inStagingMode && + this.outbox.mainBatchMessageCount < + (this.stagingModeMaxBatchOps ?? defaultStagingModeMaxBatchOps) + ) { + return; + } + if (this.flushScheduled) { return; } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 952d2839ea7e..c75c7fda4faf 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4441,6 +4441,187 @@ describe("Runtime", () => { assert.equal(containerRuntime.isDirty, false, "Runtime should not be dirty anymore"); }); + + describe("stagingModeMaxBatchOps", () => { + let runtimeWithThreshold: ContainerRuntime_WithPrivates; + let mockContext: Partial; + + async function createRuntimeWithThreshold( + threshold: number, + ): Promise { + mockContext = getMockContext() as IContainerContext; + return (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeMaxBatchOps: threshold, + // Disable grouped batching so each op is individually submitted to the wire, + // making it easier to verify op 
counts. + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + } + + afterEach(() => { + runtimeWithThreshold?.dispose(); + runtimeWithThreshold = undefined as unknown as ContainerRuntime_WithPrivates; + }); + + it("ops accumulate under threshold during staging mode", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(10); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + // Submit 5 ops across multiple turns — under the threshold of 10 + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("op3")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "4", genTestDataStoreMessage("op4")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "5", genTestDataStoreMessage("op5")); + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 0, + "No ops should be submitted while under threshold in staging mode", + ); + + controls.commitChanges(); + assert.equal( + submittedOps.length, + 5, + "All 5 ops should be submitted after commitChanges", + ); + }); + + it("ops flush when threshold is reached", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(3); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + // Submit 3 ops — exactly at the threshold + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal(submittedOps.length, 0, "Under threshold, no flush yet"); + + submitDataStoreOp(runtimeWithThreshold, "3", 
genTestDataStoreMessage("op3")); + // The 3rd op should trigger scheduleFlush to fall through to normal scheduling + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 0, + "Ops should not be submitted while in staging mode (flushed into PSM only)", + ); + }); + + it("incoming ops break the batch regardless of threshold", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal(submittedOps.length, 0, "No ops submitted yet"); + + // Simulate an incoming op — bumps lastSequenceNumber and emits "op" + // The deltaManager "op" handler calls this.flush() directly, + // which moves pending ops from outbox into PSM (as staged batches). + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); + + // Ops are not submitted to the wire during staging mode — they're flushed to PSM + assert.equal( + submittedOps.length, + 0, + "Ops should not be submitted to wire while in staging mode", + ); + }); + + it("exit staging mode flushes remaining ops", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + submitDataStoreOp(runtimeWithThreshold, "3", 
genTestDataStoreMessage("op3")); + assert.equal(submittedOps.length, 0, "No ops submitted while staging"); + + controls.commitChanges(); + + assert(submittedOps.length > 0, "Ops should be submitted after commitChanges"); + }); + + it("has no effect outside staging mode", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + // Submit ops without entering staging mode + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + + // Normal turn-based flush should still happen + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 1, + "Op should flush normally when not in staging mode", + ); + }); + + it("default threshold suppresses turn-based flushing during staging mode", async () => { + // Create runtime WITHOUT explicit stagingModeMaxBatchOps — uses the default (1000) + mockContext = getMockContext() as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: {}, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + // Submit a few ops across turns — well under the default threshold + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + await Promise.resolve(); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + await Promise.resolve(); + + assert.equal( + submittedOps.length, + 0, + "Default threshold should suppress turn-based flushing during staging mode", + ); + + controls.commitChanges(); + }); + }); }); }); }); From 0a3a73daa6601740324f7909f4eff0ea96712b98 Mon Sep 17 00:00:00 2001 From: anthony-murphy-agent Date: Thu, 26 Feb 2026 
16:24:33 -0800 Subject: [PATCH 02/21] Rename stagingModeMaxBatchOps to stagingModeAutoFlushThreshold The option controls when automatic flush scheduling kicks in, not a cap on batch size. A batch can contain far more ops if a single synchronous turn pushes many ops past the threshold (e.g. paste). The new name makes it clear that only automatic/scheduled flushes are affected, not direct flush calls from incoming ops, connection changes, or exit staging mode. Co-Authored-By: anthony-murphy Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/containerCompatibility.ts | 2 +- .../container-runtime/src/containerRuntime.ts | 14 +++++++------- .../src/test/containerRuntime.spec.ts | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerCompatibility.ts b/packages/runtime/container-runtime/src/containerCompatibility.ts index 4590eeb1ace7..a89b937d6909 100644 --- a/packages/runtime/container-runtime/src/containerCompatibility.ts +++ b/packages/runtime/container-runtime/src/containerCompatibility.ts @@ -43,7 +43,7 @@ export type RuntimeOptionsAffectingDocSchema = Omit< | "maxBatchSizeInBytes" | "loadSequenceNumberVerification" | "summaryOptions" - | "stagingModeMaxBatchOps" + | "stagingModeAutoFlushThreshold" >; /** diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index bf47b4f4acf3..5c16ab3df92b 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -524,7 +524,7 @@ export interface ContainerRuntimeOptionsInternal extends ContainerRuntimeOptions * * @defaultValue 1000 */ - readonly stagingModeMaxBatchOps?: number; + readonly stagingModeAutoFlushThreshold?: number; } /** @@ -628,7 +628,7 @@ const defaultChunkSizeInBytes = 204800; * handle them without issues. 
1000 also matches the existing "large batch" telemetry * threshold ({@link OpGroupingManager}). */ -const defaultStagingModeMaxBatchOps = 1000; +const defaultStagingModeAutoFlushThreshold = 1000; /** * The default time to wait for pending ops to be processed during summarization @@ -1009,7 +1009,7 @@ export class ContainerRuntime ? disabledCompressionConfig : defaultConfigs.compressionOptions, createBlobPayloadPending = defaultConfigs.createBlobPayloadPending, - stagingModeMaxBatchOps, + stagingModeAutoFlushThreshold, }: IContainerRuntimeOptionsInternal = runtimeOptions; // If explicitSchemaControl is off, ensure that options which require explicitSchemaControl are not enabled. @@ -1238,7 +1238,7 @@ export class ContainerRuntime enableGroupedBatching, explicitSchemaControl, createBlobPayloadPending, - stagingModeMaxBatchOps, + stagingModeAutoFlushThreshold, }; validateMinimumVersionForCollab(updatedMinVersionForCollab); @@ -1417,7 +1417,7 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; - private readonly stagingModeMaxBatchOps: number | undefined; + private readonly stagingModeAutoFlushThreshold: number | undefined; /** * BatchId tracking is needed whenever there's a possibility of a "forked Container", * where the same local state is pending in two different running Containers, each of @@ -1874,7 +1874,7 @@ export class ContainerRuntime } else { this._flushMode = runtimeOptions.flushMode; } - this.stagingModeMaxBatchOps = runtimeOptions.stagingModeMaxBatchOps; + this.stagingModeAutoFlushThreshold = runtimeOptions.stagingModeAutoFlushThreshold; this.batchIdTrackingEnabled = this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ?? this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ?? @@ -4845,7 +4845,7 @@ export class ContainerRuntime if ( this.inStagingMode && this.outbox.mainBatchMessageCount < - (this.stagingModeMaxBatchOps ?? 
defaultStagingModeMaxBatchOps) + (this.stagingModeAutoFlushThreshold ?? defaultStagingModeAutoFlushThreshold) ) { return; } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index c75c7fda4faf..20c217d36531 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4442,7 +4442,7 @@ describe("Runtime", () => { assert.equal(containerRuntime.isDirty, false, "Runtime should not be dirty anymore"); }); - describe("stagingModeMaxBatchOps", () => { + describe("stagingModeAutoFlushThreshold", () => { let runtimeWithThreshold: ContainerRuntime_WithPrivates; let mockContext: Partial; @@ -4455,7 +4455,7 @@ describe("Runtime", () => { registry: new FluidDataStoreRegistry([]), existing: false, runtimeOptions: { - stagingModeMaxBatchOps: threshold, + stagingModeAutoFlushThreshold: threshold, // Disable grouped batching so each op is individually submitted to the wire, // making it easier to verify op counts. 
enableGroupedBatching: false, @@ -4593,7 +4593,7 @@ describe("Runtime", () => { }); it("default threshold suppresses turn-based flushing during staging mode", async () => { - // Create runtime WITHOUT explicit stagingModeMaxBatchOps — uses the default (1000) + // Create runtime WITHOUT explicit stagingModeAutoFlushThreshold — uses the default (1000) mockContext = getMockContext() as IContainerContext; runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ context: mockContext as IContainerContext, From ebf2e663c7a1a5ab7f59c02bd3e4f7c1685499d2 Mon Sep 17 00:00:00 2001 From: anthony-murphy-agent Date: Thu, 26 Feb 2026 16:26:39 -0800 Subject: [PATCH 03/21] Apply default at assignment and add config override Address PR feedback: - Field is now always `number` (not `number | undefined`), with the default applied at construction time - Add config override via Fluid.ContainerRuntime.StagingModeAutoFlushThreshold for runtime tuning without code changes - Config override takes precedence over runtime option, which takes precedence over the default (1000) Co-Authored-By: anthony-murphy Co-Authored-By: Claude Opus 4.6 (1M context) --- .../runtime/container-runtime/src/containerRuntime.ts | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 5c16ab3df92b..1a83fef381b8 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1417,7 +1417,7 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; - private readonly stagingModeAutoFlushThreshold: number | undefined; + private readonly stagingModeAutoFlushThreshold: number; /** * BatchId tracking is needed whenever there's a possibility of a "forked Container", * where the same local state is pending in two different running 
Containers, each of @@ -1874,7 +1874,10 @@ export class ContainerRuntime } else { this._flushMode = runtimeOptions.flushMode; } - this.stagingModeAutoFlushThreshold = runtimeOptions.stagingModeAutoFlushThreshold; + this.stagingModeAutoFlushThreshold = + this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ?? + runtimeOptions.stagingModeAutoFlushThreshold ?? + defaultStagingModeAutoFlushThreshold; this.batchIdTrackingEnabled = this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ?? this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ?? @@ -4844,8 +4847,7 @@ export class ContainerRuntime // sequence number changes and forces a flush as a safety net. if ( this.inStagingMode && - this.outbox.mainBatchMessageCount < - (this.stagingModeAutoFlushThreshold ?? defaultStagingModeAutoFlushThreshold) + this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold ) { return; } From a977adc24499617c4841c1c4629de69e02d6becf Mon Sep 17 00:00:00 2001 From: anthony-murphy-agent Date: Thu, 26 Feb 2026 16:38:16 -0800 Subject: [PATCH 04/21] Address PR review feedback - Fix comment accuracy: scheduleFlush threshold triggers at "reaches or exceeds", not just "exceeds" - Fix maybeFlushPartialBatch comment: by default it throws on unexpected sequence number changes, only forces a flush when partial-batch flushing is enabled via Fluid.ContainerRuntime.DisableFlushBeforeProcess - Strengthen threshold and incoming-op tests: assert that the outbox is actually emptied (mainBatchMessageCount drops to 0) rather than only checking that nothing was submitted to the wire Co-Authored-By: anthony-murphy Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 8 +++-- .../src/test/containerRuntime.spec.ts | 33 +++++++++++++++++-- 2 files changed, 35 insertions(+), 6 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts 
b/packages/runtime/container-runtime/src/containerRuntime.ts index 1a83fef381b8..a96bc6cb66d6 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -4838,13 +4838,15 @@ export class ContainerRuntime } private scheduleFlush(): void { - // During staging mode with a batch threshold, suppress automatic flush scheduling. - // Only flush when the main batch exceeds the threshold. + // During staging mode, suppress automatic flush scheduling until the main batch + // reaches or exceeds the threshold. // Incoming ops still break the batch via direct this.flush() calls elsewhere // (deltaManager "op" handler, process(), connection changes, getPendingLocalState, // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check. // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects - // sequence number changes and forces a flush as a safety net. + // sequence number changes. By default it throws if unexpected changes are detected; it only + // forces a flush as a safety net when partial-batch flushing is enabled via + // Fluid.ContainerRuntime.DisableFlushBeforeProcess. 
if ( this.inStagingMode && this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 20c217d36531..f5f4058b582a 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4513,15 +4513,29 @@ describe("Runtime", () => { submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); assert.equal(submittedOps.length, 0, "Under threshold, no flush yet"); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox", + ); submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("op3")); - // The 3rd op should trigger scheduleFlush to fall through to normal scheduling + // The 3rd op reaches the threshold, so scheduleFlush falls through to normal scheduling await Promise.resolve(); + // Ops are not submitted to the wire during staging mode, but the outbox should be + // emptied (ops moved from outbox into PendingStateManager as a staged batch). 
assert.equal( submittedOps.length, 0, - "Ops should not be submitted while in staging mode (flushed into PSM only)", + "Ops should not be submitted to wire while in staging mode", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty after threshold flush", ); }); @@ -4535,6 +4549,12 @@ describe("Runtime", () => { submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); assert.equal(submittedOps.length, 0, "No ops submitted yet"); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox", + ); // Simulate an incoming op — bumps lastSequenceNumber and emits "op" // The deltaManager "op" handler calls this.flush() directly, @@ -4549,12 +4569,19 @@ describe("Runtime", () => { contents: "test content", }); - // Ops are not submitted to the wire during staging mode — they're flushed to PSM + // Ops are not submitted to the wire during staging mode, but the incoming op + // should have flushed the outbox into PendingStateManager as a staged batch. 
assert.equal( submittedOps.length, 0, "Ops should not be submitted to wire while in staging mode", ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty after incoming op flush", + ); }); it("exit staging mode flushes remaining ops", async () => { From 7ad47837407c03a267b37b747b7c817ac47877c7 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 13:55:28 -0700 Subject: [PATCH 05/21] Remove DisableFlushBeforeProcess feature flag The kill-bit switch for the flush-before-process simplification has been in production long enough to confirm correctness. Remove the flag, hardcode the default behavior (flush before process), and clean up the partial-batch flushing code path that was only reachable when the flag was enabled. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 34 +---- .../src/opLifecycle/outbox.ts | 17 +-- .../src/test/containerRuntime.spec.ts | 127 +++++++--------- .../src/test/opLifecycle/outbox.spec.ts | 143 ------------------ 4 files changed, 63 insertions(+), 258 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 7e15fc79e85a..ba2f531b5bdb 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1532,13 +1532,6 @@ export class ContainerRuntime return runtimeCompatDetailsForLoader; } - /** - * If true, will skip Outbox flushing before processing an incoming message (and on DeltaManager "op" event for loader back-compat), - * and instead the Outbox will check for a split batch on every submit. - * This is a kill-bit switch for this simplification of logic, in case it causes unexpected issues. 
- */ - private readonly skipSafetyFlushDuringProcessStack: boolean; - private readonly extensions = new Map(); /***/ @@ -1988,10 +1981,6 @@ export class ContainerRuntime const legacySendBatchFn = makeLegacySendBatchFn(submitFn, this.innerDeltaManager); - this.skipSafetyFlushDuringProcessStack = - // Keep the old flag name even though we renamed the class member (it shipped in 2.31.0) - this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableFlushBeforeProcess") === true; - this.outbox = new Outbox({ shouldSend: () => this.shouldSendOps(), pendingStateManager: this.pendingStateManager, @@ -2002,8 +1991,6 @@ export class ContainerRuntime config: { compressionOptions, maxBatchSizeInBytes: runtimeOptions.maxBatchSizeInBytes, - // If we disable flush before process, we must be ready to flush partial batches - flushPartialBatches: this.skipSafetyFlushDuringProcessStack, }, logger: this.mc.logger, groupingManager: opGroupingManager, @@ -2060,14 +2047,12 @@ export class ContainerRuntime this.lastEmittedDirty = this.computeCurrentDirtyState(); context.updateDirtyContainerState(this.lastEmittedDirty); - if (!this.skipSafetyFlushDuringProcessStack) { - // Reference Sequence Number may have just changed, and it must be consistent across a batch, - // so we should flush now to clear the way for the next ops. - // NOTE: This will be redundant whenever CR.process was called for the op (since we flush there too) - - // But we need this coverage for old loaders that don't call ContainerRuntime.process for non-runtime messages. - // (We have to call flush _before_ processing a runtime op, but after is ok for non-runtime op) - this.deltaManager.on("op", () => this.flush()); - } + // Reference Sequence Number may have just changed, and it must be consistent across a batch, + // so we should flush now to clear the way for the next ops. 
+ // NOTE: This will be redundant whenever CR.process was called for the op (since we flush there too) - + // But we need this coverage for old loaders that don't call ContainerRuntime.process for non-runtime messages. + // (We have to call flush _before_ processing a runtime op, but after is ok for non-runtime op) + this.deltaManager.on("op", () => this.flush()); // logging hardware telemetry this.baseLogger.send({ @@ -2093,7 +2078,6 @@ export class ContainerRuntime featureGates: JSON.stringify({ ...featureGatesForTelemetry, closeSummarizerDelayOverride, - disableFlushBeforeProcess: this.skipSafetyFlushDuringProcessStack, }), telemetryDocumentId: this.telemetryDocumentId, groupedBatchingEnabled: this.groupedBatchingEnabled, @@ -3015,10 +2999,8 @@ export class ContainerRuntime this.verifyNotClosed(); - if (!this.skipSafetyFlushDuringProcessStack) { - // Reference Sequence Number may be about to change, and it must be consistent across a batch, so flush now - this.flush(); - } + // Reference Sequence Number may be about to change, and it must be consistent across a batch, so flush now + this.flush(); this.ensureNoDataModelChanges(() => { this.processInboundMessageOrBatch(messageCopy, local); diff --git a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts index 102580b2baa1..7f6dd35cfcdd 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/outbox.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/outbox.ts @@ -48,12 +48,6 @@ export interface IOutboxConfig { * The maximum size of a batch that we can send over the wire. */ readonly maxBatchSizeInBytes: number; - /** - * If true, maybeFlushPartialBatch will flush the batch if the reference sequence number changed - * since the batch started. Otherwise, it will throw in this case (apart from reentrancy which is handled elsewhere). 
- * Once the new throw-based flow is proved in a production environment, this option will be removed. - */ - readonly flushPartialBatches: boolean; } export interface IOutboxParameters { @@ -295,10 +289,7 @@ export class Outbox { this.logger.sendTelemetryEvent( { // Only log error if this is truly unexpected - category: - expectedDueToReentrancy || this.params.config.flushPartialBatches - ? "generic" - : "error", + category: expectedDueToReentrancy ? "generic" : "error", eventName: "ReferenceSequenceNumberMismatch", details: { expectedDueToReentrancy, @@ -314,12 +305,6 @@ export class Outbox { ); } - // If we're configured to flush partial batches, do that now and return (don't throw) - if (this.params.config.flushPartialBatches) { - this.flushAll(); - return; - } - // If we are in a reentrant context, we know this can happen without causing any harm. if (expectedDueToReentrancy) { return; diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index a30ab00997a6..f02d864c56f1 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -552,88 +552,71 @@ describe("Runtime", () => { // NOTE: This test is examining a case that only occurs with an old Loader that doesn't tell ContainerRuntime when processing system ops. // In other words, when the MockDeltaManager bumps its lastSequenceNumber, ContainerRuntime.process would be called in the current code, but not with legacy loader. 
- for (const skipSafetyFlushDuringProcessStack of [true, undefined]) { - it(`Inbound (non-runtime) op triggers flush due to refSeq changing [skipSafetyFlush=${skipSafetyFlushDuringProcessStack}]`, async () => { - const submittedBatches: { - messages: IBatchMessage[]; - referenceSequenceNumber: number; - }[] = []; - - const mockContext = getMockContext({ - settings: { - "Fluid.ContainerRuntime.DisableFlushBeforeProcess": - skipSafetyFlushDuringProcessStack, - }, - }); - ( - mockContext as { submitBatchFn: IContainerContext["submitBatchFn"] } - ).submitBatchFn = ( - messages: IBatchMessage[], - referenceSequenceNumber: number = -1, - ) => { + it("Inbound (non-runtime) op triggers flush due to refSeq changing", async () => { + const submittedBatches: { + messages: IBatchMessage[]; + referenceSequenceNumber: number; + }[] = []; + + const mockContext = getMockContext(); + (mockContext as { submitBatchFn: IContainerContext["submitBatchFn"] }).submitBatchFn = + (messages: IBatchMessage[], referenceSequenceNumber: number = -1) => { submittedOps.push(...messages); // Reusing submittedOps since submitFn won't be invoked due to submitBatchFn's presence submittedBatches.push({ messages, referenceSequenceNumber }); return 999; // CSN not used in test asserts below }; - const containerRuntime = await ContainerRuntime.loadRuntime2({ - context: mockContext as IContainerContext, - registry: new FluidDataStoreRegistry([]), - existing: false, - runtimeOptions: {}, - provideEntryPoint: mockProvideEntryPoint, - }); + const containerRuntime = await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: {}, + provideEntryPoint: mockProvideEntryPoint, + }); - // Submit the first message - submitDataStoreOp(containerRuntime, "1", testDataStoreMessage); - assert.strictEqual(submittedOps.length, 0, "No ops submitted yet"); + // Submit the first message + submitDataStoreOp(containerRuntime, 
"1", testDataStoreMessage); + assert.strictEqual(submittedOps.length, 0, "No ops submitted yet"); - // Bump lastSequenceNumber and trigger the "op" event artificially to simulate processing a non-runtime op - // When [skipSafetyFlushDuringProcessStack: FALSE], this will trigger a flush, which allows us to safely submit more ops next - const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; - ++mockDeltaManager.lastSequenceNumber; - mockDeltaManager.emit("op", { - clientId: mockClientId, - sequenceNumber: mockDeltaManager.lastSequenceNumber, - clientSequenceNumber: 1, - type: MessageType.ClientJoin, - contents: "test content", - }); + // Bump lastSequenceNumber and trigger the "op" event artificially to simulate processing a non-runtime op + // This will trigger a flush, which allows us to safely submit more ops next + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); - const expectedSubmitCount = skipSafetyFlushDuringProcessStack === true ? 
0 : 1; - assert.equal( - submittedOps.length, - expectedSubmitCount, - "Submitted op count wrong after first op", - ); + assert.equal(submittedOps.length, 1, "Submitted op count wrong after first op"); - // Submit the second message - // When [skipSafetyFlushDuringProcessStack: TRUE], this will trigger a flush via Outbox.maybeFlushPartialBatch - submitDataStoreOp(containerRuntime, "2", { - type: "op", - content: { address: "test-address", contents: "test-contents2" }, - }); - assert.equal( - submittedOps.length, - 1, - "By now we expect the first op to have been submitted in both configurations", - ); + // Submit the second message + submitDataStoreOp(containerRuntime, "2", { + type: "op", + content: { address: "test-address", contents: "test-contents2" }, + }); + assert.equal( + submittedOps.length, + 1, + "Second op not yet submitted (scheduled for next microtask)", + ); - // Wait for the next tick for the second message to be flushed - await Promise.resolve(); + // Wait for the next tick for the second message to be flushed + await Promise.resolve(); - // Validate that the messages were submitted - assert.equal(submittedOps.length, 2, "Two messages should be submitted"); - assert.deepEqual( - submittedBatches, - [ - { messages: [submittedOps[0]], referenceSequenceNumber: 0 }, // The first op - { messages: [submittedOps[1]], referenceSequenceNumber: 1 }, // The second op - ], - "Two batches should be submitted with different refSeq", - ); - }); - } + // Validate that the messages were submitted + assert.equal(submittedOps.length, 2, "Two messages should be submitted"); + assert.deepEqual( + submittedBatches, + [ + { messages: [submittedOps[0]], referenceSequenceNumber: 0 }, // The first op + { messages: [submittedOps[1]], referenceSequenceNumber: 1 }, // The second op + ], + "Two batches should be submitted with different refSeq", + ); + }); it("IdAllocation op from replayPendingStates is flushed, preventing outboxSequenceNumberCoherencyCheck error", async () => 
{ // Start out disconnected since step 1 is to trigger ID Allocation op on reconnect @@ -1826,7 +1809,6 @@ describe("Runtime", () => { const featureGates = { "Fluid.ContainerRuntime.IdCompressorEnabled": true, "Fluid.ContainerRuntime.Test.CloseSummarizerDelayOverrideMs": 1337, - "Fluid.ContainerRuntime.DisableFlushBeforeProcess": true, }; await ContainerRuntime.loadRuntime2({ context: localGetMockContext(featureGates) as IContainerContext, @@ -1844,7 +1826,6 @@ describe("Runtime", () => { idCompressorMode: "on", featureGates: JSON.stringify({ closeSummarizerDelayOverride: 1337, - disableFlushBeforeProcess: true, }), groupedBatchingEnabled: true, }, diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 9209d504209b..83041efb94bf 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -243,7 +243,6 @@ describe("Outbox", () => { chunkSizeInBytes?: number; opGroupingConfig?: OpGroupingManagerConfig; immediateMode?: boolean; - flushPartialBatches?: boolean; }) => { const { submitFn, submitBatchFn, deltaManager } = params.context; @@ -262,7 +261,6 @@ describe("Outbox", () => { config: { maxBatchSizeInBytes: params.maxBatchSize ?? maxBatchSizeInBytes, compressionOptions: params.compressionOptions ?? DefaultCompressionOptions, - flushPartialBatches: params.flushPartialBatches ?? 
false, }, logger: mockLogger, groupingManager: new OpGroupingManager( @@ -858,147 +856,6 @@ describe("Outbox", () => { ); }); - it("Splits the batch when an out of order message is detected (if partial flushing is enabled)", () => { - const outbox = getOutbox({ - context: getMockContext(), - flushPartialBatches: true, - }); - const messages = [ - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "0"), - referenceSequenceNumber: 0, - }, - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "1"), - referenceSequenceNumber: 1, - }, - ]; - - currentSeqNumbers.referenceSequenceNumber = 1; - - outbox.submit(messages[0]); - outbox.submit(messages[1]); - outbox.flush(); - - assert.equal(state.opsSubmitted, messages.length); - assert.equal(state.individualOpsSubmitted.length, 0); - assert.equal(state.batchesSubmitted.length, 2); - assert.deepEqual( - state.batchesSubmitted.map((x) => x.messages), - [[toSubmittedMessage(messages[0])], [toSubmittedMessage(messages[1])]], - ); - assert.deepEqual( - state.batchesSubmitted.map((x) => x.referenceSequenceNumber), - [0, 1], - ); - assert.equal(state.deltaManagerFlushCalls, 0); - const rawMessagesInFlushOrder = [messages[0], messages[1]]; - assert.deepEqual( - state.pendingOpContents, - rawMessagesInFlushOrder.map((message, i) => ({ - runtimeOp: message.runtimeOp, - referenceSequenceNumber: message.referenceSequenceNumber, - localOpMetadata: message.localOpMetadata, - opMetadata: message.metadata, - batchStartCsn: i + 1, // Each message should have been in its own batch. CSN starts at 1. 
- })), - ); - - mockLogger.assertMatch([ - { - eventName: "Outbox:ReferenceSequenceNumberMismatch", - }, - ]); - }); - - for (const messages of [ - [ - { - ...createMessage(ContainerMessageType.IdAllocation, "0"), - referenceSequenceNumber: 0, - }, - { - ...createMessage(ContainerMessageType.IdAllocation, "0"), - referenceSequenceNumber: 0, - }, - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "0"), - referenceSequenceNumber: 1, - }, - ], - [ - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "0"), - referenceSequenceNumber: 0, - }, - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "0"), - referenceSequenceNumber: 0, - }, - { - ...createMessage(ContainerMessageType.IdAllocation, "0"), - referenceSequenceNumber: 1, - }, - ], - ] as LocalBatchMessage[][]) { - it("Flushes all batches when an out of order message is detected in either flow (if partial flushing is enabled)", () => { - const outbox = getOutbox({ - context: getMockContext(), - flushPartialBatches: true, - }); - for (const message of messages) { - currentSeqNumbers.referenceSequenceNumber = message.referenceSequenceNumber; - if (typeFromBatchedOp(message) === ContainerMessageType.IdAllocation) { - outbox.submitIdAllocation(message); - } else { - outbox.submit(message); - } - } - - assert.equal(state.opsSubmitted, messages.length - 1); - assert.equal(state.individualOpsSubmitted.length, 0); - assert.equal(state.batchesSubmitted.length, 1); - assert.deepEqual( - state.batchesSubmitted.map((x) => x.messages), - [[toSubmittedMessage(messages[0]), toSubmittedMessage(messages[1])]], - ); - - mockLogger.assertMatch([ - { - eventName: "Outbox:ReferenceSequenceNumberMismatch", - }, - ]); - }); - } - - it("Does not throw when an out of order message is detected (if partial flushing is enabled)", () => { - const outbox = getOutbox({ - context: getMockContext(), - flushPartialBatches: true, - }); - const messages: LocalBatchMessage[] = [ - { - 
...createMessage(ContainerMessageType.FluidDataStoreOp, "0"), - referenceSequenceNumber: 0, - }, - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "1"), - referenceSequenceNumber: 1, - }, - { - ...createMessage(ContainerMessageType.FluidDataStoreOp, "1"), - referenceSequenceNumber: 2, - }, - ]; - - assert.doesNotThrow(() => { - for (const message of messages) { - currentSeqNumbers.referenceSequenceNumber = message.referenceSequenceNumber; - outbox.submit(message); - } - }, "Shouldn't throw if partial flushing is enabled"); - }); - it("Log at most 3 reference sequence number mismatch events", () => { state.isReentrant = true; // This avoids the error being thrown - but it will still log const outbox = getOutbox({ From 2e5e536cc4bcc1b1cffcbb92425a0a1e5792fc8e Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 13:56:37 -0700 Subject: [PATCH 06/21] Tie defaultStagingModeAutoFlushThreshold to largeBatchThreshold constant Extract a shared largeBatchThreshold constant from OpGroupingManager and use it for the staging-mode auto-flush default, so both values stay in sync. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../runtime/container-runtime/src/containerRuntime.ts | 7 ++++--- .../runtime/container-runtime/src/opLifecycle/index.ts | 1 + .../src/opLifecycle/opGroupingManager.ts | 9 ++++++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index a96bc6cb66d6..dcf16b73a159 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -244,6 +244,7 @@ import { DuplicateBatchDetector, ensureContentsDeserialized, type IBatchCheckpoint, + largeBatchThreshold, OpCompressor, OpDecompressor, OpGroupingManager, @@ -625,10 +626,10 @@ const defaultChunkSizeInBytes = 204800; * * Chosen based on production telemetry: copy-paste operations routinely produce batches * of 1000+ ops (435K instances over 30 days), and receivers on modern Fluid versions - * handle them without issues. 1000 also matches the existing "large batch" telemetry - * threshold ({@link OpGroupingManager}). + * handle them without issues. Uses {@link largeBatchThreshold} to stay aligned with + * the existing "large batch" telemetry threshold ({@link OpGroupingManager}). 
*/ -const defaultStagingModeAutoFlushThreshold = 1000; +const defaultStagingModeAutoFlushThreshold = largeBatchThreshold; /** * The default time to wait for pending ops to be processed during summarization diff --git a/packages/runtime/container-runtime/src/opLifecycle/index.ts b/packages/runtime/container-runtime/src/opLifecycle/index.ts index 051612b5245a..d898f99f84d9 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/index.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/index.ts @@ -45,6 +45,7 @@ export { } from "./remoteMessageProcessor.js"; export { type EmptyGroupedBatch, + largeBatchThreshold, OpGroupingManager, type OpGroupingManagerConfig, isGroupedBatch, diff --git a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts index 3a8d6a857d5d..85e0a2bad539 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts @@ -17,6 +17,13 @@ import type { OutboundSingletonBatch, } from "./definitions.js"; +/** + * The number of ops in a batch at or above which the batch is considered "large" + * for telemetry purposes. Used by both {@link OpGroupingManager} (GroupLargeBatch event) + * and as the default staging-mode auto-flush threshold. + */ +export const largeBatchThreshold = 1000; + /** * Grouping makes assumptions about the shape of message contents. This interface codifies those assumptions, but does not validate them. 
*/ @@ -123,7 +130,7 @@ export class OpGroupingManager { return batch as OutboundSingletonBatch; } - if (batch.messages.length >= 1000) { + if (batch.messages.length >= largeBatchThreshold) { this.logger.sendTelemetryEvent({ eventName: "GroupLargeBatch", length: batch.messages.length, From 7d02235ad44d9b6c2f50b483019d6b6278ccb0c3 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 14:04:01 -0700 Subject: [PATCH 07/21] Expose stagingModeAutoFlushThreshold on public ContainerRuntimeOptions Move stagingModeAutoFlushThreshold from ContainerRuntimeOptionsInternal to the public ContainerRuntimeOptions interface so consumers can configure it. Make it required (with a default of 1000) to match the fully-required convention of the options interfaces. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 52147e46a637..67f29f47a8b1 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -475,6 +475,18 @@ export interface ContainerRuntimeOptions { * When enabled (`true`), createBlob will return a handle before the blob upload completes. */ readonly createBlobPayloadPending: true | undefined; + + /** + * Controls automatic batch flushing during staging mode. + * Normal turn-based/async flush scheduling is suppressed while in staging mode + * until the accumulated batch reaches this many ops, at which point the batch + * is flushed. Incoming ops always break the current batch regardless of this setting. + * + * Set to Infinity to only break batches on system events (incoming ops). 
+ * + * @defaultValue 1000 + */ + readonly stagingModeAutoFlushThreshold: number; } /** @@ -512,18 +524,6 @@ export interface ContainerRuntimeOptionsInternal extends ContainerRuntimeOptions * In that case, batched messages will be sent individually (but still all at the same time). */ readonly enableGroupedBatching: boolean; - - /** - * Controls automatic batch flushing during staging mode. - * Normal turn-based/async flush scheduling is suppressed while in staging mode - * until the accumulated batch reaches this many ops, at which point the batch - * is flushed. Incoming ops always break the current batch regardless of this setting. - * - * Set to Infinity to only break batches on system events (incoming ops). - * - * @defaultValue 1000 - */ - readonly stagingModeAutoFlushThreshold?: number; } /** @@ -983,6 +983,7 @@ export class ContainerRuntime loadSequenceNumberVerification: "close", maxBatchSizeInBytes: defaultMaxBatchSizeInBytes, chunkSizeInBytes: defaultChunkSizeInBytes, + stagingModeAutoFlushThreshold: defaultStagingModeAutoFlushThreshold, }; const defaultConfigs = { @@ -1008,7 +1009,7 @@ export class ContainerRuntime ? disabledCompressionConfig : defaultConfigs.compressionOptions, createBlobPayloadPending = defaultConfigs.createBlobPayloadPending, - stagingModeAutoFlushThreshold, + stagingModeAutoFlushThreshold = defaultConfigs.stagingModeAutoFlushThreshold, }: IContainerRuntimeOptionsInternal = runtimeOptions; // If explicitSchemaControl is off, ensure that options which require explicitSchemaControl are not enabled. @@ -4810,9 +4811,7 @@ export class ContainerRuntime // (deltaManager "op" handler, process(), connection changes, getPendingLocalState, // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check. // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects - // sequence number changes. 
By default it throws if unexpected changes are detected; it only - // forces a flush as a safety net when partial-batch flushing is enabled via - // Fluid.ContainerRuntime.DisableFlushBeforeProcess. + // sequence number changes and throws if unexpected changes are detected. if ( this.inStagingMode && this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold From b4921d8143a0d9d56e7f6f5ad24e6ab3742aea3b Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 14:30:16 -0700 Subject: [PATCH 08/21] Add staging mode threshold tests and fix telemetry expectations Add new unit tests for stagingModeAutoFlushThreshold: - discardChanges flushes outbox before rollback - enterStagingMode flushes pending outbox as non-staged - config override > runtime option > default precedence (2 tests) - incoming non-runtime op breaks batch during staging mode - reconnect breaks batch during staging mode Also fix ContainerLoadStats telemetry expectations to include stagingModeAutoFlushThreshold, update type validation for the new public API surface, and remove unused typeFromBatchedOp helper. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime.legacy.beta.api.md | 1 + .../runtime/container-runtime/package.json | 6 +- .../src/test/containerRuntime.spec.ts | 222 ++++++++++++++++++ .../src/test/opLifecycle/outbox.spec.ts | 5 - ...idateContainerRuntimePrevious.generated.ts | 1 + 5 files changed, 229 insertions(+), 6 deletions(-) diff --git a/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md b/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md index 4fe44106fbff..0abf729785db 100644 --- a/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md +++ b/packages/runtime/container-runtime/api-report/container-runtime.legacy.beta.api.md @@ -45,6 +45,7 @@ export interface ContainerRuntimeOptions { readonly gcOptions: IGCRuntimeOptions; readonly loadSequenceNumberVerification: "close" | "log" | "bypass"; readonly maxBatchSizeInBytes: number; + readonly stagingModeAutoFlushThreshold: number; // (undocumented) readonly summaryOptions: ISummaryRuntimeOptions; } diff --git a/packages/runtime/container-runtime/package.json b/packages/runtime/container-runtime/package.json index 2685419b72c6..eba1f51d5173 100644 --- a/packages/runtime/container-runtime/package.json +++ b/packages/runtime/container-runtime/package.json @@ -219,7 +219,11 @@ "typescript": "~5.4.5" }, "typeValidation": { - "broken": {}, + "broken": { + "Interface_ContainerRuntimeOptions": { + "forwardCompat": false + } + }, "entrypoint": "legacy" } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 8ff34b65288b..51200bb54277 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1783,6 +1783,7 @@ describe("Runtime", () => { enableGroupedBatching: true, // Redundant, but makes the 
JSON.stringify yield the same result as the logs explicitSchemaControl: false, createBlobPayloadPending: undefined, + stagingModeAutoFlushThreshold: 1000, } as const satisfies ContainerRuntimeOptionsInternal; const mergedRuntimeOptions = { ...defaultRuntimeOptions, ...runtimeOptions } as const; @@ -3739,6 +3740,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -3798,6 +3800,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: false, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -3836,6 +3839,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -3874,6 +3878,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -3911,6 +3916,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: true, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -3956,6 +3962,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: "on", enableGroupedBatching: false, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -4015,6 +4022,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, // idCompressor is undefined, since that represents a logical state (off) enableGroupedBatching: true, explicitSchemaControl: false, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -4055,6 +4063,7 @@ describe("Runtime", () => { enableRuntimeIdCompressor: undefined, enableGroupedBatching: true, explicitSchemaControl: 
true, + stagingModeAutoFlushThreshold: 1000, }; logger.assertMatchAny([ @@ -4550,6 +4559,219 @@ describe("Runtime", () => { controls.commitChanges(); }); + + it("discardChanges flushes outbox before rollback", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + const channelCollectionStub = stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + const controls = runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp( + runtimeWithThreshold, + "1", + genTestDataStoreMessage("op1"), + "META1", + ); + submitDataStoreOp( + runtimeWithThreshold, + "2", + genTestDataStoreMessage("op2"), + "META2", + ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox before discard", + ); + + controls.discardChanges(); + + // Outbox should have been drained before rollback + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty after discard", + ); + // Staged ops should be rolled back (LIFO order) + assert.equal( + channelCollectionStub.rollbackDataStoreOp.callCount, + 2, + "Both ops should be rolled back", + ); + // Nothing sent to wire + assert.equal( + submittedOps.length, + 0, + "No ops should be submitted to wire after discard", + ); + }); + + it("enterStagingMode flushes any pending outbox contents as non-staged", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + // Submit ops before entering staging mode (not yet flushed — still in outbox) + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("pre1")); + submitDataStoreOp(runtimeWithThreshold, "2", 
genTestDataStoreMessage("pre2")); + assert.equal(submittedOps.length, 0, "Ops not yet flushed"); + + // Enter staging mode — should flush pre-staging ops as non-staged + const controls = runtimeWithThreshold.enterStagingMode(); + assert.equal( + submittedOps.length, + 2, + "Pre-staging ops should be flushed on enterStagingMode", + ); + + // Submit more ops while in staging mode + submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("staged1")); + runtimeWithThreshold.flush(); + assert.equal(submittedOps.length, 2, "Staged ops should NOT be submitted to wire"); + + controls.commitChanges(); + assert.equal(submittedOps.length, 3, "All ops should be submitted after commit"); + }); + + it("config override takes precedence over runtime option", async () => { + const configThreshold = 5; + const runtimeOptionThreshold = 50; + mockContext = getMockContext({ + settings: { + "Fluid.ContainerRuntime.StagingModeAutoFlushThreshold": configThreshold, + }, + }) as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: runtimeOptionThreshold, + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + // Submit ops up to the config override threshold (5), not the runtime option (50) + for (let i = 0; i < configThreshold; i++) { + submitDataStoreOp(runtimeWithThreshold, `${i}`, genTestDataStoreMessage(`op${i}`)); + } + // Threshold reached — scheduleFlush falls through, flush happens on next microtask + await Promise.resolve(); + + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as 
any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty — config override threshold (5) should win over runtime option (50)", + ); + }); + + it("runtime option takes precedence over default", async () => { + const runtimeOptionThreshold = 5; + runtimeWithThreshold = await createRuntimeWithThreshold(runtimeOptionThreshold); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + for (let i = 0; i < runtimeOptionThreshold; i++) { + submitDataStoreOp(runtimeWithThreshold, `${i}`, genTestDataStoreMessage(`op${i}`)); + } + await Promise.resolve(); + + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty — runtime option (5) should win over default (1000)", + ); + }); + + it("incoming non-runtime op breaks batch during staging mode", async () => { + runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox", + ); + + // Simulate an incoming non-runtime op (e.g. ClientJoin signal) + // The deltaManager "op" handler calls this.flush() directly. 
+ const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); + + assert.equal(submittedOps.length, 0, "No ops sent to wire during staging mode"); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be empty — non-runtime op should break the batch", + ); + }); + + it("reconnect breaks batch during staging mode", async () => { + mockContext = getMockContext() as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: Infinity, + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + // Submit ops while connected (sitting in outbox under threshold) + submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox before disconnect", + ); + + // Disconnect + changeConnectionState(runtimeWithThreshold, false, "disconnectedClientId"); + + // Reconnect — triggers flush and replayPendingStates + changeConnectionState(runtimeWithThreshold, true, mockClientId); + + assert.equal( + // 
eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be drained after reconnect", + ); + }); }); }); }); diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 83041efb94bf..2cff6c523a6c 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -48,11 +48,6 @@ import type { IPendingMessage, } from "../../pendingStateManager.js"; -function typeFromBatchedOp(message: LocalBatchMessage): string { - assert(message.runtimeOp !== undefined, "PRECONDITION: runtimeOp is undefined"); - return message.runtimeOp.type; -} - // Make a mock op with distinguishable contents function op(data: string): LocalContainerRuntimeMessage { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions diff --git a/packages/runtime/container-runtime/src/test/types/validateContainerRuntimePrevious.generated.ts b/packages/runtime/container-runtime/src/test/types/validateContainerRuntimePrevious.generated.ts index 1d89e87936aa..e8e366341928 100644 --- a/packages/runtime/container-runtime/src/test/types/validateContainerRuntimePrevious.generated.ts +++ b/packages/runtime/container-runtime/src/test/types/validateContainerRuntimePrevious.generated.ts @@ -96,6 +96,7 @@ declare type current_as_old_for_Function_loadContainerRuntime = requireAssignabl * typeValidation.broken: * "Interface_ContainerRuntimeOptions": {"forwardCompat": false} */ +// @ts-expect-error compatibility expected to be broken declare type old_as_current_for_Interface_ContainerRuntimeOptions = requireAssignableTo, TypeOnly> /* From 839491dad75b1a234aac4c2ba4b759b91d5e6e10 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 14:37:28 -0700 Subject: [PATCH 09/21] 
Add IdAllocation+reconnect and pre-staged resubmit tests Test 3 (highest risk): Verify that IdAllocation ops submitted during replayPendingStates in staging mode are properly flushed by the "op" handler before new ops with different refSeqs arrive, preventing the outboxSequenceNumberCoherencyCheck error. Test 4: Verify that pre-staged batches are correctly resubmitted on reconnect while the threshold is active, and that staged changes can still be committed afterward. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/test/containerRuntime.spec.ts | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 51200bb54277..64c7ed5b86d1 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4772,6 +4772,126 @@ describe("Runtime", () => { "Outbox should be drained after reconnect", ); }); + + it("IdAllocation + reconnect while in staging mode does not hit coherency check", async () => { + // This is the highest-risk scenario from Mark's test plan. + // The fix in b4e1fd1dd25 added scheduleFlush() after submitIdAllocationOpIfNeeded + // during replayPendingStates. With threshold suppression, that scheduleFlush() + // returns early if in staging mode and under threshold. But the "op" handler + // calls flush() directly, so the IdAllocation op still gets flushed before + // new ops with different refSeqs are submitted. 
+ + // Start disconnected so we can trigger IdAllocation on reconnect + mockContext = getMockContext({ connected: false }) as IContainerContext; + const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; + + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: Infinity, + enableRuntimeIdCompressor: "on", + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + runtimeWithThreshold.enterStagingMode(); + + // Generate 1st compressed ID while disconnected — queues an IdAllocation op + runtimeWithThreshold.idCompressor?.generateCompressedId(); + + // Reconnect — replayPendingStates submits IdAllocation op + calls scheduleFlush() + // scheduleFlush() returns early due to staging mode threshold suppression + changeConnectionState(runtimeWithThreshold, true, mockClientId); + + // Simulate a remote op arriving (bumps refSeq) + // The "op" handler calls this.flush() directly, draining the IdAllocation op + ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", { + clientId: mockClientId, + sequenceNumber: mockDeltaManager.lastSequenceNumber, + clientSequenceNumber: 1, + type: MessageType.ClientJoin, + contents: "test content", + }); + + // Generate 2nd compressed ID — its IdAllocation op has a new refSeq + const id2 = runtimeWithThreshold.idCompressor?.generateCompressedId(); + + // This would throw outboxSequenceNumberCoherencyCheck if the first IdAllocation + // op wasn't flushed before the refSeq changed. 
+ assert.doesNotThrow( + () => + submitDataStoreOp( + runtimeWithThreshold, + "someDS", + genTestDataStoreMessage({ id: id2 }), + ), + "Should not throw coherency check — IdAllocation op should have been flushed by the 'op' handler", + ); + }); + + it("reconnect resubmits pre-staged batches with threshold active", async () => { + mockContext = getMockContext() as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: Infinity, + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtimeWithThreshold); + submittedOps.length = 0; + + // Submit ops BEFORE entering staging mode + submitDataStoreOp( + runtimeWithThreshold, + "pre1", + genTestDataStoreMessage("pre-staging-op"), + ); + await Promise.resolve(); + assert.equal(submittedOps.length, 1, "Pre-staging op should be submitted"); + + // Enter staging mode and submit more ops + const controls = runtimeWithThreshold.enterStagingMode(); + submitDataStoreOp( + runtimeWithThreshold, + "staged1", + genTestDataStoreMessage("staged-op"), + "STAGED_META", + ); + runtimeWithThreshold.flush(); + assert.equal(submittedOps.length, 1, "Staged op should not be submitted to wire"); + + // Disconnect + changeConnectionState(runtimeWithThreshold, false, "disconnectedClientId"); + submittedOps.length = 0; + + // Reconnect — replayPendingStates resubmits all pending ops + changeConnectionState(runtimeWithThreshold, true, mockClientId); + await Promise.resolve(); + + // The pre-staging op should be resubmitted to the wire. + // The staged op stays in PSM as staged (not sent to wire). 
+ assert( + submittedOps.length > 0, + "Pre-staging op should be resubmitted after reconnect", + ); + const opsAfterReconnect = submittedOps.length; + + // Verify we can still commit the staged changes + controls.commitChanges(); + assert( + submittedOps.length > opsAfterReconnect, + "Staged op should be submitted after commitChanges", + ); + }); }); }); }); From 9c29e85ebe9ba02eaffbdb3c06608fc66cbf361a Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 14:42:23 -0700 Subject: [PATCH 10/21] Log StagingModeAutoFlush telemetry when threshold is hit Emit a telemetry event when the staging mode auto-flush threshold is reached, including the threshold value and current batch message count. This helps operators distinguish threshold-triggered flushes from other flush causes. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 17 +++++++++----- .../src/test/containerRuntime.spec.ts | 22 ++++++++++++++++++- 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 67f29f47a8b1..28d54292a68e 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -4812,11 +4812,18 @@ export class ContainerRuntime // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check. // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects // sequence number changes and throws if unexpected changes are detected. 
- if ( - this.inStagingMode && - this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold - ) { - return; + if (this.inStagingMode) { + if (this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold) { + return; + } + this.mc.logger.sendTelemetryEvent({ + eventName: "StagingModeAutoFlush", + category: "generic", + details: { + threshold: this.stagingModeAutoFlushThreshold, + mainBatchMessageCount: this.outbox.mainBatchMessageCount, + }, + }); } if (this.flushScheduled) { diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 64c7ed5b86d1..eb801fc76c32 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4414,7 +4414,19 @@ describe("Runtime", () => { }); it("ops flush when threshold is reached", async () => { - runtimeWithThreshold = await createRuntimeWithThreshold(3); + const logger = new MockLogger(); + const threshold = 3; + mockContext = getMockContext({ logger }) as IContainerContext; + runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ + context: mockContext as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + stagingModeAutoFlushThreshold: threshold, + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; stubChannelCollection(runtimeWithThreshold); submittedOps.length = 0; @@ -4448,6 +4460,14 @@ describe("Runtime", () => { 0, "Outbox should be empty after threshold flush", ); + + // Verify telemetry was logged when threshold was hit + logger.assertMatch([ + { + eventName: "ContainerRuntime:StagingModeAutoFlush", + category: "generic", + }, + ]); }); it("incoming ops break the batch regardless of threshold", async () => { From 66cb2850c1b4727604bb50f1f8559518005c1831 Mon Sep 17 00:00:00 
2001 From: Tony Murphy Date: Wed, 18 Mar 2026 14:52:33 -0700 Subject: [PATCH 11/21] Wrap exitStagingMode in a PerformanceEvent Use PerformanceEvent.timedExec to measure exitStagingMode duration and report autoFlushCount, autoFlushThreshold, and exitMethod. The perf event is passed to the discardOrCommit callback so callers can add properties in the future. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 57 ++++++++++++------- .../src/test/containerRuntime.spec.ts | 14 +++-- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 28d54292a68e..a3be9287a6b6 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1418,6 +1418,11 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; private readonly stagingModeAutoFlushThreshold: number; + /** + * Tracks auto-flush events during the current staging mode session. + * Reset on enter, incremented when threshold is hit, reported on exit. + */ + private stagingModeAutoFlushCount: number = 0; /** * BatchId tracking is needed whenever there's a possibility of a "forked Container", * where the same local state is pending in two different running Containers, each of @@ -3634,21 +3639,36 @@ export class ContainerRuntime // Make sure Outbox is empty before entering staging mode, // since we mark whole batches as "staged" or not to indicate whether to submit them. 
this.flush(); + this.stagingModeAutoFlushCount = 0; - const exitStagingMode = (discardOrCommit: () => void): void => { + const exitStagingMode = ( + discardOrCommit: (event: PerformanceEvent) => void, + exitMethod: "commit" | "discard", + ): void => { try { - // Final flush of any last staged changes - // NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c - this.outbox.flush(); + PerformanceEvent.timedExec( + this.mc.logger, + { + eventName: "ExitStagingMode", + exitMethod, + autoFlushCount: this.stagingModeAutoFlushCount, + autoFlushThreshold: this.stagingModeAutoFlushThreshold, + }, + (event) => { + // Final flush of any last staged changes + // NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c + this.outbox.flush(); - this.stageControls = undefined; + this.stageControls = undefined; - // During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops). - // Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode. - this.submitIdAllocationOpIfNeeded({ staged: false }); - discardOrCommit(); + // During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops). + // Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode. 
+ this.submitIdAllocationOpIfNeeded({ staged: false }); + discardOrCommit(event); - this.channelCollection.notifyStagingMode(false); + this.channelCollection.notifyStagingMode(false); + }, + ); } catch (error) { const normalizedError = normalizeError(error); this.closeFn(normalizedError); @@ -3658,23 +3678,23 @@ export class ContainerRuntime const stageControls: StageControlsInternal = { discardChanges: () => - exitStagingMode(() => { + exitStagingMode((_event) => { // Pop all staged batches from the PSM and roll them back in LIFO order this.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => { this.rollbackStagedChange(runtimeOp, localOpMetadata); }); this.updateDocumentDirtyState(); - }), + }, "discard"), commitChanges: (options) => { const { squash } = { ...defaultStagingCommitOptions, ...options }; - exitStagingMode(() => { + exitStagingMode((_event) => { // Replay all staged batches in typical FIFO order. // We'll be out of staging mode so they'll be sent to the service finally. 
this.pendingStateManager.replayPendingStates({ committingStagedBatches: true, squash, }); - }); + }, "commit"); }, }; @@ -4816,14 +4836,7 @@ export class ContainerRuntime if (this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold) { return; } - this.mc.logger.sendTelemetryEvent({ - eventName: "StagingModeAutoFlush", - category: "generic", - details: { - threshold: this.stagingModeAutoFlushThreshold, - mainBatchMessageCount: this.outbox.mainBatchMessageCount, - }, - }); + this.stagingModeAutoFlushCount++; } if (this.flushScheduled) { diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index eb801fc76c32..d1b9a227c7ba 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4430,7 +4430,7 @@ describe("Runtime", () => { stubChannelCollection(runtimeWithThreshold); submittedOps.length = 0; - runtimeWithThreshold.enterStagingMode(); + const controls = runtimeWithThreshold.enterStagingMode(); // Submit 3 ops — exactly at the threshold submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); @@ -4461,11 +4461,15 @@ describe("Runtime", () => { "Outbox should be empty after threshold flush", ); - // Verify telemetry was logged when threshold was hit - logger.assertMatch([ + // Exit staging mode and verify perf event includes auto-flush count + controls.commitChanges(); + logger.assertMatchAny([ { - eventName: "ContainerRuntime:StagingModeAutoFlush", - category: "generic", + eventName: "ContainerRuntime:ExitStagingMode_end", + category: "performance", + exitMethod: "commit", + autoFlushCount: 1, + autoFlushThreshold: threshold, }, ]); }); From c82ea67354a4efd950904457ea4862617a9cb641 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 17:16:19 -0700 Subject: [PATCH 12/21] Wrap exitStagingMode in PerformanceEvent with batch 
telemetry Use PerformanceEvent.timedExec to measure exitStagingMode, reporting: - exitMethod (commit/discard) - autoFlushCount and autoFlushThreshold - batches count and batchesOverThreshold (via reportProgress) Both commit and discard paths return batchInfo arrays (deduplicated by CSN) so exitStagingMode can compute batch stats uniformly. Also make replayPendingStates return the replayed batchInfo array. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 27 ++++++++++++------- .../src/pendingStateManager.ts | 20 ++++++++++---- .../src/test/containerRuntime.spec.ts | 2 +- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index a3be9287a6b6..cab569b8528c 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -258,6 +258,7 @@ import { type IPendingLocalState, PendingStateManager, type PendingBatchResubmitMetadata, + type IPendingMessage, } from "./pendingStateManager.js"; import { BatchRunCounter, RunCounter } from "./runCounter.js"; import { @@ -3642,7 +3643,7 @@ export class ContainerRuntime this.stagingModeAutoFlushCount = 0; const exitStagingMode = ( - discardOrCommit: (event: PerformanceEvent) => void, + discardOrCommit: () => IPendingMessage["batchInfo"][], exitMethod: "commit" | "discard", ): void => { try { @@ -3664,8 +3665,13 @@ export class ContainerRuntime // During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops). // Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode. 
this.submitIdAllocationOpIfNeeded({ staged: false }); - discardOrCommit(event); - + const batchInfos = discardOrCommit(); + event.reportProgress({ + batches: batchInfos.length, + batchesOverThreshold: batchInfos.filter( + (b) => b.length > this.stagingModeAutoFlushThreshold, + ).length, + }); this.channelCollection.notifyStagingMode(false); }, ); @@ -3678,19 +3684,22 @@ export class ContainerRuntime const stageControls: StageControlsInternal = { discardChanges: () => - exitStagingMode((_event) => { + exitStagingMode(() => { // Pop all staged batches from the PSM and roll them back in LIFO order - this.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => { - this.rollbackStagedChange(runtimeOp, localOpMetadata); - }); + const batchInfos = this.pendingStateManager.popStagedBatches( + ({ runtimeOp, localOpMetadata }) => { + this.rollbackStagedChange(runtimeOp, localOpMetadata); + }, + ); this.updateDocumentDirtyState(); + return batchInfos; }, "discard"), commitChanges: (options) => { const { squash } = { ...defaultStagingCommitOptions, ...options }; - exitStagingMode((_event) => { + exitStagingMode(() => { // Replay all staged batches in typical FIFO order. // We'll be out of staging mode so they'll be sent to the service finally. 
- this.pendingStateManager.replayPendingStates({ + return this.pendingStateManager.replayPendingStates({ committingStagedBatches: true, squash, }); diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index f1edd45d9284..ac0f4865459c 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -427,7 +427,7 @@ export class PendingStateManager implements IDisposable { clientId !== undefined, 0xa33 /* clientId (from stateHandler) could only be undefined if we've never connected, but we have a CSN so we know that's not the case */, ); - + const batchInfo = { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }; for (const message of batch) { const { runtimeOp, @@ -442,8 +442,7 @@ export class PendingStateManager implements IDisposable { runtimeOp, localOpMetadata, opMetadata, - // Note: We only will read this off the first message, but put it on all for simplicity - batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged }, + batchInfo, }; this.pendingMessages.push(pendingMessage); } @@ -748,7 +747,9 @@ export class PendingStateManager implements IDisposable { * states in its queue. This includes triggering resubmission of unacked ops. * ! 
Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer) */ - public replayPendingStates(options?: ReplayPendingStateOptions): void { + public replayPendingStates( + options?: ReplayPendingStateOptions, + ): IPendingMessage["batchInfo"][] { const { committingStagedBatches, squash } = { ...defaultReplayPendingStatesOptions, ...options, @@ -775,6 +776,7 @@ export class PendingStateManager implements IDisposable { const initialPendingMessagesCount = this.pendingMessages.length; let remainingPendingMessagesCount = this.pendingMessages.length; + const replayedBatches: Record<number, IPendingMessage["batchInfo"]> = {}; let seenStagedBatch = false; @@ -812,6 +814,7 @@ export class PendingStateManager implements IDisposable { if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) { // Resubmit no messages, with the batchId. Will result in another empty batch marker. this.stateHandler.reSubmitBatch([], { batchId, staged, squash }); + replayedBatches[pendingMessage.batchInfo.batchStartCsn] ??= pendingMessage.batchInfo; continue; } @@ -838,6 +841,7 @@ export class PendingStateManager implements IDisposable { ], { batchId, staged, squash }, ); + replayedBatches[pendingMessage.batchInfo.batchStartCsn] ??= pendingMessage.batchInfo; continue; } // else: batchMetadataFlag === true (It's a typical multi-message batch) @@ -877,6 +881,7 @@ export class PendingStateManager implements IDisposable { } this.stateHandler.reSubmitBatch(batch, { batchId, staged, squash }); + replayedBatches[pendingMessage.batchInfo.batchStartCsn] ??= pendingMessage.batchInfo; } if (!committingStagedBatches) { @@ -894,6 +899,8 @@ export class PendingStateManager implements IDisposable { clientId: this.stateHandler.clientId(), }); } + + return Object.values(replayedBatches); } /** @@ -904,13 +911,15 @@ export class PendingStateManager implements IDisposable { // callback will only be given staged messages with a valid
runtime op (i.e. not empty batch and not an initial message with only serialized content) stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage }, ) => void, - ): void { + ): IPendingMessage["batchInfo"][] { + const batches: Record<number, IPendingMessage["batchInfo"]> = {}; while (!this.pendingMessages.isEmpty()) { const stagedMessage = this.pendingMessages.peekBack(); if (stagedMessage?.batchInfo.staged === true) { this.pendingMessages.pop(); if (hasTypicalRuntimeOp(stagedMessage)) { + batches[stagedMessage.batchInfo.batchStartCsn] ??= stagedMessage.batchInfo; callback(stagedMessage); } } else { @@ -921,6 +930,7 @@ export class PendingStateManager implements IDisposable { this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true), 0xb89 /* Shouldn't be any more staged messages */, ); + return Object.values(batches); } } diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index d1b9a227c7ba..17d8dcc67301 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -1158,7 +1158,7 @@ describe("Runtime", () => { const getMockPendingStateManager = (): PendingStateManager => { let pendingMessages = 0; return { - replayPendingStates: () => {}, + replayPendingStates: () => [], hasPendingMessages: (): boolean => pendingMessages > 0, hasPendingUserChanges: (): boolean => pendingMessages > 0, processInboundMessages: (inbound: InboundMessageResult, _local: boolean) => { From fad8c0e279b394fb88b3c139ad51c9336723aa88 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 17:22:27 -0700 Subject: [PATCH 13/21] Remove e2e test for deleted DisableFlushBeforeProcess flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The flag was removed in this PR, so the test passing it is now redundant — it behaves identically to the remaining test
which covers flush-before-process behavior. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/test/fewerBatches.spec.ts | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts b/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts index c7ea2dfe1bc1..31ba5a860f37 100644 --- a/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts +++ b/packages/test/test-end-to-end-tests/src/test/fewerBatches.spec.ts @@ -160,18 +160,6 @@ describeCompat("Fewer batches", "NoCompat", (getTestObjectProvider, apis) => { }, ]; - itExpects( - "Reference sequence number mismatch when doing op reentry submits two batches", - expectedErrors, - async () => { - // By default, we would flush a batch when we detect a reference sequence number mismatch - await processOutOfOrderOp({ - ["Fluid.ContainerRuntime.DisableFlushBeforeProcess"]: true, - }); - assert.strictEqual(capturedBatches.length, 2); - }, - ); - itExpects( "Op reentry submits two batches due to flush before processing", expectedErrors, From 4f3aa387af491f0e46daf5850db6f598f5c915fd Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 18 Mar 2026 17:54:42 -0700 Subject: [PATCH 14/21] Remove unused typeFromBatchedOp helper The function was only used by tests for the removed flushPartialBatches code path. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/test/opLifecycle/outbox.spec.ts | 5 ----- 1 file changed, 5 deletions(-) diff --git a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts index 83041efb94bf..2cff6c523a6c 100644 --- a/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts +++ b/packages/runtime/container-runtime/src/test/opLifecycle/outbox.spec.ts @@ -48,11 +48,6 @@ import type { IPendingMessage, } from "../../pendingStateManager.js"; -function typeFromBatchedOp(message: LocalBatchMessage): string { - assert(message.runtimeOp !== undefined, "PRECONDITION: runtimeOp is undefined"); - return message.runtimeOp.type; -} - // Make a mock op with distinguishable contents function op(data: string): LocalContainerRuntimeMessage { // eslint-disable-next-line @typescript-eslint/consistent-type-assertions From 0dfedf72f2b0c7bf2a0fa5e30e6e40f89fc0eaad Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Thu, 19 Mar 2026 12:51:59 -0700 Subject: [PATCH 15/21] Fix GroupLargeBatch threshold and staged batch counting - Change GroupLargeBatch check from >= to > so that staging-mode auto-flush batches (exactly at the threshold) don't trigger the event. Only genuinely oversized batches are logged. - Fix batch counting in replayPendingStates and popStagedBatches: staged batches all get batchStartCsn=-1, so using Record collapsed them to 1 entry. Use an array (replayPendingStates) or Set by object identity (popStagedBatches) instead. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../src/opLifecycle/opGroupingManager.ts | 5 ++++- .../container-runtime/src/pendingStateManager.ts | 16 ++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts index 85e0a2bad539..ee01950843ae 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts @@ -130,7 +130,10 @@ export class OpGroupingManager { return batch as OutboundSingletonBatch; } - if (batch.messages.length >= largeBatchThreshold) { + // Use > (not >=) so that batches flushed exactly at the staging-mode + // auto-flush threshold (which defaults to largeBatchThreshold) don't + // trigger this event. Only genuinely oversized batches are logged. + if (batch.messages.length > largeBatchThreshold) { this.logger.sendTelemetryEvent({ eventName: "GroupLargeBatch", length: batch.messages.length, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index ac0f4865459c..b411226aa5f2 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -776,7 +776,7 @@ export class PendingStateManager implements IDisposable { const initialPendingMessagesCount = this.pendingMessages.length; let remainingPendingMessagesCount = this.pendingMessages.length; - const replayedBatches: Record<number, IPendingMessage["batchInfo"]> = {}; + const replayedBatchInfos: IPendingMessage["batchInfo"][] = []; let seenStagedBatch = false; @@ -814,7 +814,7 @@ export class PendingStateManager implements IDisposable { if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) { // Resubmit no messages, with the batchId. Will result in another empty batch marker.
this.stateHandler.reSubmitBatch([], { batchId, staged, squash }); - replayedBatches[pendingMessage.batchInfo.batchStartCsn] ??= pendingMessage.batchInfo; + replayedBatchInfos.push(pendingMessage.batchInfo); continue; } @@ -841,7 +841,7 @@ export class PendingStateManager implements IDisposable { ], { batchId, staged, squash }, ); - replayedBatches[pendingMessage.batchInfo.batchStartCsn] ??= pendingMessage.batchInfo; + replayedBatchInfos.push(pendingMessage.batchInfo); continue; } // else: batchMetadataFlag === true (It's a typical multi-message batch) @@ -881,7 +881,7 @@ export class PendingStateManager implements IDisposable { } this.stateHandler.reSubmitBatch(batch, { batchId, staged, squash }); - replayedBatches[pendingMessage.batchInfo.batchStartCsn] ??= pendingMessage.batchInfo; + replayedBatchInfos.push(pendingMessage.batchInfo); } if (!committingStagedBatches) { @@ -900,7 +900,7 @@ export class PendingStateManager implements IDisposable { }); } - return Object.values(replayedBatches); + return replayedBatchInfos; } /** @@ -912,14 +912,14 @@ export class PendingStateManager implements IDisposable { stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage }, ) => void, ): IPendingMessage["batchInfo"][] { - const batches: Record<number, IPendingMessage["batchInfo"]> = {}; + const batchSet = new Set<IPendingMessage["batchInfo"]>(); while (!this.pendingMessages.isEmpty()) { const stagedMessage = this.pendingMessages.peekBack(); if (stagedMessage?.batchInfo.staged === true) { this.pendingMessages.pop(); + batchSet.add(stagedMessage.batchInfo); if (hasTypicalRuntimeOp(stagedMessage)) { - batches[stagedMessage.batchInfo.batchStartCsn] ??= stagedMessage.batchInfo; callback(stagedMessage); } } else { @@ -930,7 +930,7 @@ export class PendingStateManager implements IDisposable { this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true), 0xb89 /* Shouldn't be any more staged messages */, ); - return Object.values(batches); + return [...batchSet]; } } From fffd94f24c1d80f3afbeda39f25d6296269386f3 Mon Sep
17 00:00:00 2001 From: Tony Murphy Date: Thu, 19 Mar 2026 13:59:37 -0700 Subject: [PATCH 16/21] Add stagingModeAutoFlushThreshold to downstream type satisfies checks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix build failures in test-service-load and azure-client tests that use `satisfies ContainerRuntimeOptionsInternal` — they need the new required field. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../service-clients/azure-client/src/test/AzureClient.spec.ts | 2 ++ packages/test/test-service-load/src/optionsMatrix.ts | 1 + 2 files changed, 3 insertions(+) diff --git a/packages/service-clients/azure-client/src/test/AzureClient.spec.ts b/packages/service-clients/azure-client/src/test/AzureClient.spec.ts index 67e90a040afc..20875f56d72d 100644 --- a/packages/service-clients/azure-client/src/test/AzureClient.spec.ts +++ b/packages/service-clients/azure-client/src/test/AzureClient.spec.ts @@ -416,6 +416,7 @@ for (const compatibilityMode of ["1", "2"] as const) { explicitSchemaControl: false, createBlobPayloadPending: undefined, disableSchemaUpgrade: false, + stagingModeAutoFlushThreshold: 1000, } as const satisfies ContainerRuntimeOptionsInternal; const expectedRuntimeOptions2 = { summaryOptions: {}, @@ -433,6 +434,7 @@ for (const compatibilityMode of ["1", "2"] as const) { explicitSchemaControl: true, createBlobPayloadPending: undefined, disableSchemaUpgrade: false, + stagingModeAutoFlushThreshold: 1000, } as const satisfies ContainerRuntimeOptionsInternal; const expectedRuntimeOptions = diff --git a/packages/test/test-service-load/src/optionsMatrix.ts b/packages/test/test-service-load/src/optionsMatrix.ts index bd43d393e62b..cabe94c7b64f 100644 --- a/packages/test/test-service-load/src/optionsMatrix.ts +++ b/packages/test/test-service-load/src/optionsMatrix.ts @@ -119,6 +119,7 @@ export function generateRuntimeOptions( createBlobPayloadPending: [true, undefined], explicitSchemaControl: [true, false], 
disableSchemaUpgrade: [true, false], + stagingModeAutoFlushThreshold: [undefined], }; const pairwiseOptions = generatePairwiseOptions( From bf6a381c6d18b77f9f037915f910015af9ebbae8 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Thu, 19 Mar 2026 14:59:43 -0700 Subject: [PATCH 17/21] Address review comments: remove autoFlushCount member, fix JSDoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove stagingModeAutoFlushCount member variable — auto-flush count is computable from batchesAtOrOverThreshold in the ExitStagingMode perf event (every batch >= threshold was auto-flushed) - Rename batchesOverThreshold to batchesAtOrOverThreshold with >= check to match the auto-flush condition - Fix largeBatchThreshold JSDoc: "at or above" → "above" to match the strict > comparison - Collapse lonely if in scheduleFlush Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 21 +++++++------------ .../src/opLifecycle/opGroupingManager.ts | 2 +- .../src/test/containerRuntime.spec.ts | 1 - 3 files changed, 8 insertions(+), 16 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index c1984f31a564..34b5318e1bba 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -1430,11 +1430,6 @@ export class ContainerRuntime private readonly batchRunner = new BatchRunCounter(); private readonly _flushMode: FlushMode; private readonly stagingModeAutoFlushThreshold: number; - /** - * Tracks auto-flush events during the current staging mode session. - * Reset on enter, incremented when threshold is hit, reported on exit. 
- */ - private stagingModeAutoFlushCount: number = 0; /** * BatchId tracking is needed whenever there's a possibility of a "forked Container", * where the same local state is pending in two different running Containers, each of @@ -3651,7 +3646,6 @@ export class ContainerRuntime // Make sure Outbox is empty before entering staging mode, // since we mark whole batches as "staged" or not to indicate whether to submit them. this.flush(); - this.stagingModeAutoFlushCount = 0; const exitStagingMode = ( discardOrCommit: () => IPendingMessage["batchInfo"][], @@ -3663,7 +3657,6 @@ export class ContainerRuntime { eventName: "ExitStagingMode", exitMethod, - autoFlushCount: this.stagingModeAutoFlushCount, autoFlushThreshold: this.stagingModeAutoFlushThreshold, }, (event) => { @@ -3679,8 +3672,8 @@ export class ContainerRuntime const batchInfos = discardOrCommit(); event.reportProgress({ batches: batchInfos.length, - batchesOverThreshold: batchInfos.filter( - (b) => b.length > this.stagingModeAutoFlushThreshold, + batchesAtOrOverThreshold: batchInfos.filter( + (b) => b.length >= this.stagingModeAutoFlushThreshold, ).length, }); this.channelCollection.notifyStagingMode(false); @@ -4852,11 +4845,11 @@ export class ContainerRuntime // exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check. // Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects // sequence number changes and throws if unexpected changes are detected. 
- if (this.inStagingMode) { - if (this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold) { - return; - } - this.stagingModeAutoFlushCount++; + if ( + this.inStagingMode && + this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold + ) { + return; } if (this.flushScheduled) { diff --git a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts index ee01950843ae..1a1bb70ee02d 100644 --- a/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts +++ b/packages/runtime/container-runtime/src/opLifecycle/opGroupingManager.ts @@ -18,7 +18,7 @@ import type { } from "./definitions.js"; /** - * The number of ops in a batch at or above which the batch is considered "large" + * The number of ops in a batch above which the batch is considered "large" * for telemetry purposes. Used by both {@link OpGroupingManager} (GroupLargeBatch event) * and as the default staging-mode auto-flush threshold. */ diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 9076df28a9f4..0b89f0beb6a3 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4478,7 +4478,6 @@ describe("Runtime", () => { eventName: "ContainerRuntime:ExitStagingMode_end", category: "performance", exitMethod: "commit", - autoFlushCount: 1, autoFlushThreshold: threshold, }, ]); From 8819175654d881bf9b5b1eb20749726e8fa50ad5 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Thu, 19 Mar 2026 15:04:04 -0700 Subject: [PATCH 18/21] Fix replayPendingStates batch dedup: use Set like popStagedBatches Array-based tracking pushed duplicate batchInfo refs for multi-message batches. Use Set with identity dedup, matching the pattern already used in popStagedBatches. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/pendingStateManager.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index b411226aa5f2..7fe45991ea54 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -776,7 +776,7 @@ export class PendingStateManager implements IDisposable { const initialPendingMessagesCount = this.pendingMessages.length; let remainingPendingMessagesCount = this.pendingMessages.length; - const replayedBatchInfos: IPendingMessage["batchInfo"][] = []; + const replayedBatchSet = new Set(); let seenStagedBatch = false; @@ -814,7 +814,7 @@ export class PendingStateManager implements IDisposable { if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) { // Resubmit no messages, with the batchId. Will result in another empty batch marker. 
this.stateHandler.reSubmitBatch([], { batchId, staged, squash }); - replayedBatchInfos.push(pendingMessage.batchInfo); + replayedBatchSet.add(pendingMessage.batchInfo); continue; } @@ -841,7 +841,7 @@ export class PendingStateManager implements IDisposable { ], { batchId, staged, squash }, ); - replayedBatchInfos.push(pendingMessage.batchInfo); + replayedBatchSet.add(pendingMessage.batchInfo); continue; } // else: batchMetadataFlag === true (It's a typical multi-message batch) @@ -881,7 +881,7 @@ export class PendingStateManager implements IDisposable { } this.stateHandler.reSubmitBatch(batch, { batchId, staged, squash }); - replayedBatchInfos.push(pendingMessage.batchInfo); + replayedBatchSet.add(pendingMessage.batchInfo); } if (!committingStagedBatches) { @@ -900,7 +900,7 @@ export class PendingStateManager implements IDisposable { }); } - return replayedBatchInfos; + return [...replayedBatchSet]; } /** From 3ead9292af00eaa3ca74935ee90d74a6cff50cf4 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 25 Mar 2026 14:04:19 -0700 Subject: [PATCH 19/21] Address PR review comments from markfields - Link @defaultValue to largeBatchThreshold instead of hardcoded 1000 - Move exitMethod into eventName for immediate telemetry visibility - Wrap telemetry props in details object to avoid new first-class columns - Add @returns doc comment to replayPendingStates - Add outbox count asserts in accumulate and default threshold tests - Simplify config/runtime option precedence tests to check property value - Move non-threshold tests out of stagingModeAutoFlushThreshold describe block Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/containerRuntime.ts | 17 +- .../src/pendingStateManager.ts | 2 + .../src/test/containerRuntime.spec.ts | 298 +++++++++--------- 3 files changed, 160 insertions(+), 157 deletions(-) diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 
af30103ea6cf..6f9f91877f49 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -485,7 +485,7 @@ export interface ContainerRuntimeOptions { * * Set to Infinity to only break batches on system events (incoming ops). * - * @defaultValue 1000 + * @defaultValue {@link largeBatchThreshold} (currently 1000) */ readonly stagingModeAutoFlushThreshold: number; @@ -3655,9 +3655,7 @@ export class ContainerRuntime PerformanceEvent.timedExec( this.mc.logger, { - eventName: "ExitStagingMode", - exitMethod, - autoFlushThreshold: this.stagingModeAutoFlushThreshold, + eventName: `ExitStagingMode_${exitMethod}`, }, (event) => { // Final flush of any last staged changes @@ -3671,10 +3669,13 @@ export class ContainerRuntime this.submitIdAllocationOpIfNeeded({ staged: false }); const batchInfos = discardOrCommit(); event.reportProgress({ - batches: batchInfos.length, - batchesAtOrOverThreshold: batchInfos.filter( - (b) => b.length >= this.stagingModeAutoFlushThreshold, - ).length, + details: { + autoFlushThreshold: this.stagingModeAutoFlushThreshold, + batches: batchInfos.length, + batchesAtOrOverThreshold: batchInfos.filter( + (b) => b.length >= this.stagingModeAutoFlushThreshold, + ).length, + }, }); this.channelCollection.notifyStagingMode(false); }, diff --git a/packages/runtime/container-runtime/src/pendingStateManager.ts b/packages/runtime/container-runtime/src/pendingStateManager.ts index 39b0b19ce526..d5e24970b11b 100644 --- a/packages/runtime/container-runtime/src/pendingStateManager.ts +++ b/packages/runtime/container-runtime/src/pendingStateManager.ts @@ -750,6 +750,8 @@ export class PendingStateManager implements IDisposable { * Called when the Container's connection state changes. If the Container gets connected, it replays all the pending * states in its queue. This includes triggering resubmission of unacked ops. * ! 
Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer) + * + * @returns The unique batch infos for all batches that were replayed. */ public replayPendingStates( options?: ReplayPendingStateOptions, diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 942be6d8bab4..68d2be7708fa 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4412,6 +4412,12 @@ describe("Runtime", () => { 0, "No ops should be submitted while under threshold in staging mode", ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 5, + "All 5 ops should be in the outbox", + ); controls.commitChanges(); assert.equal( @@ -4473,10 +4479,8 @@ describe("Runtime", () => { controls.commitChanges(); logger.assertMatchAny([ { - eventName: "ContainerRuntime:ExitStagingMode_end", + eventName: "ContainerRuntime:ExitStagingMode_commit_end", category: "performance", - exitMethod: "commit", - autoFlushThreshold: threshold, }, ]); }); @@ -4587,6 +4591,12 @@ describe("Runtime", () => { 0, "Default threshold should suppress turn-based flushing during staging mode", ); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtimeWithThreshold as any).outbox.mainBatchMessageCount, + 2, + "Both ops should still be in the outbox (unflushed)", + ); controls.commitChanges(); }); @@ -4640,33 +4650,6 @@ describe("Runtime", () => { ); }); - it("enterStagingMode flushes any pending outbox contents as non-staged", async () => { - runtimeWithThreshold = await createRuntimeWithThreshold(Infinity); - 
stubChannelCollection(runtimeWithThreshold); - submittedOps.length = 0; - - // Submit ops before entering staging mode (not yet flushed — still in outbox) - submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("pre1")); - submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("pre2")); - assert.equal(submittedOps.length, 0, "Ops not yet flushed"); - - // Enter staging mode — should flush pre-staging ops as non-staged - const controls = runtimeWithThreshold.enterStagingMode(); - assert.equal( - submittedOps.length, - 2, - "Pre-staging ops should be flushed on enterStagingMode", - ); - - // Submit more ops while in staging mode - submitDataStoreOp(runtimeWithThreshold, "3", genTestDataStoreMessage("staged1")); - runtimeWithThreshold.flush(); - assert.equal(submittedOps.length, 2, "Staged ops should NOT be submitted to wire"); - - controls.commitChanges(); - assert.equal(submittedOps.length, 3, "All ops should be submitted after commit"); - }); - it("config override takes precedence over runtime option", async () => { const configThreshold = 5; const runtimeOptionThreshold = 50; @@ -4685,44 +4668,24 @@ describe("Runtime", () => { }, provideEntryPoint: mockProvideEntryPoint, })) as unknown as ContainerRuntime_WithPrivates; - stubChannelCollection(runtimeWithThreshold); - submittedOps.length = 0; - - runtimeWithThreshold.enterStagingMode(); - - // Submit ops up to the config override threshold (5), not the runtime option (50) - for (let i = 0; i < configThreshold; i++) { - submitDataStoreOp(runtimeWithThreshold, `${i}`, genTestDataStoreMessage(`op${i}`)); - } - // Threshold reached — scheduleFlush falls through, flush happens on next microtask - await Promise.resolve(); assert.equal( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any - (runtimeWithThreshold as any).outbox.mainBatchMessageCount, - 0, - "Outbox should be empty — config override threshold (5) should win over runtime 
option (50)", + (runtimeWithThreshold as any).stagingModeAutoFlushThreshold, + configThreshold, + "Config override threshold (5) should win over runtime option (50)", ); }); it("runtime option takes precedence over default", async () => { const runtimeOptionThreshold = 5; runtimeWithThreshold = await createRuntimeWithThreshold(runtimeOptionThreshold); - stubChannelCollection(runtimeWithThreshold); - submittedOps.length = 0; - - runtimeWithThreshold.enterStagingMode(); - - for (let i = 0; i < runtimeOptionThreshold; i++) { - submitDataStoreOp(runtimeWithThreshold, `${i}`, genTestDataStoreMessage(`op${i}`)); - } - await Promise.resolve(); assert.equal( // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any - (runtimeWithThreshold as any).outbox.mainBatchMessageCount, - 0, - "Outbox should be empty — runtime option (5) should win over default (1000)", + (runtimeWithThreshold as any).stagingModeAutoFlushThreshold, + runtimeOptionThreshold, + "Runtime option (5) should win over default (1000)", ); }); @@ -4763,47 +4726,6 @@ describe("Runtime", () => { ); }); - it("reconnect breaks batch during staging mode", async () => { - mockContext = getMockContext() as IContainerContext; - runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ - context: mockContext as IContainerContext, - registry: new FluidDataStoreRegistry([]), - existing: false, - runtimeOptions: { - stagingModeAutoFlushThreshold: Infinity, - enableGroupedBatching: false, - }, - provideEntryPoint: mockProvideEntryPoint, - })) as unknown as ContainerRuntime_WithPrivates; - stubChannelCollection(runtimeWithThreshold); - submittedOps.length = 0; - - runtimeWithThreshold.enterStagingMode(); - - // Submit ops while connected (sitting in outbox under threshold) - submitDataStoreOp(runtimeWithThreshold, "1", genTestDataStoreMessage("op1")); - submitDataStoreOp(runtimeWithThreshold, "2", genTestDataStoreMessage("op2")); - assert.equal( - // 
eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any - (runtimeWithThreshold as any).outbox.mainBatchMessageCount, - 2, - "2 ops in outbox before disconnect", - ); - - // Disconnect - changeConnectionState(runtimeWithThreshold, false, "disconnectedClientId"); - - // Reconnect — triggers flush and replayPendingStates - changeConnectionState(runtimeWithThreshold, true, mockClientId); - - assert.equal( - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any - (runtimeWithThreshold as any).outbox.mainBatchMessageCount, - 0, - "Outbox should be drained after reconnect", - ); - }); - it("IdAllocation + reconnect while in staging mode does not hit coherency check", async () => { // This is the highest-risk scenario from Mark's test plan. // The fix in b4e1fd1dd25 added scheduleFlush() after submitIdAllocationOpIfNeeded @@ -4864,65 +4786,143 @@ describe("Runtime", () => { "Should not throw coherency check — IdAllocation op should have been flushed by the 'op' handler", ); }); + }); - it("reconnect resubmits pre-staged batches with threshold active", async () => { - mockContext = getMockContext() as IContainerContext; - runtimeWithThreshold = (await ContainerRuntime.loadRuntime2({ - context: mockContext as IContainerContext, - registry: new FluidDataStoreRegistry([]), - existing: false, - runtimeOptions: { - stagingModeAutoFlushThreshold: Infinity, - enableGroupedBatching: false, - }, - provideEntryPoint: mockProvideEntryPoint, - })) as unknown as ContainerRuntime_WithPrivates; - stubChannelCollection(runtimeWithThreshold); - submittedOps.length = 0; + it("enterStagingMode flushes any pending outbox contents as non-staged", async () => { + const context = getMockContext() as IContainerContext; + const runtime = (await ContainerRuntime.loadRuntime2({ + context: context as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { 
+ enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtime); + submittedOps.length = 0; - // Submit ops BEFORE entering staging mode - submitDataStoreOp( - runtimeWithThreshold, - "pre1", - genTestDataStoreMessage("pre-staging-op"), - ); - await Promise.resolve(); - assert.equal(submittedOps.length, 1, "Pre-staging op should be submitted"); + // Submit ops before entering staging mode (not yet flushed — still in outbox) + submitDataStoreOp(runtime, "1", genTestDataStoreMessage("pre1")); + submitDataStoreOp(runtime, "2", genTestDataStoreMessage("pre2")); + assert.equal(submittedOps.length, 0, "Ops not yet flushed"); - // Enter staging mode and submit more ops - const controls = runtimeWithThreshold.enterStagingMode(); - submitDataStoreOp( - runtimeWithThreshold, - "staged1", - genTestDataStoreMessage("staged-op"), - "STAGED_META", - ); - runtimeWithThreshold.flush(); - assert.equal(submittedOps.length, 1, "Staged op should not be submitted to wire"); + // Enter staging mode — should flush pre-staging ops as non-staged + const controls = runtime.enterStagingMode(); + assert.equal( + submittedOps.length, + 2, + "Pre-staging ops should be flushed on enterStagingMode", + ); - // Disconnect - changeConnectionState(runtimeWithThreshold, false, "disconnectedClientId"); - submittedOps.length = 0; + // Submit more ops while in staging mode + submitDataStoreOp(runtime, "3", genTestDataStoreMessage("staged1")); + runtime.flush(); + assert.equal(submittedOps.length, 2, "Staged ops should NOT be submitted to wire"); - // Reconnect — replayPendingStates resubmits all pending ops - changeConnectionState(runtimeWithThreshold, true, mockClientId); - await Promise.resolve(); + controls.commitChanges(); + assert.equal(submittedOps.length, 3, "All ops should be submitted after commit"); + runtime.dispose(); + }); - // The pre-staging op should be resubmitted to the wire. 
- // The staged op stays in PSM as staged (not sent to wire). - assert( - submittedOps.length > 0, - "Pre-staging op should be resubmitted after reconnect", - ); - const opsAfterReconnect = submittedOps.length; + it("reconnect breaks batch during staging mode", async () => { + const context = getMockContext() as IContainerContext; + const runtime = (await ContainerRuntime.loadRuntime2({ + context: context as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtime); + submittedOps.length = 0; - // Verify we can still commit the staged changes - controls.commitChanges(); - assert( - submittedOps.length > opsAfterReconnect, - "Staged op should be submitted after commitChanges", - ); - }); + runtime.enterStagingMode(); + + // Submit ops while connected (sitting in outbox under threshold) + submitDataStoreOp(runtime, "1", genTestDataStoreMessage("op1")); + submitDataStoreOp(runtime, "2", genTestDataStoreMessage("op2")); + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtime as any).outbox.mainBatchMessageCount, + 2, + "2 ops in outbox before disconnect", + ); + + // Disconnect + changeConnectionState(runtime, false, "disconnectedClientId"); + + // Reconnect — triggers flush and replayPendingStates + changeConnectionState(runtime, true, mockClientId); + + assert.equal( + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-explicit-any + (runtime as any).outbox.mainBatchMessageCount, + 0, + "Outbox should be drained after reconnect", + ); + runtime.dispose(); + }); + + it("reconnect resubmits pre-staged batches", async () => { + const context = getMockContext() as IContainerContext; + const runtime = (await ContainerRuntime.loadRuntime2({ + 
context: context as IContainerContext, + registry: new FluidDataStoreRegistry([]), + existing: false, + runtimeOptions: { + enableGroupedBatching: false, + }, + provideEntryPoint: mockProvideEntryPoint, + })) as unknown as ContainerRuntime_WithPrivates; + stubChannelCollection(runtime); + submittedOps.length = 0; + + // Submit ops BEFORE entering staging mode + submitDataStoreOp( + runtime, + "pre1", + genTestDataStoreMessage("pre-staging-op"), + ); + await Promise.resolve(); + assert.equal(submittedOps.length, 1, "Pre-staging op should be submitted"); + + // Enter staging mode and submit more ops + const controls = runtime.enterStagingMode(); + submitDataStoreOp( + runtime, + "staged1", + genTestDataStoreMessage("staged-op"), + "STAGED_META", + ); + runtime.flush(); + assert.equal(submittedOps.length, 1, "Staged op should not be submitted to wire"); + + // Disconnect + changeConnectionState(runtime, false, "disconnectedClientId"); + submittedOps.length = 0; + + // Reconnect — replayPendingStates resubmits all pending ops + changeConnectionState(runtime, true, mockClientId); + await Promise.resolve(); + + // The pre-staging op should be resubmitted to the wire. + // The staged op stays in PSM as staged (not sent to wire). 
+ assert( + submittedOps.length > 0, + "Pre-staging op should be resubmitted after reconnect", + ); + const opsAfterReconnect = submittedOps.length; + + // Verify we can still commit the staged changes + controls.commitChanges(); + assert( + submittedOps.length > opsAfterReconnect, + "Staged op should be submitted after commitChanges", + ); + runtime.dispose(); }); }); }); From fa989ed14e5d6440a4f95d3a8da9433468a124e9 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 25 Mar 2026 15:10:04 -0700 Subject: [PATCH 20/21] fix: collapse multi-line call to satisfy biome formatting Co-Authored-By: Claude Opus 4.6 (1M context) --- .../container-runtime/src/test/containerRuntime.spec.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 6336cabc0c03..463081cadbf9 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4897,11 +4897,7 @@ describe("Runtime", () => { submittedOps.length = 0; // Submit ops BEFORE entering staging mode - submitDataStoreOp( - runtime, - "pre1", - genTestDataStoreMessage("pre-staging-op"), - ); + submitDataStoreOp(runtime, "pre1", genTestDataStoreMessage("pre-staging-op")); await Promise.resolve(); assert.equal(submittedOps.length, 1, "Pre-staging op should be submitted"); From 96ac9315077e26d4b0954cbfd32f7b56da6b3d86 Mon Sep 17 00:00:00 2001 From: Tony Murphy Date: Wed, 25 Mar 2026 15:47:33 -0700 Subject: [PATCH 21/21] fix: resolve api-extractor @link error and unnecessary type assertions Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/runtime/container-runtime/src/containerRuntime.ts | 2 +- .../container-runtime/src/test/containerRuntime.spec.ts | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git 
a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index 2c7edb067493..407d93166c52 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -485,7 +485,7 @@ export interface ContainerRuntimeOptions { * * Set to Infinity to only break batches on system events (incoming ops). * - * @defaultValue {@link largeBatchThreshold} (currently 1000) + * @defaultValue `largeBatchThreshold` (currently 1000) */ readonly stagingModeAutoFlushThreshold: number; diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index 463081cadbf9..14f0f6b00620 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -4807,7 +4807,7 @@ describe("Runtime", () => { it("enterStagingMode flushes any pending outbox contents as non-staged", async () => { const context = getMockContext() as IContainerContext; const runtime = (await ContainerRuntime.loadRuntime2({ - context: context as IContainerContext, + context, registry: new FluidDataStoreRegistry([]), existing: false, runtimeOptions: { @@ -4844,7 +4844,7 @@ describe("Runtime", () => { it("reconnect breaks batch during staging mode", async () => { const context = getMockContext() as IContainerContext; const runtime = (await ContainerRuntime.loadRuntime2({ - context: context as IContainerContext, + context, registry: new FluidDataStoreRegistry([]), existing: false, runtimeOptions: { @@ -4885,7 +4885,7 @@ describe("Runtime", () => { it("reconnect resubmits pre-staged batches", async () => { const context = getMockContext() as IContainerContext; const runtime = (await ContainerRuntime.loadRuntime2({ - context: context as IContainerContext, + context, registry: new FluidDataStoreRegistry([]), existing: 
false, runtimeOptions: {