Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b0f4781
Add stagingModeMaxBatchOps to control batch boundaries during staging…
anthony-murphy-agent Feb 27, 2026
0a3a73d
Rename stagingModeMaxBatchOps to stagingModeAutoFlushThreshold
anthony-murphy-agent Feb 27, 2026
ebf2e66
Apply default at assignment and add config override
anthony-murphy-agent Feb 27, 2026
a977adc
Address PR review feedback
anthony-murphy-agent Feb 27, 2026
7ad4783
Remove DisableFlushBeforeProcess feature flag
anthony-murphy Mar 18, 2026
2e5e536
Tie defaultStagingModeAutoFlushThreshold to largeBatchThreshold constant
anthony-murphy Mar 18, 2026
869a38c
Merge branch 'remove-disable-flush-before-process' into staging-batch…
anthony-murphy Mar 18, 2026
7d02235
Expose stagingModeAutoFlushThreshold on public ContainerRuntimeOptions
anthony-murphy Mar 18, 2026
b4921d8
Add staging mode threshold tests and fix telemetry expectations
anthony-murphy Mar 18, 2026
839491d
Add IdAllocation+reconnect and pre-staged resubmit tests
anthony-murphy Mar 18, 2026
9c29e85
Log StagingModeAutoFlush telemetry when threshold is hit
anthony-murphy Mar 18, 2026
66cb285
Wrap exitStagingMode in a PerformanceEvent
anthony-murphy Mar 18, 2026
c82ea67
Wrap exitStagingMode in PerformanceEvent with batch telemetry
anthony-murphy Mar 19, 2026
fad8c0e
Remove e2e test for deleted DisableFlushBeforeProcess flag
anthony-murphy Mar 19, 2026
b026458
Merge branch 'remove-disable-flush-before-process' into staging-batch…
anthony-murphy Mar 19, 2026
4f3aa38
Remove unused typeFromBatchedOp helper
anthony-murphy Mar 19, 2026
4b4a868
Merge branch 'remove-disable-flush-before-process' into staging-batch…
anthony-murphy Mar 19, 2026
a77b5c7
Merge remote-tracking branch 'upstream/main' into staging-batch-control
anthony-murphy Mar 19, 2026
0dfedf7
Fix GroupLargeBatch threshold and staged batch counting
anthony-murphy Mar 19, 2026
5c284e4
Merge main into staging-batch-control
anthony-murphy Mar 19, 2026
fffd94f
Add stagingModeAutoFlushThreshold to downstream type satisfies checks
anthony-murphy Mar 19, 2026
bf6a381
Address review comments: remove autoFlushCount member, fix JSDoc
anthony-murphy Mar 19, 2026
8819175
Fix replayPendingStates batch dedup: use Set like popStagedBatches
anthony-murphy Mar 19, 2026
d780ccd
Merge main into staging-batch-control
anthony-murphy Mar 23, 2026
c43a390
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy Mar 25, 2026
3ead929
Address PR review comments from markfields
anthony-murphy Mar 25, 2026
d590f0e
Merge remote-tracking branch 'upstream/main' into staging-batch-control
anthony-murphy Mar 25, 2026
fa989ed
fix: collapse multi-line call to satisfy biome formatting
anthony-murphy Mar 25, 2026
96ac931
fix: resolve api-extractor @link error and unnecessary type assertions
anthony-murphy Mar 25, 2026
d2d96a1
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export type RuntimeOptionsAffectingDocSchema = Omit<
| "maxBatchSizeInBytes"
| "loadSequenceNumberVerification"
| "summaryOptions"
| "stagingModeAutoFlushThreshold"
>;

/**
Expand Down
43 changes: 43 additions & 0 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,18 @@ export interface ContainerRuntimeOptionsInternal extends ContainerRuntimeOptions
* In that case, batched messages will be sent individually (but still all at the same time).
*/
readonly enableGroupedBatching: boolean;

/**
* Controls automatic batch flushing during staging mode.
* Normal turn-based/async flush scheduling is suppressed while in staging mode
* until the accumulated batch reaches this many ops, at which point the batch
* is flushed. Incoming ops always break the current batch regardless of this setting.
*
* Set to Infinity to only break batches on system events (incoming ops).
*
* @defaultValue 1000
*/
readonly stagingModeAutoFlushThreshold?: number;
}

/**
Expand Down Expand Up @@ -608,6 +620,16 @@ const defaultMaxBatchSizeInBytes = 700 * 1024;

const defaultChunkSizeInBytes = 204800;

/**
* Default maximum ops per staging-mode batch before automatic flush scheduling resumes.
*
* Chosen based on production telemetry: copy-paste operations routinely produce batches
* of 1000+ ops (435K instances over 30 days), and receivers on modern Fluid versions
* handle them without issues. 1000 also matches the existing "large batch" telemetry
* threshold ({@link OpGroupingManager}).
*
* @remarks
* This value is only the final fallback: the runtime first consults the
* "Fluid.ContainerRuntime.StagingModeAutoFlushThreshold" config setting, then the
* `stagingModeAutoFlushThreshold` runtime option, and uses this constant when neither is set.
*/
const defaultStagingModeAutoFlushThreshold = 1000;

/**
* The default time to wait for pending ops to be processed during summarization
*/
Expand Down Expand Up @@ -987,6 +1009,7 @@ export class ContainerRuntime
? disabledCompressionConfig
: defaultConfigs.compressionOptions,
createBlobPayloadPending = defaultConfigs.createBlobPayloadPending,
stagingModeAutoFlushThreshold,
}: IContainerRuntimeOptionsInternal = runtimeOptions;

// If explicitSchemaControl is off, ensure that options which require explicitSchemaControl are not enabled.
Expand Down Expand Up @@ -1215,6 +1238,7 @@ export class ContainerRuntime
enableGroupedBatching,
explicitSchemaControl,
createBlobPayloadPending,
stagingModeAutoFlushThreshold,
};

validateMinimumVersionForCollab(updatedMinVersionForCollab);
Expand Down Expand Up @@ -1393,6 +1417,7 @@ export class ContainerRuntime

private readonly batchRunner = new BatchRunCounter();
private readonly _flushMode: FlushMode;
private readonly stagingModeAutoFlushThreshold: number;
/**
* BatchId tracking is needed whenever there's a possibility of a "forked Container",
* where the same local state is pending in two different running Containers, each of
Expand Down Expand Up @@ -1849,6 +1874,10 @@ export class ContainerRuntime
} else {
this._flushMode = runtimeOptions.flushMode;
}
this.stagingModeAutoFlushThreshold =
this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ??
runtimeOptions.stagingModeAutoFlushThreshold ??
defaultStagingModeAutoFlushThreshold;
this.batchIdTrackingEnabled =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ??
this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ??
Expand Down Expand Up @@ -4809,6 +4838,20 @@ export class ContainerRuntime
}

private scheduleFlush(): void {
// During staging mode with a batch threshold, suppress automatic flush scheduling.
// Flush scheduling only resumes once the main batch has reached the threshold (count >= threshold).
// Incoming ops still break the batch via direct this.flush() calls elsewhere
// (deltaManager "op" handler, process(), connection changes, getPendingLocalState,
// exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check.
// Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects
// sequence number changes and forces a flush as a safety net.
if (
this.inStagingMode &&
this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold
) {
return;
}

if (this.flushScheduled) {
return;
}
Expand Down
181 changes: 181 additions & 0 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4441,6 +4441,187 @@ describe("Runtime", () => {

assert.equal(containerRuntime.isDirty, false, "Runtime should not be dirty anymore");
});

describe("stagingModeAutoFlushThreshold", () => {
	// Runtime exercised by the current test; assigned per test and disposed in afterEach.
	let runtimeWithThreshold: ContainerRuntime_WithPrivates;
	// Context passed to the runtime; retained so tests can reach its mock delta manager.
	let mockContext: Partial<IContainerContext>;

	/**
	 * Loads a new runtime (and a new mock context) configured with the given
	 * staging-mode auto-flush threshold.
	 */
	async function createRuntimeWithThreshold(
		threshold: number,
	): Promise<ContainerRuntime_WithPrivates> {
		mockContext = getMockContext() as IContainerContext;
		const loaded = await ContainerRuntime.loadRuntime2({
			context: mockContext as IContainerContext,
			registry: new FluidDataStoreRegistry([]),
			existing: false,
			runtimeOptions: {
				stagingModeAutoFlushThreshold: threshold,
				// With grouped batching off, every op reaches the wire on its own,
				// so op counts can be asserted directly.
				enableGroupedBatching: false,
			},
			provideEntryPoint: mockProvideEntryPoint,
		});
		return loaded as unknown as ContainerRuntime_WithPrivates;
	}

	/**
	 * Makes `loaded` the runtime under test, stubs its channel collection, and
	 * clears any ops recorded so far. Synchronous on purpose, so it runs in the
	 * same turn as the statement that awaited the runtime.
	 */
	function useRuntime(loaded: ContainerRuntime_WithPrivates): void {
		runtimeWithThreshold = loaded;
		stubChannelCollection(runtimeWithThreshold);
		submittedOps.length = 0;
	}

	/**
	 * Submits the n-th test op: data store id is the number as a string, payload is "op<n>".
	 */
	function submitNumberedOp(opNumber: number): void {
		const label = opNumber.toString();
		submitDataStoreOp(runtimeWithThreshold, label, genTestDataStoreMessage(`op${label}`));
	}

	/**
	 * Asserts that no op has been recorded in `submittedOps` yet.
	 */
	function assertNothingOnWire(message: string): void {
		assert.equal(submittedOps.length, 0, message);
	}

	afterEach(() => {
		runtimeWithThreshold?.dispose();
		// Drop the reference so a disposed runtime cannot leak into the next test.
		runtimeWithThreshold = undefined as unknown as ContainerRuntime_WithPrivates;
	});

	it("ops accumulate under threshold during staging mode", async () => {
		useRuntime(await createRuntimeWithThreshold(10));

		const stagingControls = runtimeWithThreshold.enterStagingMode();

		// Five ops, each followed by a microtask turn; the total stays below the threshold of 10.
		for (let opNumber = 1; opNumber <= 5; opNumber++) {
			submitNumberedOp(opNumber);
			await Promise.resolve();
		}

		assertNothingOnWire("No ops should be submitted while under threshold in staging mode");

		stagingControls.commitChanges();
		assert.equal(submittedOps.length, 5, "All 5 ops should be submitted after commitChanges");
	});

	it("ops flush when threshold is reached", async () => {
		useRuntime(await createRuntimeWithThreshold(3));

		runtimeWithThreshold.enterStagingMode();

		// The first two ops keep the batch below the threshold of 3.
		submitNumberedOp(1);
		submitNumberedOp(2);
		assertNothingOnWire("Under threshold, no flush yet");

		// The third op brings the batch to the threshold, so scheduleFlush is expected
		// to fall through to normal scheduling; give that one turn to run.
		submitNumberedOp(3);
		await Promise.resolve();

		assertNothingOnWire(
			"Ops should not be submitted while in staging mode (flushed into PSM only)",
		);
	});

	it("incoming ops break the batch regardless of threshold", async () => {
		useRuntime(await createRuntimeWithThreshold(Infinity));

		runtimeWithThreshold.enterStagingMode();

		submitNumberedOp(1);
		submitNumberedOp(2);
		assertNothingOnWire("No ops submitted yet");

		// Fake an inbound op: advance lastSequenceNumber, then emit "op".
		// The runtime's "op" handler is expected to call flush() directly, moving the
		// outbox contents into PSM as staged batches.
		const deltaManager = mockContext.deltaManager as MockDeltaManager;
		deltaManager.lastSequenceNumber += 1;
		deltaManager.emit("op", {
			clientId: mockClientId,
			sequenceNumber: deltaManager.lastSequenceNumber,
			clientSequenceNumber: 1,
			type: MessageType.ClientJoin,
			contents: "test content",
		});

		// While staging, a flush lands in PSM rather than on the wire.
		assertNothingOnWire("Ops should not be submitted to wire while in staging mode");
	});

	it("exit staging mode flushes remaining ops", async () => {
		useRuntime(await createRuntimeWithThreshold(Infinity));

		const stagingControls = runtimeWithThreshold.enterStagingMode();

		for (const opNumber of [1, 2, 3]) {
			submitNumberedOp(opNumber);
		}
		assertNothingOnWire("No ops submitted while staging");

		stagingControls.commitChanges();

		assert(submittedOps.length > 0, "Ops should be submitted after commitChanges");
	});

	it("has no effect outside staging mode", async () => {
		useRuntime(await createRuntimeWithThreshold(Infinity));

		// Staging mode is never entered here.
		submitNumberedOp(1);

		// One turn is enough for the regular turn-based flush.
		await Promise.resolve();

		assert.equal(submittedOps.length, 1, "Op should flush normally when not in staging mode");
	});

	it("default threshold suppresses turn-based flushing during staging mode", async () => {
		// No stagingModeAutoFlushThreshold is passed, so the runtime falls back to its default (1000).
		mockContext = getMockContext() as IContainerContext;
		const loaded = await ContainerRuntime.loadRuntime2({
			context: mockContext as IContainerContext,
			registry: new FluidDataStoreRegistry([]),
			existing: false,
			runtimeOptions: {},
			provideEntryPoint: mockProvideEntryPoint,
		});
		useRuntime(loaded as unknown as ContainerRuntime_WithPrivates);

		const stagingControls = runtimeWithThreshold.enterStagingMode();

		// Two ops on separate turns: far below the default threshold.
		for (let opNumber = 1; opNumber <= 2; opNumber++) {
			submitNumberedOp(opNumber);
			await Promise.resolve();
		}

		assertNothingOnWire(
			"Default threshold should suppress turn-based flushing during staging mode",
		);

		stagingControls.commitChanges();
	});
});
});
});
});