Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2609,6 +2609,11 @@ export class ContainerRuntime
this.consecutiveReconnects = 0;
}

/**
* ID Allocation op generated before replaying pending states, to be submitted before the next op
*/
private outstandingUnfinalizedIdCreationRangeOp: LocalBatchMessage | undefined = undefined;

private replayPendingStates(): void {
// We need to be able to send ops to replay states
if (!this.shouldSendOps()) {
Expand All @@ -2623,12 +2628,13 @@ export class ContainerRuntime
this.emitDirtyDocumentEvent = false;

try {
// Any ID Allocation ops that failed to submit after the pending state was queued need to have
// the corresponding ranges resubmitted (note this call replaces the typical resubmit flow).
// Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from
// before staging mode so we can simply say staged: false.
this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false });
this.scheduleFlush();
// We need to get the ID Compressor's slate clean before replaying the ops.
// We don't submit this op yet - we'll include it before generating any "next creation range" ops,
// right before submitting other runtime ops.
this.outstandingUnfinalizedIdCreationRangeOp = this.generateIdAllocationOpIfNeeded({
resubmitOutstandingRanges: true,
staged: false, // This won't be submitted while in staging mode so we can always set this to false
});

// replay the ops
this.pendingStateManager.replayPendingStates();
Expand Down Expand Up @@ -3340,6 +3346,9 @@ export class ContainerRuntime

this.outbox.flush(resubmitInfo);
assert(this.outbox.isEmpty, 0x3cf /* reentrancy */);

// Regardless of whether this was set, once we flush we need to clear it because its reference sequence number will soon be out of date, if not already
this.outstandingUnfinalizedIdCreationRangeOp = undefined;
} catch (error) {
const error2 = normalizeError(error, {
props: {
Expand Down Expand Up @@ -3460,7 +3469,11 @@ export class ContainerRuntime

// During Staging Mode, we avoid submitting any ID Allocation ops (apart from resubmitting pre-staging ops).
// Now that we've exited, we need to submit an ID Allocation op for any IDs that were generated while in Staging Mode.
this.submitIdAllocationOpIfNeeded({ staged: false });
const idAllocationOp = this.generateIdAllocationOpIfNeeded({ staged: false });
if (idAllocationOp !== undefined) {
this.outbox.submitIdAllocation(idAllocationOp);
this.scheduleFlush();
}
discardOrCommit();

this.channelCollection.notifyStagingMode(false);
Expand Down Expand Up @@ -4459,13 +4472,26 @@ export class ContainerRuntime
return this.blobManager.createBlob(blob, signal);
}

private submitIdAllocationOpIfNeeded({
/**
* Generate an ID allocation op to be submitted ahead of any runtime ops that might depend
* on the outstanding changes to ID Compressor state.
* The default behavior uses takeNextCreationRange, and the resulting op should be submitted
* before submitting other ops that may have generated a compressed ID.
*
* @param resubmitOutstandingRanges - if true, use takeUnfinalizedCreationRange
* instead of takeNextCreationRange. This is needed before replaying pending state,
* to get a "clean slate" for the ID compressor with regard to any outstanding state from the first time around.
* @param staged - Indicates whether this ID Allocation op should be staged (not submitted yet). Typically expected to be false.
*
* @returns - The generated ID allocation operation or undefined.
*/
private generateIdAllocationOpIfNeeded({
resubmitOutstandingRanges = false,
staged,
}: {
resubmitOutstandingRanges?: boolean;
staged: boolean;
}): void {
}): LocalBatchMessage | undefined {
if (this._idCompressor) {
const idRange = resubmitOutstandingRanges
? this._idCompressor.takeUnfinalizedCreationRange()
Expand All @@ -4481,9 +4507,10 @@ export class ContainerRuntime
referenceSequenceNumber: this.deltaManager.lastSequenceNumber,
staged,
};
this.outbox.submitIdAllocation(idAllocationBatchMessage);
return idAllocationBatchMessage;
}
}
return undefined;
}

private submit(
Expand Down Expand Up @@ -4531,7 +4558,14 @@ export class ContainerRuntime

// Before submitting any non-staged change, submit the ID Allocation op to cover any compressed IDs included in the op.
if (!staged) {
this.submitIdAllocationOpIfNeeded({ staged: false });
if (this.outstandingUnfinalizedIdCreationRangeOp) {
this.outbox.submitIdAllocation(this.outstandingUnfinalizedIdCreationRangeOp);
this.outstandingUnfinalizedIdCreationRangeOp = undefined;
}
const idAllocationOp = this.generateIdAllocationOpIfNeeded({ staged: false });
if (idAllocationOp) {
this.outbox.submitIdAllocation(idAllocationOp);
}
}

// Allow document schema controller to send a message if it needs to propose change in document schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,9 @@ export class PendingStateManager implements IDisposable {
assert(batchMetadataFlag !== false, 0x41b /* We cannot process batches in chunks */);

// The next message starts a batch (possibly single-message), and we'll need its batchId.
const batchId = getEffectiveBatchId(pendingMessage);
const batchId = pendingMessage.batchInfo.ignoreBatchId
? undefined
: getEffectiveBatchId(pendingMessage);

const staged = pendingMessage.batchInfo.staged;

Expand Down
Loading