Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 33 additions & 40 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2014,6 +2014,28 @@ export class ContainerRuntime
}),
reSubmit: this.reSubmit.bind(this),
opReentrancy: () => this.dataModelChangeRunner.running,
generateIdAllocationOp: (
	useUnfinalizedRange: boolean,
): LocalBatchMessage | undefined => {
	// Without an ID compressor there is nothing to allocate.
	const compressor = this._idCompressor;
	if (compressor === undefined) {
		return undefined;
	}

	// Take either the unfinalized creation range or just the next one, as the caller requested.
	const creationRange = useUnfinalizedRange
		? compressor.takeUnfinalizedCreationRange()
		: compressor.takeNextCreationRange();
	if (creationRange.ids === undefined) {
		// The range carries no IDs, so no op is produced.
		return undefined;
	}

	const runtimeOp: ContainerRuntimeIdAllocationMessage = {
		type: ContainerMessageType.IdAllocation,
		contents: creationRange,
	};
	// The op is always marked non-staged and stamped with the delta manager's last sequence number.
	return {
		runtimeOp,
		referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
		staged: false,
	};
},
});

this._quorum = quorum;
Expand Down Expand Up @@ -2715,12 +2737,11 @@ export class ContainerRuntime
this.emitDirtyDocumentEvent = false;

try {
// Any ID Allocation ops that failed to submit after the pending state was queued need to have
// the corresponding ranges resubmitted (note this call replaces the typical resubmit flow).
// Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from
// before staging mode so we can simply say staged: false.
this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false });
this.scheduleFlush();
// Signal to the outbox that the next JIT ID allocation should use the comprehensive
// unfinalized range (instead of incremental). The range will be prepended to the
// first replayed batch, preserving batch structure for fork detection.
this.outbox.requestUnfinalizedIdRanges();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This mechanism can go away if we merge #26784 first

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in! 🎉

//* CLAUDE: We need to handle the case where replayPendingStates submits to ops.

// replay the ops
this.pendingStateManager.replayPendingStates();
Expand Down Expand Up @@ -3630,9 +3651,10 @@ export class ContainerRuntime

this.stageControls = undefined;

// During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops).
// Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode.
this.submitIdAllocationOpIfNeeded({ staged: false });
// During Staging Mode, we avoid submitting any ID Allocation ops.
// IDs accumulated during staging will be captured by JIT ID allocation
// at the next non-staged flush (during commit replay or the next regular submit).
// For discard, IDs harmlessly remain in the compressor.
discardOrCommit();

this.channelCollection.notifyStagingMode(false);
Expand Down Expand Up @@ -4663,33 +4685,6 @@ export class ContainerRuntime
return this.blobManager.lookupTemporaryBlobStorageId(localId);
}

/**
 * Hands an ID Allocation op to the outbox when the ID compressor yields a creation range that contains IDs.
 * Does nothing when there is no ID compressor or when the range taken has no IDs.
 *
 * Options:
 * - `resubmitOutstandingRanges`: when true, the unfinalized creation range is taken; otherwise the next
 * creation range is taken. Defaults to false.
 * - `staged`: the value written to the `staged` field of the submitted batch message.
 */
private submitIdAllocationOpIfNeeded({
	resubmitOutstandingRanges = false,
	staged,
}: {
	resubmitOutstandingRanges?: boolean;
	staged: boolean;
}): void {
	const compressor = this._idCompressor;
	if (!compressor) {
		return;
	}

	const creationRange = resubmitOutstandingRanges
		? compressor.takeUnfinalizedCreationRange()
		: compressor.takeNextCreationRange();
	if (creationRange.ids === undefined) {
		// No IDs were allocated in this range, so there is nothing to send.
		return;
	}

	const runtimeOp: ContainerRuntimeIdAllocationMessage = {
		type: ContainerMessageType.IdAllocation,
		contents: creationRange,
	};
	const batchMessage: LocalBatchMessage = {
		runtimeOp,
		referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
		staged,
	};
	this.outbox.submitIdAllocation(batchMessage);
}

private submit(
containerRuntimeMessage: LocalContainerRuntimeMessage,
localOpMetadata: unknown = undefined,
Expand Down Expand Up @@ -4733,10 +4728,8 @@ export class ContainerRuntime
0xbba /* Unexpected message type submitted in Staging Mode */,
);

// Before submitting any non-staged change, submit the ID Allocation op to cover any compressed IDs included in the op.
if (!staged) {
this.submitIdAllocationOpIfNeeded({ staged: false });
}
// ID Allocation ops are generated just-in-time at flush time by the Outbox,
// ensuring correct refSeq even after rebase. No need to submit them here.

// Allow document schema controller to send a message if it needs to propose change in document schema.
// If it needs to send a message, it will call provided callback with payload of such message and rely
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ export interface IBatchManagerOptions {
* If true, the outbox is allowed to rebase the batch during flushing.
*/
readonly canRebase: boolean;

/**
* If true, don't compare batchID of incoming batches to this. e.g. ID Allocation Batch IDs should be ignored
*/
readonly ignoreBatchId?: boolean;
}

export interface BatchSequenceNumbers {
Expand Down Expand Up @@ -127,8 +122,11 @@ export class BatchManager {

/**
* Gets the pending batch and clears state for the next batch.
*
* @remarks The returned batch does not have batch metadata stamped (batch start/end markers, batchId).
* The caller is responsible for calling {@link addBatchMetadata} after any modifications (e.g. prepending messages).
*/
public popBatch(batchId?: BatchId): LocalBatch {
public popBatch(): LocalBatch {
assert(this.pendingBatch[0] !== undefined, 0xb8a /* expected non-empty batch */);
const batch: LocalBatch = {
messages: this.pendingBatch,
Expand All @@ -141,7 +139,7 @@ export class BatchManager {
this.clientSequenceNumber = undefined;
this.hasReentrantOps = false;

return addBatchMetadata(batch, batchId);
return batch;
}

/**
Expand Down
98 changes: 62 additions & 36 deletions packages/runtime/container-runtime/src/opLifecycle/outbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type {

import {
BatchManager,
addBatchMetadata,
type BatchSequenceNumbers,
sequenceNumbersMatch,
type BatchId,
Expand Down Expand Up @@ -71,6 +72,17 @@ export interface IOutboxParameters {
readonly getCurrentSequenceNumbers: () => BatchSequenceNumbers;
readonly reSubmit: (message: PendingMessageResubmitData, squash: boolean) => void;
readonly opReentrancy: () => boolean;
/**
* JIT callback to generate an ID allocation op at flush time.
* Called after rebase (if any), so the returned message has the correct refSeq.
*
* @param useUnfinalizedRange - If true, use `takeUnfinalizedCreationRange()` (for reconnect).
* Otherwise use `takeNextCreationRange()` (normal flush).
* @returns A LocalBatchMessage for the ID allocation op, or undefined if no IDs need allocating.
*/
readonly generateIdAllocationOp: (
useUnfinalizedRange: boolean,
) => LocalBatchMessage | undefined;
}

/**
Expand Down Expand Up @@ -195,9 +207,9 @@ export class Outbox {
private readonly logger: ITelemetryLoggerExt;
private readonly mainBatch: BatchManager;
private readonly blobAttachBatch: BatchManager;
private readonly idAllocationBatch: BatchManager;
private batchRebasesToReport = 5;
private rebasing = false;
private useUnfinalizedRangeOnNextFlush = false;

/**
* Track the number of ops which were detected to have a mismatched
Expand All @@ -213,14 +225,10 @@ export class Outbox {

this.mainBatch = new BatchManager({ canRebase: true });
this.blobAttachBatch = new BatchManager({ canRebase: true });
this.idAllocationBatch = new BatchManager({
canRebase: false,
ignoreBatchId: true,
});
}

public get messageCount(): number {
return this.mainBatch.length + this.blobAttachBatch.length + this.idAllocationBatch.length;
return this.mainBatch.length + this.blobAttachBatch.length;
}

public get mainBatchMessageCount(): number {
Expand All @@ -231,9 +239,11 @@ export class Outbox {
return this.blobAttachBatch.length;
}

public get idAllocationBatchMessageCount(): number {
return this.idAllocationBatch.length;
}
/**
* @remarks With JIT ID allocation, pending IDs are tracked by the compressor, not the outbox.
* This always returns 0. ID allocation ops are generated at flush time.
*/
public readonly idAllocationBatchMessageCount: number = 0;

public get isEmpty(): boolean {
return this.messageCount === 0;
Expand All @@ -260,19 +270,16 @@ export class Outbox {
private maybeFlushPartialBatch(): void {
const mainBatchSeqNums = this.mainBatch.sequenceNumbers;
const blobAttachSeqNums = this.blobAttachBatch.sequenceNumbers;
const idAllocSeqNums = this.idAllocationBatch.sequenceNumbers;
assert(
sequenceNumbersMatch(mainBatchSeqNums, blobAttachSeqNums) &&
sequenceNumbersMatch(mainBatchSeqNums, idAllocSeqNums),
sequenceNumbersMatch(mainBatchSeqNums, blobAttachSeqNums),
0x58d /* Reference sequence numbers from both batches must be in sync */,
);

const currentSequenceNumbers = this.params.getCurrentSequenceNumbers();

if (
sequenceNumbersMatch(mainBatchSeqNums, currentSequenceNumbers) &&
sequenceNumbersMatch(blobAttachSeqNums, currentSequenceNumbers) &&
sequenceNumbersMatch(idAllocSeqNums, currentSequenceNumbers)
sequenceNumbersMatch(blobAttachSeqNums, currentSequenceNumbers)
) {
// The reference sequence numbers are stable, there is nothing to do
return;
Expand Down Expand Up @@ -340,10 +347,13 @@ export class Outbox {
this.addMessageToBatchManager(this.blobAttachBatch, message);
}

public submitIdAllocation(message: LocalBatchMessage): void {
this.maybeFlushPartialBatch();

this.addMessageToBatchManager(this.idAllocationBatch, message);
/**
* Signals that on the next JIT ID allocation, the outbox should use
* `takeUnfinalizedCreationRange()` (comprehensive) instead of `takeNextCreationRange()` (incremental).
* Used during reconnect so the first replayed batch includes all outstanding ID ranges.
*/
public requestUnfinalizedIdRanges(): void {
this.useUnfinalizedRangeOnNextFlush = true;
}

private addMessageToBatchManager(
Expand All @@ -366,8 +376,9 @@ export class Outbox {
*/
public flush(resubmitInfo?: BatchResubmitInfo): void {
// We have nothing to flush if all batchManagers are empty and we don't need to resubmit an empty batch placeholder
// Note that it's possible that there are unfinalized ranges in the ID Compressor,
// but there's no urgency to flush those if they're not referenced in any messages.
if (
this.idAllocationBatch.empty &&
this.blobAttachBatch.empty &&
this.mainBatch.empty &&
resubmitInfo?.batchId === undefined
Expand All @@ -383,8 +394,7 @@ export class Outbox {
}

private flushAll(resubmitInfo?: BatchResubmitInfo): void {
const allBatchesEmpty =
this.idAllocationBatch.empty && this.blobAttachBatch.empty && this.mainBatch.empty;
const allBatchesEmpty = this.blobAttachBatch.empty && this.mainBatch.empty;
if (allBatchesEmpty) {
// If we're resubmitting with a batchId and all batches are empty, we need to flush an empty batch.
// Note that we currently resubmit one batch at a time, so on resubmit, 1 of the 2 batches will *always* be empty.
Expand All @@ -397,21 +407,19 @@ export class Outbox {
return;
}

// Don't use resubmittingBatchId for idAllocationBatch.
// ID Allocation messages are not directly resubmitted so don't pass the resubmitInfo
this.flushInternal({
batchManager: this.idAllocationBatch,
// Note: For now, we will never stage ID Allocation messages.
// They won't contain personal info and no harm in extra allocations in case of discarding the staged changes
});
// JIT ID allocation: generateJIT=true on both calls. takeNextCreationRange() is
// cumulative, so the first non-empty batch captures all pending IDs. The second
// call will return undefined (no remaining IDs). This is simpler than picking a target.
this.flushInternal({
batchManager: this.blobAttachBatch,
disableGroupedBatching: true,
resubmitInfo,
generateJIT: true,
});
this.flushInternal({
batchManager: this.mainBatch,
resubmitInfo,
generateJIT: true,
});
}

Expand Down Expand Up @@ -448,13 +456,14 @@ export class Outbox {
batchManager: BatchManager;
disableGroupedBatching?: boolean;
resubmitInfo?: BatchResubmitInfo; // undefined if not resubmitting
generateJIT?: boolean;
}): void {
const { batchManager, disableGroupedBatching = false, resubmitInfo } = params;
const { batchManager, disableGroupedBatching = false, resubmitInfo, generateJIT } = params;
if (batchManager.empty) {
return;
}

const rawBatch = batchManager.popBatch(resubmitInfo?.batchId);
let rawBatch = batchManager.popBatch();

// On resubmit we use the original batch's staged state, so these should match as well.
const staged = rawBatch.staged === true;
Expand All @@ -471,16 +480,32 @@ export class Outbox {
groupingEnabled
) {
assert(!this.rebasing, 0x6fa /* A rebased batch should never have reentrant ops */);
// Rebase the current batch (resubmit the ops one-by-one) and then reinvoke flushInternal.
// If a batch contains reentrant ops (ops created as a result from processing another op)
// it needs to be rebased so that we can ensure consistent reference sequence numbers
// and eventual consistency at the DDS level.
// Note: Since this is happening in the same turn the ops were originally created with,
// and they haven't gone to PendingStateManager yet, we can just let them respect
// ContainerRuntime.inStagingMode. So we do not plumb local 'staged' variable through here.
this.rebase(rawBatch, batchManager);
this.rebase(rawBatch, batchManager, generateJIT);
return;
}

// Generate ID Allocation ops just-in-time, after rebase (if any).
// This ensures the refSeq is correct (matching the rest of the batch) and that
// ID ranges aren't lost during rebase (since reSubmit drops IdAllocation ops).
// Only generate for non-staged batches — ID alloc ops are always non-staged.
if (generateJIT === true && !staged) {
const useUnfinalized = this.useUnfinalizedRangeOnNextFlush;
this.useUnfinalizedRangeOnNextFlush = false;
const idAllocMsg = this.params.generateIdAllocationOp(useUnfinalized);
if (idAllocMsg !== undefined) {
rawBatch = { ...rawBatch, messages: [idAllocMsg, ...rawBatch.messages] };
}
}

addBatchMetadata(rawBatch, resubmitInfo?.batchId);

let clientSequenceNumber: number | undefined;
// Did we disconnect? (i.e. is shouldSend false?)
// If so, do nothing, as pending state manager will resubmit it correctly on reconnect.
Expand All @@ -499,7 +524,6 @@ export class Outbox {
rawBatch.messages,
clientSequenceNumber,
staged,
batchManager.options.ignoreBatchId,
);
}

Expand All @@ -509,7 +533,11 @@ export class Outbox {
*
* @param rawBatch - the batch to be rebased
*/
private rebase(rawBatch: LocalBatch, batchManager: BatchManager): void {
private rebase(
rawBatch: LocalBatch,
batchManager: BatchManager,
generateJIT?: boolean,
): void {
assert(!this.rebasing, 0x6fb /* Reentrancy */);
assert(batchManager.options.canRebase, 0x9a7 /* BatchManager does not support rebase */);

Expand Down Expand Up @@ -538,7 +566,7 @@ export class Outbox {
this.batchRebasesToReport--;
}

this.flushInternal({ batchManager });
this.flushInternal({ batchManager, generateJIT });
this.rebasing = false;
}

Expand Down Expand Up @@ -679,15 +707,13 @@ export class Outbox {
*/
public getBatchCheckpoints(): {
mainBatch: IBatchCheckpoint;
idAllocationBatch: IBatchCheckpoint;
blobAttachBatch: IBatchCheckpoint;
} {
// This variable is declared with a specific type so that we have a standard import of the IBatchCheckpoint type.
// When the type is inferred, the generated .d.ts uses a dynamic import which doesn't resolve.
const mainBatch: IBatchCheckpoint = this.mainBatch.checkpoint();
return {
mainBatch,
idAllocationBatch: this.idAllocationBatch.checkpoint(),
blobAttachBatch: this.blobAttachBatch.checkpoint(),
};
}
Expand Down
Loading
Loading