Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
b0f4781
Add stagingModeMaxBatchOps to control batch boundaries during staging…
anthony-murphy-agent Feb 27, 2026
0a3a73d
Rename stagingModeMaxBatchOps to stagingModeAutoFlushThreshold
anthony-murphy-agent Feb 27, 2026
ebf2e66
Apply default at assignment and add config override
anthony-murphy-agent Feb 27, 2026
a977adc
Address PR review feedback
anthony-murphy-agent Feb 27, 2026
7ad4783
Remove DisableFlushBeforeProcess feature flag
anthony-murphy Mar 18, 2026
2e5e536
Tie defaultStagingModeAutoFlushThreshold to largeBatchThreshold constant
anthony-murphy Mar 18, 2026
869a38c
Merge branch 'remove-disable-flush-before-process' into staging-batch…
anthony-murphy Mar 18, 2026
7d02235
Expose stagingModeAutoFlushThreshold on public ContainerRuntimeOptions
anthony-murphy Mar 18, 2026
b4921d8
Add staging mode threshold tests and fix telemetry expectations
anthony-murphy Mar 18, 2026
839491d
Add IdAllocation+reconnect and pre-staged resubmit tests
anthony-murphy Mar 18, 2026
9c29e85
Log StagingModeAutoFlush telemetry when threshold is hit
anthony-murphy Mar 18, 2026
66cb285
Wrap exitStagingMode in a PerformanceEvent
anthony-murphy Mar 18, 2026
c82ea67
Wrap exitStagingMode in PerformanceEvent with batch telemetry
anthony-murphy Mar 19, 2026
fad8c0e
Remove e2e test for deleted DisableFlushBeforeProcess flag
anthony-murphy Mar 19, 2026
b026458
Merge branch 'remove-disable-flush-before-process' into staging-batch…
anthony-murphy Mar 19, 2026
4f3aa38
Remove unused typeFromBatchedOp helper
anthony-murphy Mar 19, 2026
4b4a868
Merge branch 'remove-disable-flush-before-process' into staging-batch…
anthony-murphy Mar 19, 2026
a77b5c7
Merge remote-tracking branch 'upstream/main' into staging-batch-control
anthony-murphy Mar 19, 2026
0dfedf7
Fix GroupLargeBatch threshold and staged batch counting
anthony-murphy Mar 19, 2026
5c284e4
Merge main into staging-batch-control
anthony-murphy Mar 19, 2026
fffd94f
Add stagingModeAutoFlushThreshold to downstream type satisfies checks
anthony-murphy Mar 19, 2026
bf6a381
Address review comments: remove autoFlushCount member, fix JSDoc
anthony-murphy Mar 19, 2026
8819175
Fix replayPendingStates batch dedup: use Set like popStagedBatches
anthony-murphy Mar 19, 2026
d780ccd
Merge main into staging-batch-control
anthony-murphy Mar 23, 2026
c43a390
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy Mar 25, 2026
3ead929
Address PR review comments from markfields
anthony-murphy Mar 25, 2026
d590f0e
Merge remote-tracking branch 'upstream/main' into staging-batch-control
anthony-murphy Mar 25, 2026
fa989ed
fix: collapse multi-line call to satisfy biome formatting
anthony-murphy Mar 25, 2026
96ac931
fix: resolve api-extractor @link error and unnecessary type assertions
anthony-murphy Mar 25, 2026
d2d96a1
Merge branch 'main' of https://github.com/microsoft/FluidFramework in…
anthony-murphy Mar 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .claude/skills/trigger-pipelines-for-copilot-pr/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Second comment:
Posting those comments will trigger all our pipelines, which is necessary for PRs that are created by Copilot.

To post the comments, first check if the GitHub CLI is available,
and if so use `gh pr comment <PULL_REQUEST_NUMBER> --repo microsoft/FluidFramework --body "<COMMENT_TEXT>"`.
and if so use `MSYS_NO_PATHCONV=1 gh pr comment <PULL_REQUEST_NUMBER> --repo microsoft/FluidFramework --body "<COMMENT_TEXT>"`.
Note: `MSYS_NO_PATHCONV=1` is required on Windows (Git Bash) to prevent `/azp` from being expanded to `C:/Program Files/Git/azp`.
If `gh` is not available but `$GITHUB_TOKEN` is, you can try the GitHub REST API directly, e.g.:

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ export interface ContainerRuntimeOptions {
readonly gcOptions: IGCRuntimeOptions;
readonly loadSequenceNumberVerification: "close" | "log" | "bypass";
readonly maxBatchSizeInBytes: number;
readonly stagingModeAutoFlushThreshold: number;
// (undocumented)
readonly summaryOptions: ISummaryRuntimeOptions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ export type RuntimeOptionsAffectingDocSchema = Omit<
| "maxBatchSizeInBytes"
| "loadSequenceNumberVerification"
| "summaryOptions"
| "stagingModeAutoFlushThreshold"
| "disableSchemaUpgrade"
>;

Expand Down
106 changes: 87 additions & 19 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ import {
DuplicateBatchDetector,
ensureContentsDeserialized,
type IBatchCheckpoint,
largeBatchThreshold,
OpCompressor,
OpDecompressor,
OpGroupingManager,
Expand All @@ -257,6 +258,7 @@ import {
type IPendingLocalState,
PendingStateManager,
type PendingBatchResubmitMetadata,
type IPendingMessage,
} from "./pendingStateManager.js";
import { BatchRunCounter, RunCounter } from "./runCounter.js";
import {
Expand Down Expand Up @@ -475,6 +477,18 @@ export interface ContainerRuntimeOptions {
*/
readonly createBlobPayloadPending: true | undefined;

/**
* Controls automatic batch flushing during staging mode.
* Normal turn-based/async flush scheduling is suppressed while in staging mode
* until the accumulated batch reaches this many ops, at which point the batch
* is flushed. Incoming ops always break the current batch regardless of this setting.
*
* Set to `Infinity` to break batches only on system events (e.g. incoming ops).
*
* @defaultValue `largeBatchThreshold` (currently 1000)
*/
readonly stagingModeAutoFlushThreshold: number;

/**
* When this property is set to true, the runtime will never send DocumentSchemaChange ops
* and will throw an error if any incoming DocumentSchemaChange ops are received.
Expand Down Expand Up @@ -613,6 +627,16 @@ const defaultMaxBatchSizeInBytes = 700 * 1024;

const defaultChunkSizeInBytes = 204800;

/**
* Default number of ops a staging-mode batch must reach before automatic flush scheduling resumes.
*
* Chosen based on production telemetry: copy-paste operations routinely produce batches
* of 1000+ ops (435K instances over 30 days), and receivers on modern Fluid versions
* handle them without issues. Uses {@link largeBatchThreshold} to stay aligned with
* the existing "large batch" telemetry threshold ({@link OpGroupingManager}).
*/
const defaultStagingModeAutoFlushThreshold = largeBatchThreshold;

/**
* The default time to wait for pending ops to be processed during summarization
*/
Expand Down Expand Up @@ -967,6 +991,7 @@ export class ContainerRuntime
loadSequenceNumberVerification: "close",
maxBatchSizeInBytes: defaultMaxBatchSizeInBytes,
chunkSizeInBytes: defaultChunkSizeInBytes,
stagingModeAutoFlushThreshold: defaultStagingModeAutoFlushThreshold,
disableSchemaUpgrade: false,
};

Expand All @@ -993,6 +1018,7 @@ export class ContainerRuntime
? disabledCompressionConfig
: defaultConfigs.compressionOptions,
createBlobPayloadPending = defaultConfigs.createBlobPayloadPending,
stagingModeAutoFlushThreshold = defaultConfigs.stagingModeAutoFlushThreshold,
disableSchemaUpgrade = defaultConfigs.disableSchemaUpgrade,
}: IContainerRuntimeOptionsInternal = runtimeOptions;

Expand Down Expand Up @@ -1223,6 +1249,7 @@ export class ContainerRuntime
enableGroupedBatching,
explicitSchemaControl,
createBlobPayloadPending,
stagingModeAutoFlushThreshold,
disableSchemaUpgrade,
};

Expand Down Expand Up @@ -1402,6 +1429,7 @@ export class ContainerRuntime

private readonly batchRunner = new BatchRunCounter();
private readonly _flushMode: FlushMode;
private readonly stagingModeAutoFlushThreshold: number;
/**
* BatchId tracking is needed whenever there's a possibility of a "forked Container",
* where the same local state is pending in two different running Containers, each of
Expand Down Expand Up @@ -1834,6 +1862,10 @@ export class ContainerRuntime
this.closeFn(error);
throw error;
}
this.stagingModeAutoFlushThreshold =
this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ??
runtimeOptions.stagingModeAutoFlushThreshold ??
defaultStagingModeAutoFlushThreshold;
this.batchIdTrackingEnabled =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ??
this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ??
Expand Down Expand Up @@ -3613,20 +3645,39 @@ export class ContainerRuntime
// since we mark whole batches as "staged" or not to indicate whether to submit them.
this.flush();

const exitStagingMode = (discardOrCommit: () => void): void => {
const exitStagingMode = (
discardOrCommit: () => IPendingMessage["batchInfo"][],
exitMethod: "commit" | "discard",
): void => {
try {
// Final flush of any last staged changes
// NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c
this.outbox.flush();

this.stageControls = undefined;

// During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops).
// Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode.
this.submitIdAllocationOpIfNeeded({ staged: false });
discardOrCommit();

this.channelCollection.notifyStagingMode(false);
PerformanceEvent.timedExec(
this.mc.logger,
{
eventName: `ExitStagingMode_${exitMethod}`,
},
(event) => {
// Final flush of any last staged changes
// NOTE: We can't use this.flush() here, because orderSequentially uses StagingMode and in the rollback case we'll hit assert 0x24c
this.outbox.flush();

this.stageControls = undefined;

// During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops).
// Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode.
this.submitIdAllocationOpIfNeeded({ staged: false });
const batchInfos = discardOrCommit();
event.reportProgress({
details: {
autoFlushThreshold: this.stagingModeAutoFlushThreshold,
batches: batchInfos.length,
batchesAtOrOverThreshold: batchInfos.filter(
(b) => b.length >= this.stagingModeAutoFlushThreshold,
).length,
},
});
this.channelCollection.notifyStagingMode(false);
},
);
} catch (error) {
const normalizedError = normalizeError(error);
this.closeFn(normalizedError);
Expand All @@ -3638,21 +3689,24 @@ export class ContainerRuntime
discardChanges: () =>
exitStagingMode(() => {
// Pop all staged batches from the PSM and roll them back in LIFO order
this.pendingStateManager.popStagedBatches(({ runtimeOp, localOpMetadata }) => {
this.rollbackStagedChange(runtimeOp, localOpMetadata);
});
const batchInfos = this.pendingStateManager.popStagedBatches(
({ runtimeOp, localOpMetadata }) => {
this.rollbackStagedChange(runtimeOp, localOpMetadata);
},
);
this.updateDocumentDirtyState();
}),
return batchInfos;
}, "discard"),
commitChanges: (options) => {
const { squash } = { ...defaultStagingCommitOptions, ...options };
exitStagingMode(() => {
// Replay all staged batches in typical FIFO order.
// We'll be out of staging mode so they'll be sent to the service finally.
this.pendingStateManager.replayPendingStates({
return this.pendingStateManager.replayPendingStates({
committingStagedBatches: true,
squash,
});
});
}, "commit");
},
};

Expand Down Expand Up @@ -4783,6 +4837,20 @@ export class ContainerRuntime
}

private scheduleFlush(): void {
// During staging mode, suppress automatic flush scheduling until the main batch
// reaches or exceeds the threshold.
// Incoming ops still break the batch via direct this.flush() calls elsewhere
// (deltaManager "op" handler, process(), connection changes, getPendingLocalState,
// exitStagingMode). Those all bypass scheduleFlush(), so they're unaffected by this check.
// Additionally, outbox.maybeFlushPartialBatch() (called on every submit) detects
// sequence number changes and throws if a change is unexpected.
if (
this.inStagingMode &&
this.outbox.mainBatchMessageCount < this.stagingModeAutoFlushThreshold
) {
return;
}

if (this.flushScheduled) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export {
} from "./remoteMessageProcessor.js";
export {
type EmptyGroupedBatch,
largeBatchThreshold,
OpGroupingManager,
type OpGroupingManagerConfig,
isGroupedBatch,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ import type {
OutboundSingletonBatch,
} from "./definitions.js";

/**
* The number of ops in a batch above which the batch is considered "large"
* for telemetry purposes. Used both by {@link OpGroupingManager} (for the GroupLargeBatch event)
* and as the default staging-mode auto-flush threshold.
*/
export const largeBatchThreshold = 1000;

/**
* Grouping makes assumptions about the shape of message contents. This interface codifies those assumptions, but does not validate them.
*/
Expand Down Expand Up @@ -123,7 +130,10 @@ export class OpGroupingManager {
return batch as OutboundSingletonBatch;
}

if (batch.messages.length >= 1000) {
// Use > (not >=) so that batches flushed exactly at the staging-mode
// auto-flush threshold (which defaults to largeBatchThreshold) don't
// trigger this event. Only genuinely oversized batches are logged.
if (batch.messages.length > largeBatchThreshold) {
this.logger.sendTelemetryEvent({
eventName: "GroupLargeBatch",
length: batch.messages.length,
Expand Down
22 changes: 17 additions & 5 deletions packages/runtime/container-runtime/src/pendingStateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ export class PendingStateManager implements IDisposable {
clientId !== undefined,
0xa33 /* clientId (from stateHandler) could only be undefined if we've never connected, but we have a CSN so we know that's not the case */,
);

const batchInfo = { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged };
for (const message of batch) {
const {
runtimeOp,
Expand All @@ -446,8 +446,7 @@ export class PendingStateManager implements IDisposable {
runtimeOp,
localOpMetadata,
opMetadata,
// Note: We only will read this off the first message, but put it on all for simplicity
batchInfo: { clientId, batchStartCsn, length: batch.length, ignoreBatchId, staged },
batchInfo,
};
this.pendingMessages.push(pendingMessage);
}
Expand Down Expand Up @@ -751,8 +750,12 @@ export class PendingStateManager implements IDisposable {
* Called when the Container's connection state changes. If the Container gets connected, it replays all the pending
* states in its queue. This includes triggering resubmission of unacked ops.
* ! Note: successfully resubmitting an op that has been successfully sequenced is not possible due to checks in the ConnectionStateHandler (Loader layer)
*
* @returns The unique batch infos for all batches that were replayed.
*/
public replayPendingStates(options?: ReplayPendingStateOptions): void {
public replayPendingStates(
options?: ReplayPendingStateOptions,
): IPendingMessage["batchInfo"][] {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anthony-murphy, @dannimad -- Side note, sometimes I think about changing replayPendingStates to something like dequeuePendingBatchesForReplay. This would simplify the logic (no replay callback, no queue cursor management), and would give the ContainerRuntime access to all the batches both before and after replay. Something to keep in mind if that refactor would ever simplify future work in this area.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot create an issue for this

const { committingStagedBatches, squash } = {
...defaultReplayPendingStatesOptions,
...options,
Expand All @@ -779,6 +782,7 @@ export class PendingStateManager implements IDisposable {

const initialPendingMessagesCount = this.pendingMessages.length;
let remainingPendingMessagesCount = this.pendingMessages.length;
const replayedBatchSet = new Set<IPendingMessage["batchInfo"]>();

let seenStagedBatch = false;

Expand Down Expand Up @@ -816,6 +820,7 @@ export class PendingStateManager implements IDisposable {
if (asEmptyBatchLocalOpMetadata(pendingMessage.localOpMetadata)?.emptyBatch === true) {
// Resubmit no messages, with the batchId. Will result in another empty batch marker.
this.stateHandler.reSubmitBatch([], { batchId, staged, squash });
replayedBatchSet.add(pendingMessage.batchInfo);
continue;
}

Expand All @@ -842,6 +847,7 @@ export class PendingStateManager implements IDisposable {
],
{ batchId, staged, squash },
);
replayedBatchSet.add(pendingMessage.batchInfo);
continue;
}
// else: batchMetadataFlag === true (It's a typical multi-message batch)
Expand Down Expand Up @@ -881,6 +887,7 @@ export class PendingStateManager implements IDisposable {
}

this.stateHandler.reSubmitBatch(batch, { batchId, staged, squash });
replayedBatchSet.add(pendingMessage.batchInfo);
}

if (!committingStagedBatches) {
Expand All @@ -898,6 +905,8 @@ export class PendingStateManager implements IDisposable {
clientId: this.stateHandler.clientId(),
});
}

return [...replayedBatchSet];
}

/**
Expand All @@ -908,11 +917,13 @@ export class PendingStateManager implements IDisposable {
// callback will only be given staged messages with a valid runtime op (i.e. not empty batch and not an initial message with only serialized content)
stagedMessage: IPendingMessage & { runtimeOp: LocalContainerRuntimeMessage },
) => void,
): void {
): IPendingMessage["batchInfo"][] {
const batchSet = new Set<IPendingMessage["batchInfo"]>();
while (!this.pendingMessages.isEmpty()) {
const stagedMessage = this.pendingMessages.peekBack();
if (stagedMessage?.batchInfo.staged === true) {
this.pendingMessages.pop();
batchSet.add(stagedMessage.batchInfo);

if (hasTypicalRuntimeOp(stagedMessage)) {
callback(stagedMessage);
Expand All @@ -925,6 +936,7 @@ export class PendingStateManager implements IDisposable {
this.pendingMessages.toArray().every((m) => m.batchInfo.staged !== true),
0xb89 /* Shouldn't be any more staged messages */,
);
return [...batchSet];
}
}

Expand Down
Loading
Loading