Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2710,12 +2710,10 @@ export class ContainerRuntime
this.emitDirtyDocumentEvent = false;

try {
// Any ID Allocation ops that failed to submit after the pending state was queued need to have
// the corresponding ranges resubmitted (note this call replaces the typical resubmit flow).
// Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from
// before staging mode so we can simply say staged: false.
this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false });
this.scheduleFlush();
// Any ID Allocation ops that failed to submit need to have their ranges included
// in the next allocation op. Release the unfinalized ranges back so that the next
// call to takeNextCreationRange (during replay) will include them.
this._idCompressor?.releaseUnfinalizedCreationRange();

// replay the ops
this.pendingStateManager.replayPendingStates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,8 @@ describe("Runtime", () => {
);
});

it("IdAllocation op from replayPendingStates is flushed, preventing outboxSequenceNumberCoherencyCheck error", async () => {
// Start out disconnected since step 1 is to trigger ID Allocation op on reconnect
it("IDs generated before a lost range are finalized after reconnect", async () => {
// Start out disconnected since step 1 is to generate IDs before reconnect
const connected = false;
const mockContext = getMockContext({ connected }) as IContainerContext;
const mockDeltaManager = mockContext.deltaManager as MockDeltaManager;
Expand All @@ -632,33 +632,45 @@ describe("Runtime", () => {
provideEntryPoint: mockProvideEntryPoint,
});

// 1st compressed id – queued while disconnected (goes to idAllocationBatch).
containerRuntime.idCompressor?.generateCompressedId();
const compressor = containerRuntime.idCompressor;
assert(compressor !== undefined, "Expected idCompressor to be defined");

// Generate an ID while disconnected.
const id1 = compressor.generateCompressedId();
// Simulate the range being taken but lost (e.g. nack'd / disconnect before submit).
compressor.takeNextCreationRange();

// Re-connect – replayPendingStates will submit only an idAllocation op.
// It's now in the Outbox and a flush is scheduled (including this flush was a bug fix)
// Re-connect – replayPendingStates releases unfinalized ranges
// (no IdAllocation op is submitted at this point).
changeConnectionState(containerRuntime, true, mockClientId);

// Simulate a remote op arriving before we submit anything else.
// Bump refSeq and continue execution at the end of the microtask queue.
// This is how Inbound Queue works, and this is necessary to simulate here to allow scheduled flush to happen
// This is how Inbound Queue works, and it makes sure we get coverage of ref seq coherency in this test.
++mockDeltaManager.lastSequenceNumber;
await Promise.resolve();

// 2nd compressed id – its idAllocation op will enter Outbox *after* the ref seq# bumped.
const id2 = containerRuntime.idCompressor?.generateCompressedId();

// This would throw a DataProcessingError from codepath "outboxSequenceNumberCoherencyCheck"
// if we didn't schedule a flush after the idAllocation op submitted during the reconnect.
// (On account of the two ID Allocation ops having different refSeqs but being in the same batch)
// Generate another ID and submit a data store op referencing it.
// This triggers submitIdAllocationOpIfNeeded → takeNextCreationRange.
// Because the unfinalized range was released during replay, both IDs
// are included in the resulting allocation range.
const id2 = compressor.generateCompressedId();
submitDataStoreOp(containerRuntime, "someDS", genTestDataStoreMessage({ id: id2 }));

// Let the Outbox flush so we can check submittedOps length
await Promise.resolve();
assert(
submittedOps.length === 3,
"Expected 3 ops to be submitted (2 ID Allocation, 1 data)",
);
assert.strictEqual(submittedOps.length, 2, "Expected 1 ID Allocation + 1 data op");

// Simulate processing the ID Allocation ack — finalize the range.
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access
compressor.finalizeCreationRange(submittedOps[0].contents);

// Both IDs should now be finalized (positive final IDs in op space).
// Without releaseUnfinalizedCreationRange, id1 would remain a local-only
// ID (negative) that could never be shared with other clients.
const id1OpSpace = compressor.normalizeToOpSpace(id1);
const id2OpSpace = compressor.normalizeToOpSpace(id2);
assert(id1OpSpace >= 0, "id1 should be finalized after allocation range is sequenced");
assert(id2OpSpace >= 0, "id2 should be finalized after allocation range is sequenced");
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface IIdCompressor {
export interface IIdCompressorCore {
beginGhostSession(ghostSessionId: SessionId, ghostSessionCallback: () => void): void;
finalizeCreationRange(range: IdCreationRange): void;
releaseUnfinalizedCreationRange(): void;
serialize(withSession: true): SerializedIdCompressorWithOngoingSession;
serialize(withSession: false): SerializedIdCompressorWithNoSession;
takeNextCreationRange(): IdCreationRange;
Expand Down
50 changes: 21 additions & 29 deletions packages/runtime/id-compressor/src/idCompressor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore {

// #region Final state

// The gen count to be annotated on the range returned by the next call to `takeNextCreationRange`.
// This is updated to be equal to `generatedIdCount` + 1 each time it is called.
/**
* The gen count to be annotated on the range returned by the next call to `takeNextCreationRange`.
* This is advanced to `generatedIdCount` + 1 each time it is called.
* On the other hand, when `releaseUnfinalizedCreationRange` is called,
* this is moved back to the start of the unfinalized range, to ensure those IDs are included in the next range.
*/
private nextRangeBaseGenCount = 1;
private readonly sessions = new Sessions();
private readonly finalSpace = new FinalSpace();
Expand Down Expand Up @@ -244,35 +248,23 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore {
}

public takeUnfinalizedCreationRange(): IdCreationRange {
const lastLocalCluster = this.localSession.getLastCluster();
let count: number;
let firstGenCount: number;
if (lastLocalCluster === undefined) {
firstGenCount = 1;
count = this.localGenCount;
} else {
firstGenCount = genCountFromLocalId(
(lastLocalCluster.baseLocalId - lastLocalCluster.count) as LocalCompressedId,
);
count = this.localGenCount - firstGenCount + 1;
}
this.releaseUnfinalizedCreationRange();
return this.takeNextCreationRange();
}

if (count === 0) {
return {
sessionId: this.localSessionId,
};
}
public releaseUnfinalizedCreationRange(): void {
assert(
!this.ongoingGhostSession,
"IdCompressor should not be operated normally when in a ghost session",
);

const range: IdCreationRange = {
ids: {
count,
firstGenCount,
localIdRanges: this.normalizer.getRangesBetween(firstGenCount, this.localGenCount),
requestedClusterSize: this.nextRequestedClusterSize,
},
sessionId: this.localSessionId,
};
return this.updateToRange(range);
const lastLocalCluster = this.localSession.getLastCluster();
this.nextRangeBaseGenCount =
lastLocalCluster === undefined
? 1
: genCountFromLocalId(
(lastLocalCluster.baseLocalId - lastLocalCluster.count) as LocalCompressedId,
);
}

private updateToRange(range: IdCreationRange): IdCreationRange {
Expand Down
89 changes: 89 additions & 0 deletions packages/runtime/id-compressor/src/test/idCompressor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,95 @@ describe("IdCompressor", () => {
);
});
});

describe("by reserving unfinalized ranges for the next take", () => {
it("produces equivalent range to takeUnfinalizedCreationRange on next takeNextCreationRange", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
generateCompressedIds(compressor, 1);
compressor.takeNextCreationRange();

// Reserve instead of take
compressor.releaseUnfinalizedCreationRange();

// Next takeNextCreationRange should cover the unfinalized IDs
const range = compressor.takeNextCreationRange();
assert.deepEqual(range.ids, {
firstGenCount: 1,
count: 1,
localIdRanges: [[1, 1]],
requestedClusterSize: 2,
});
});

it("includes new IDs generated after reserving", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
generateCompressedIds(compressor, 1);
compressor.takeNextCreationRange();

compressor.releaseUnfinalizedCreationRange();
generateCompressedIds(compressor, 1);

const range = compressor.takeNextCreationRange();
assert.deepEqual(range.ids, {
firstGenCount: 1,
count: 2,
localIdRanges: [[1, 2]],
requestedClusterSize: 2,
});
});

it("is a no-op when there are no unfinalized IDs", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
generateCompressedIds(compressor, 1);
compressor.finalizeCreationRange(compressor.takeNextCreationRange());

compressor.releaseUnfinalizedCreationRange();
const range = compressor.takeNextCreationRange();
assert.equal(range.ids, undefined);
});

it("is idempotent", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
generateCompressedIds(compressor, 2);
compressor.takeNextCreationRange();

compressor.releaseUnfinalizedCreationRange();
compressor.releaseUnfinalizedCreationRange();
compressor.releaseUnfinalizedCreationRange();

const range = compressor.takeNextCreationRange();
assert.deepEqual(range.ids, {
firstGenCount: 1,
count: 2,
localIdRanges: [[1, 2]],
requestedClusterSize: 2,
});
});

it("works with multiple outstanding ranges", () => {
const compressor = CompressorFactory.createCompressor(Client.Client1, 2);
generateCompressedIds(compressor, 1);
const range1 = compressor.takeNextCreationRange();
generateCompressedIds(compressor, 1); // one local
compressor.finalizeCreationRange(range1);
compressor.takeNextCreationRange();
generateCompressedIds(compressor, 1); // one eager final
compressor.takeNextCreationRange();
generateCompressedIds(compressor, 1); // one local
compressor.takeNextCreationRange();

compressor.releaseUnfinalizedCreationRange();
const range = compressor.takeNextCreationRange();
assert.deepEqual(range.ids?.firstGenCount, 2);
assert.deepEqual(range.ids?.count, 3);
assert.deepEqual(range.ids?.localIdRanges, [
[2, 1],
[4, 1],
]);

compressor.finalizeCreationRange(range);
});
});
});

describe("Finalizing", () => {
Expand Down
12 changes: 12 additions & 0 deletions packages/runtime/id-compressor/src/types/idCompressor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@ export interface IIdCompressorCore {
*/
takeUnfinalizedCreationRange(): IdCreationRange;

/**
* Resets the next creation range to include all unfinalized IDs.
* After calling this, the next call to `takeNextCreationRange` will produce a range
* covering all unfinalized IDs (equivalent to what `takeUnfinalizedCreationRange` would
* have returned) plus any IDs generated after this call.
*
* Unlike `takeUnfinalizedCreationRange`, this method does not produce or return a range,
* and does not advance the internal range counter. It is useful when the caller wants to
* defer the actual range submission to the next natural `takeNextCreationRange` call.
*/
releaseUnfinalizedCreationRange(): void;

/**
* Finalizes the supplied range of IDs (which may be from either a remote or local session).
* @param range - the range of session-local IDs to finalize.
Expand Down
6 changes: 5 additions & 1 deletion packages/runtime/test-runtime-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,11 @@
"typescript": "~5.4.5"
},
"typeValidation": {
"broken": {},
"broken": {
"Class_MockFluidDataStoreContext": {
"forwardCompat": false
}
},
"entrypoint": "legacy"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ declare type current_as_old_for_Class_MockDeltaQueue = requireAssignableTo<TypeO
* typeValidation.broken:
* "Class_MockFluidDataStoreContext": {"forwardCompat": false}
*/
// @ts-expect-error compatibility expected to be broken
declare type old_as_current_for_Class_MockFluidDataStoreContext = requireAssignableTo<TypeOnly<old.MockFluidDataStoreContext>, TypeOnly<current.MockFluidDataStoreContext>>

/*
Expand Down
83 changes: 83 additions & 0 deletions packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,89 @@ describe("Ops on Reconnect", () => {
"Did not receive the ops that were sent in Nack'd state",
);
});

it("ID Allocations properly finalized across resubmit", async () => {
// Setup with idCompressor enabled
await setupFirstContainer({ enableRuntimeIdCompressor: "on" });
await setupSecondContainersDataObject();

const compressor1 = container1Object1.context.idCompressor;
assert(compressor1 !== undefined, "IdCompressor must be enabled");

// Capture any container-closing error so we can surface it as the test failure.
// Without releaseUnfinalizedCreationRange, finalizeCreationRange throws
// "Ranges finalized out of order" during op processing, which closes the
// container. ensureSynchronized filters out closed containers and returns
// normally, so we need to re-throw the closing error ourselves.
let containerCloseError: unknown;
container1.on("closed", (error) => {
containerCloseError = error;
});

// Round-trip a first ID so the compressor has a finalized cluster.
// This ensures the failure path exercises the contiguity check
// (lastCluster.baseLocalId - lastCluster.count !== rangeBaseLocal)
// rather than the first-cluster check (rangeBaseLocal !== -1).
const id1 = compressor1.generateCompressedId();
container1Object1Map1.set("first-op", id1);
await loaderContainerTracker.ensureSynchronized();

// Generate a second ID and submit a DDS op while connected.
// submit() calls submitIdAllocationOpIfNeeded -> takeNextCreationRange,
// which advances nextRangeBaseGenCount past id2's range and queues the
// IdAllocation + DDS op in the outbox batch managers. Because the flush
// mode is TurnBased, the actual flush is deferred to a microtask.
const id2 = compressor1.generateCompressedId();
container1Object1Map1.set("pre-disconnect", id2);

// Disconnect synchronously, before the TurnBased microtask flush fires.
// The IdAllocation and DDS ops are still sitting in the outbox.
assert(container1.clientId);
documentServiceFactory.disconnectClient(container1.clientId, "Disconnected for testing");
assert.equal(container1.connectionState, ConnectionState.Disconnected);

// Reconnect. During setConnectionStateCore:
// 1. flush() moves the outbox batches (including the IdAllocation
// created at submit time) into PendingStateManager.
// canSendOps is still false so nothing goes to the server.
// 2. canSendOps becomes true.
// 3. replayPendingStates() replays from PendingStateManager:
// - releaseUnfinalizedCreationRange is called, which releases the range for id2
// back to the compressor to be included in the next creation range.
// - IdAllocation ops are skipped during resubmit (by design).
// - The DDS op is resubmitted, which calls submitIdAllocationOpIfNeeded
// -> takeNextCreationRange. This range includes id2 (even though id2 itself may be unused).
// NOTE: Without releaseUnfinalizedCreationRange, nextRangeBaseGenCount is already
// past id2 so count=0 and NO IdAllocation is emitted for the replayed batch.
await waitForContainerConnection(container1);

// Generate a third ID after reconnect and submit a DDS op.
// takeNextCreationRange produces a range starting at genCount 3. When the server
// sequences this IdAllocation, finalizeCreationRange checks that ranges are
// contiguous. Without releaseUnfinalizedCreationRange the genCount-2 range
// was never re-sent, so finalizing the genCount-3 range throws
// "Ranges finalized out of order".
const id3 = compressor1.generateCompressedId();
container1Object1Map1.set("post-reconnect", id3);

await loaderContainerTracker.ensureSynchronized();

// If the container closed during op processing, surface its error as the
// test failure. This is what makes the test fail with "Ranges finalized
// out of order" when releaseUnfinalizedCreationRange is commented out.
if (containerCloseError !== undefined) {
throw containerCloseError as Error;
}
assert(!container1.closed, "Container should not have closed");

// Verify all IDs are finalized and usable.
const opSpaceId1 = compressor1.normalizeToOpSpace(id1);
assert(opSpaceId1 >= 0, "First ID should be finalized");
const opSpaceId2 = compressor1.normalizeToOpSpace(id2);
assert(opSpaceId2 >= 0, "Second ID should be finalized after reconnect");
const opSpaceId3 = compressor1.normalizeToOpSpace(id3);
assert(opSpaceId3 >= 0, "Third ID should be finalized after reconnect");
});
});

describe("Ordering of ops that are sent in disconnected state", () => {
Expand Down
Loading