diff --git a/packages/runtime/container-runtime/src/containerRuntime.ts b/packages/runtime/container-runtime/src/containerRuntime.ts index bc9a5327c784..36f9319d1298 100644 --- a/packages/runtime/container-runtime/src/containerRuntime.ts +++ b/packages/runtime/container-runtime/src/containerRuntime.ts @@ -2710,12 +2710,10 @@ export class ContainerRuntime this.emitDirtyDocumentEvent = false; try { - // Any ID Allocation ops that failed to submit after the pending state was queued need to have - // the corresponding ranges resubmitted (note this call replaces the typical resubmit flow). - // Since we don't submit ID Allocation ops when staged, any outstanding ranges would be from - // before staging mode so we can simply say staged: false. - this.submitIdAllocationOpIfNeeded({ resubmitOutstandingRanges: true, staged: false }); - this.scheduleFlush(); + // Any ID Allocation ops that failed to submit need to have their ranges included + // in the next allocation op. Reset the compressor's unfinalized range cursor so that the next + // call to takeNextCreationRange (during replay) will include those unfinalized ranges. + this._idCompressor?.resetUnfinalizedCreationRange(); // replay the ops this.pendingStateManager.replayPendingStates(); diff --git a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts index edcd40dc68c3..729a0d088226 100644 --- a/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts +++ b/packages/runtime/container-runtime/src/test/containerRuntime.spec.ts @@ -614,8 +614,8 @@ describe("Runtime", () => { ); }); - it("IdAllocation op from replayPendingStates is flushed, preventing outboxSequenceNumberCoherencyCheck error", async () => { - // Start out disconnected since step 1 is to trigger ID Allocation op on reconnect + it("IDs unfinalized due to disconnect are properly finalized after reconnect", async () => { + // Start out disconnected since step 1 is to generate IDs before reconnect const connected = false; const mockContext = getMockContext({ connected }) as IContainerContext; const mockDeltaManager = mockContext.deltaManager as MockDeltaManager; @@ -628,33 +628,49 @@ describe("Runtime", () => { provideEntryPoint: mockProvideEntryPoint, }); - // 1st compressed id – queued while disconnected (goes to idAllocationBatch). - containerRuntime.idCompressor?.generateCompressedId(); + const compressor = containerRuntime.idCompressor; + assert(compressor !== undefined, "Expected idCompressor to be defined"); + + // Generate an ID while disconnected, and take the creation range, + // but do not submit the op to leave the range unfinalized (same as if the op were submitted but not sequenced). + const id1 = compressor.generateCompressedId(); + compressor.takeNextCreationRange(); - // Re-connect – replayPendingStates will submit only an idAllocation op. - // It's now in the Outbox and a flush is scheduled (including this flush was a bug fix) + // Re-connect – replayPendingStates releases unfinalized ranges + // (no IdAllocation op is submitted at this point, but the next will contain the unfinalized range). changeConnectionState(containerRuntime, true, mockClientId); // Simulate a remote op arriving before we submit anything else. - // Bump refSeq and continue execution at the end of the microtask queue. - // This is how Inbound Queue works, and this is necessary to simulate here to allow scheduled flush to happen + // Bump refSeq, emit the "op" event, and continue execution at the end of the microtask queue. + // This is how Inbound Queue works, and it makes sure we get coverage of ref seq coherency in this test. ++mockDeltaManager.lastSequenceNumber; + mockDeltaManager.emit("op", {}); await Promise.resolve(); - // 2nd compressed id – its idAllocation op will enter Outbox *after* the ref seq# bumped. - const id2 = containerRuntime.idCompressor?.generateCompressedId(); - - // This would throw a DataProcessingError from codepath "outboxSequenceNumberCoherencyCheck" - // if we didn't schedule a flush after the idAllocation op submitted during the reconnect. - // (On account of the two ID Allocation ops having different refSeqs but being in the same batch) + // Generate another ID and submit a data store op referencing it. + // This triggers submitIdAllocationOpIfNeeded → takeNextCreationRange. + // Because the unfinalized range was released during replay, both IDs + // are included in the resulting allocation range. + const id2 = compressor.generateCompressedId(); submitDataStoreOp(containerRuntime, "someDS", genTestDataStoreMessage({ id: id2 })); // Let the Outbox flush so we can check submittedOps length await Promise.resolve(); - assert( - submittedOps.length === 3, - "Expected 3 ops to be submitted (2 ID Allocation, 1 data)", - ); + assert.strictEqual(submittedOps.length, 2, "Expected 1 ID Allocation + 1 data op"); + + // Simulate processing the ID Allocation ack — finalize the range. + // Without the call to resetUnfinalizedCreationRange in replayPendingStates, + // this results in a "Ranges finalized out of order" error. + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-member-access + compressor.finalizeCreationRange(submittedOps[0].contents); + + // Both IDs should now be finalized (positive final IDs in op space). + // Without resetUnfinalizedCreationRange, id1 would remain a local-only + // ID (negative) that could never be shared with other clients. + const id1OpSpace = compressor.normalizeToOpSpace(id1); + const id2OpSpace = compressor.normalizeToOpSpace(id2); + assert(id1OpSpace >= 0, "id1 should be finalized after allocation range is sequenced"); + assert(id2OpSpace >= 0, "id2 should be finalized after allocation range is sequenced"); }); }); diff --git a/packages/runtime/id-compressor/api-report/id-compressor.legacy.beta.api.md b/packages/runtime/id-compressor/api-report/id-compressor.legacy.beta.api.md index 25f44bc0f486..582f4f003c53 100644 --- a/packages/runtime/id-compressor/api-report/id-compressor.legacy.beta.api.md +++ b/packages/runtime/id-compressor/api-report/id-compressor.legacy.beta.api.md @@ -48,6 +48,7 @@ export interface IIdCompressor { export interface IIdCompressorCore { beginGhostSession(ghostSessionId: SessionId, ghostSessionCallback: () => void): void; finalizeCreationRange(range: IdCreationRange): void; + resetUnfinalizedCreationRange(): void; serialize(withSession: true): SerializedIdCompressorWithOngoingSession; serialize(withSession: false): SerializedIdCompressorWithNoSession; takeNextCreationRange(): IdCreationRange; diff --git a/packages/runtime/id-compressor/src/idCompressor.ts b/packages/runtime/id-compressor/src/idCompressor.ts index fb6cc6315d88..79196d2bcb8f 100644 --- a/packages/runtime/id-compressor/src/idCompressor.ts +++ b/packages/runtime/id-compressor/src/idCompressor.ts @@ -94,8 +94,12 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore { // #region Final state - // The gen count to be annotated on the range returned by the next call to `takeNextCreationRange`. - // This is updated to be equal to `generatedIdCount` + 1 each time it is called. + /** + * The gen count to be annotated on the range returned by the next call to `takeNextCreationRange`. + * This is advanced to `generatedIdCount` + 1 each time it is called. + * On the other hand, when `resetUnfinalizedCreationRange` is called, + * this is moved back to the start of the unfinalized range, to ensure those IDs are included in the next range. + */ private nextRangeBaseGenCount = 1; private readonly sessions = new Sessions(); private readonly finalSpace = new FinalSpace(); @@ -244,35 +248,23 @@ export class IdCompressor implements IIdCompressor, IIdCompressorCore { } public takeUnfinalizedCreationRange(): IdCreationRange { - const lastLocalCluster = this.localSession.getLastCluster(); - let count: number; - let firstGenCount: number; - if (lastLocalCluster === undefined) { - firstGenCount = 1; - count = this.localGenCount; - } else { - firstGenCount = genCountFromLocalId( - (lastLocalCluster.baseLocalId - lastLocalCluster.count) as LocalCompressedId, - ); - count = this.localGenCount - firstGenCount + 1; - } + this.resetUnfinalizedCreationRange(); + return this.takeNextCreationRange(); + } - if (count === 0) { - return { - sessionId: this.localSessionId, - }; - } + public resetUnfinalizedCreationRange(): void { + assert( + !this.ongoingGhostSession, + "IdCompressor should not be operated normally when in a ghost session", + ); - const range: IdCreationRange = { - ids: { - count, - firstGenCount, - localIdRanges: this.normalizer.getRangesBetween(firstGenCount, this.localGenCount), - requestedClusterSize: this.nextRequestedClusterSize, - }, - sessionId: this.localSessionId, - }; - return this.updateToRange(range); + const lastLocalCluster = this.localSession.getLastCluster(); + this.nextRangeBaseGenCount = + lastLocalCluster === undefined + ? 1 + : genCountFromLocalId( + (lastLocalCluster.baseLocalId - lastLocalCluster.count) as LocalCompressedId, + ); } private updateToRange(range: IdCreationRange): IdCreationRange { diff --git a/packages/runtime/id-compressor/src/test/idCompressor.spec.ts b/packages/runtime/id-compressor/src/test/idCompressor.spec.ts index 392b01e38eab..7ed7afab5328 100644 --- a/packages/runtime/id-compressor/src/test/idCompressor.spec.ts +++ b/packages/runtime/id-compressor/src/test/idCompressor.spec.ts @@ -461,6 +461,95 @@ describe("IdCompressor", () => { ); }); }); + + describe("by reserving unfinalized ranges for the next take", () => { + it("produces equivalent range to takeUnfinalizedCreationRange on next takeNextCreationRange", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + generateCompressedIds(compressor, 1); + compressor.takeNextCreationRange(); + + // Reserve instead of take + compressor.resetUnfinalizedCreationRange(); + + // Next takeNextCreationRange should cover the unfinalized IDs + const range = compressor.takeNextCreationRange(); + assert.deepEqual(range.ids, { + firstGenCount: 1, + count: 1, + localIdRanges: [[1, 1]], + requestedClusterSize: 2, + }); + }); + + it("includes new IDs generated after reserving", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + generateCompressedIds(compressor, 1); + compressor.takeNextCreationRange(); + + compressor.resetUnfinalizedCreationRange(); + generateCompressedIds(compressor, 1); + + const range = compressor.takeNextCreationRange(); + assert.deepEqual(range.ids, { + firstGenCount: 1, + count: 2, + localIdRanges: [[1, 2]], + requestedClusterSize: 2, + }); + }); + + it("is a no-op when there are no unfinalized IDs", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + generateCompressedIds(compressor, 1); + compressor.finalizeCreationRange(compressor.takeNextCreationRange()); + + compressor.resetUnfinalizedCreationRange(); + const range = compressor.takeNextCreationRange(); + assert.equal(range.ids, undefined); + }); + + it("is idempotent", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + generateCompressedIds(compressor, 2); + compressor.takeNextCreationRange(); + + compressor.resetUnfinalizedCreationRange(); + compressor.resetUnfinalizedCreationRange(); + compressor.resetUnfinalizedCreationRange(); + + const range = compressor.takeNextCreationRange(); + assert.deepEqual(range.ids, { + firstGenCount: 1, + count: 2, + localIdRanges: [[1, 2]], + requestedClusterSize: 2, + }); + }); + + it("works with multiple outstanding ranges", () => { + const compressor = CompressorFactory.createCompressor(Client.Client1, 2); + generateCompressedIds(compressor, 1); + const range1 = compressor.takeNextCreationRange(); + generateCompressedIds(compressor, 1); // one local + compressor.finalizeCreationRange(range1); + compressor.takeNextCreationRange(); + generateCompressedIds(compressor, 1); // one eager final + compressor.takeNextCreationRange(); + generateCompressedIds(compressor, 1); // one local + compressor.takeNextCreationRange(); + + compressor.resetUnfinalizedCreationRange(); + const range = compressor.takeNextCreationRange(); + assert.deepEqual(range.ids?.firstGenCount, 2); + assert.deepEqual(range.ids?.count, 3); + assert.deepEqual(range.ids?.localIdRanges, [ + [2, 1], + [4, 1], + ]); + + compressor.finalizeCreationRange(range); + }); + }); }); describe("Finalizing", () => { diff --git a/packages/runtime/id-compressor/src/types/idCompressor.ts b/packages/runtime/id-compressor/src/types/idCompressor.ts index 832e6d38177a..c15b8097dbeb 100644 --- a/packages/runtime/id-compressor/src/types/idCompressor.ts +++ b/packages/runtime/id-compressor/src/types/idCompressor.ts @@ -97,6 +97,22 @@ export interface IIdCompressorCore { */ takeUnfinalizedCreationRange(): IdCreationRange; + /** + * Resets the next creation range to include all unfinalized IDs. + * + * @remarks + * IMPORTANT: This must only be called if it's CERTAIN that the unfinalized range will never be finalized as-is (e.g. by in-flight ops). + * + * After calling this, the next call to {@link IIdCompressorCore.takeNextCreationRange} will produce a range + * covering all unfinalized IDs (equivalent to what {@link IIdCompressorCore.takeUnfinalizedCreationRange} would + * have returned) plus any IDs generated after this call. + * + * Unlike {@link IIdCompressorCore.takeUnfinalizedCreationRange}, this method does not produce or return a range, + * and does not advance the internal range counter. It is useful when the caller wants to + * defer the actual range submission to the next natural {@link IIdCompressorCore.takeNextCreationRange} call. + */ + resetUnfinalizedCreationRange(): void; + /** * Finalizes the supplied range of IDs (which may be from either a remote or local session). * @param range - the range of session-local IDs to finalize. diff --git a/packages/runtime/test-runtime-utils/package.json b/packages/runtime/test-runtime-utils/package.json index 6acf090abf70..973031aaf01f 100644 --- a/packages/runtime/test-runtime-utils/package.json +++ b/packages/runtime/test-runtime-utils/package.json @@ -154,7 +154,11 @@ "typescript": "~5.4.5" }, "typeValidation": { - "broken": {}, + "broken": { + "Class_MockFluidDataStoreContext": { + "forwardCompat": false + } + }, "entrypoint": "legacy" } } diff --git a/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts b/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts index 615d2c59cf9e..5da3539c74f8 100644 --- a/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts +++ b/packages/runtime/test-runtime-utils/src/test/types/validateTestRuntimeUtilsPrevious.generated.ts @@ -168,6 +168,7 @@ declare type current_as_old_for_Class_MockDeltaQueue = requireAssignableTo, TypeOnly> /* diff --git a/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts b/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts index 16ad95a722f9..b4c086cfeb7c 100644 --- a/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts +++ b/packages/test/local-server-tests/src/test/opsOnReconnect.spec.ts @@ -260,6 +260,89 @@ describe("Ops on Reconnect", () => { "Did not receive the ops that were sent in Nack'd state", ); }); + + it("ID Allocations properly finalized across reconnect", async () => { + // Setup with idCompressor enabled + await setupFirstContainer({ enableRuntimeIdCompressor: "on" }); + await setupSecondContainersDataObject(); + + const compressor1 = container1Object1.context.idCompressor; + assert(compressor1 !== undefined, "IdCompressor must be enabled"); + + // Capture any container-closing error so we can surface it as the test failure. + // Without resetUnfinalizedCreationRange, finalizeCreationRange throws + // "Ranges finalized out of order" during op processing, which closes the + // container. ensureSynchronized filters out closed containers and returns + // normally, so we need to re-throw the closing error ourselves. + let containerCloseError: unknown; + container1.on("closed", (error) => { + containerCloseError = error; + }); + + // Round-trip a first ID so the compressor has a finalized cluster. + // This ensures the failure path exercises the contiguity check + // (lastCluster.baseLocalId - lastCluster.count !== rangeBaseLocal) + // rather than the first-cluster check (rangeBaseLocal !== -1). + const id1 = compressor1.generateCompressedId(); + container1Object1Map1.set("first-op", id1); + await loaderContainerTracker.ensureSynchronized(); + + // Generate a second ID and submit a DDS op while connected. + // submit() calls submitIdAllocationOpIfNeeded -> takeNextCreationRange, + // which advances nextRangeBaseGenCount past id2's range and queues the + // IdAllocation + DDS op in the outbox batch managers. Because the flush + // mode is TurnBased, the actual flush is deferred to a microtask. + const id2 = compressor1.generateCompressedId(); + container1Object1Map1.set("pre-disconnect", id2); + + // Disconnect synchronously, before the TurnBased microtask flush fires. + // The IdAllocation and DDS ops are still sitting in the outbox. + assert(container1.clientId); + documentServiceFactory.disconnectClient(container1.clientId, "Disconnected for testing"); + assert.equal(container1.connectionState, ConnectionState.Disconnected); + + // Reconnect. During setConnectionStateCore: + // 1. flush() moves the outbox batches (including the IdAllocation + // created at submit time) into PendingStateManager. + // canSendOps is still false so nothing goes to the server. + // 2. canSendOps becomes true. + // 3. replayPendingStates() replays from PendingStateManager: + // - resetUnfinalizedCreationRange is called, which releases the range for id2 + // back to the compressor to be included in the next creation range. + // - IdAllocation ops are skipped during resubmit (by design). + // - The DDS op is resubmitted, which calls submitIdAllocationOpIfNeeded + // -> takeNextCreationRange. This range includes id2 (even though id2 itself may be unused). + // NOTE: Without resetUnfinalizedCreationRange, nextRangeBaseGenCount is already + // past id2 so count=0 and NO IdAllocation is emitted for the replayed batch. + await waitForContainerConnection(container1); + + // Generate a third ID after reconnect and submit a DDS op. + // takeNextCreationRange produces a range starting at genCount 3. When the server + // sequences this IdAllocation, finalizeCreationRange checks that ranges are + // contiguous. Without resetUnfinalizedCreationRange the genCount-2 range + // was never re-sent, so finalizing the genCount-3 range throws + // "Ranges finalized out of order". + const id3 = compressor1.generateCompressedId(); + container1Object1Map1.set("post-reconnect", id3); + + await loaderContainerTracker.ensureSynchronized(); + + // If the container closed during op processing, surface its error as the + // test failure. This is what makes the test fail with "Ranges finalized + // out of order" when resetUnfinalizedCreationRange is commented out. + if (containerCloseError !== undefined) { + throw containerCloseError as Error; + } + assert(!container1.closed, "Container should not have closed"); + + // Verify all IDs are finalized and usable. + const opSpaceId1 = compressor1.normalizeToOpSpace(id1); + assert(opSpaceId1 >= 0, "First ID should be finalized"); + const opSpaceId2 = compressor1.normalizeToOpSpace(id2); + assert(opSpaceId2 >= 0, "Second ID should be finalized after reconnect"); + const opSpaceId3 = compressor1.normalizeToOpSpace(id3); + assert(opSpaceId3 >= 0, "Third ID should be finalized after reconnect"); + }); }); describe("Ordering of ops that are sent in disconnected state", () => {