diff --git a/packages/framework/presence/src/test/batching.spec.ts b/packages/framework/presence/src/test/batching.spec.ts index 211482239c63..ee80ed92b01a 100644 --- a/packages/framework/presence/src/test/batching.spec.ts +++ b/packages/framework/presence/src/test/batching.spec.ts @@ -3,6 +3,8 @@ * Licensed under the MIT License. */ +import { strict as assert } from "node:assert"; + import { EventAndErrorTrackingLogger } from "@fluidframework/test-utils/internal"; import { describe, it, after, afterEach, before, beforeEach } from "mocha"; import { useFakeTimers, type SinonFakeTimers } from "sinon"; @@ -23,7 +25,8 @@ describe("Presence", () => { describe("batching", () => { let runtime: MockEphemeralRuntime; let logger: EventAndErrorTrackingLogger; - const initialTime = 1000; + const initialTime = 500; + const testStartTime = 1010; let clock: SinonFakeTimers; let presence: PresenceWithNotifications; @@ -35,8 +38,6 @@ describe("Presence", () => { logger = new EventAndErrorTrackingLogger(); runtime = new MockEphemeralRuntime(logger); - // Note that while the initialTime is set to 1000, the prepareConnectedPresence call advances - // it to 1010 so all tests start at that time. clock.setSystemTime(initialTime); // Set up the presence connection. @@ -47,6 +48,12 @@ describe("Presence", () => { clock, logger, ).presence; + + // Note that while the initialTime was set to 500, the prepareConnectedPresence call advances + // it. Set a consistent start time for all tests. + const deltaToStart = testStartTime - clock.now; + assert(deltaToStart >= 0); + clock.tick(deltaToStart); }); afterEach(() => { @@ -72,7 +79,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -99,7 +110,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -126,7 +141,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -168,7 +187,7 @@ describe("Presence", () => { assertFinalExpectations(runtime, logger); }); - it("sets timer for default allowableUpdateLatency", async () => { + it("sets timer for default allowableUpdateLatencyMs", async () => { runtime.signalsExpected.push([ { type: "Pres:DatastoreUpdate", @@ -180,7 +199,7 @@ describe("Presence", () => { "clientToSessionId": { [connectionId2]: { "rev": 0, - "timestamp": 1000, + "timestamp": initialTime, "value": attendeeId2, }, }, @@ -215,7 +234,7 @@ describe("Presence", () => { clock.tick(100); // Time is now 1110 }); - it("batches signals sent within default allowableUpdateLatency", async () => { + it("batches signals sent within default allowableUpdateLatencyMs", async () => { runtime.signalsExpected.push( [ { @@ -226,7 +245,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -253,7 +276,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -315,7 +342,7 @@ describe("Presence", () => { clock.tick(30); // Time is now 1180 }); - it("batches signals sent within a specified allowableUpdateLatency", async () => { + it("batches signals sent within a specified allowableUpdateLatencyMs", async () => { runtime.signalsExpected.push( [ { @@ -326,7 +353,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -353,7 +384,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -424,7 +459,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -460,7 +499,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -530,7 +573,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -566,7 +613,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -632,7 +683,7 @@ describe("Presence", () => { "clientToSessionId": { [connectionId2]: { "rev": 0, - "timestamp": 1000, + "timestamp": initialTime, "value": attendeeId2, }, }, @@ -713,7 +764,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "n:name:testNotificationWorkspace": { @@ -739,7 +794,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "n:name:testNotificationWorkspace": { @@ -801,7 +860,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "s:name:testStateWorkspace": { @@ -841,7 +904,11 @@ describe("Presence", () => { "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "n:name:testNotificationWorkspace": { diff --git a/packages/framework/presence/src/test/notificationsManager.spec.ts b/packages/framework/presence/src/test/notificationsManager.spec.ts index c2283f1932a9..f29df4532ba7 100644 --- a/packages/framework/presence/src/test/notificationsManager.spec.ts +++ b/packages/framework/presence/src/test/notificationsManager.spec.ts @@ -137,12 +137,16 @@ describe("Presence", () => { { type: "Pres:DatastoreUpdate", content: { - "sendTimestamp": 1020, + "sendTimestamp": clock.now, "avgLatency": 10, "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "n:name:testNotificationWorkspace": { @@ -190,12 +194,16 @@ describe("Presence", () => { { type: "Pres:DatastoreUpdate", content: { - "sendTimestamp": 1020, + "sendTimestamp": clock.now, "avgLatency": 10, "data": { "system:presence": { "clientToSessionId": { - [connectionId2]: { "rev": 0, "timestamp": 1000, "value": attendeeId2 }, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, "n:name:testNotificationWorkspace": { diff --git a/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts b/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts index 9c012ea3655d..dab10e11bcfd 100644 --- a/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts +++ b/packages/framework/presence/src/test/presenceDatastoreManager.spec.ts @@ -15,8 +15,9 @@ import { EventAndErrorTrackingLogger } from "@fluidframework/test-utils/internal import type { SinonFakeTimers } from "sinon"; import { useFakeTimers, spy } from "sinon"; +import type { ClientConnectionId } from "../baseTypes.js"; import { toOpaqueJson } from "../internalUtils.js"; -import type { AttendeeId, PresenceWithNotifications } from "../presence.js"; +import type { PresenceWithNotifications } from "../presence.js"; import { createPresenceManager } from "../presenceManager.js"; import type { InternalWorkspaceAddress, SignalMessages } from "../protocol.js"; import type { SystemWorkspaceDatastore } from "../systemWorkspace.js"; @@ -25,6 +26,7 @@ import { MockEphemeralRuntime } from "./mockEphemeralRuntime.js"; import type { ProcessSignalFunction } from "./testUtils.js"; import { assertFinalExpectations, + broadcastJoinResponseDelaysMs, connectionId2, createSpecificAttendeeId, prepareConnectedPresence, @@ -32,9 +34,19 @@ import { attendeeId2, } from "./testUtils.js"; +const attendee0SystemWorkspaceDatastore = { + "clientToSessionId": { + ["client0"]: { + "rev": 0, + "timestamp": 0, + "value": createSpecificAttendeeId("attendeeId-0"), + }, + }, +} as const satisfies SystemWorkspaceDatastore; + const attendee4SystemWorkspaceDatastore = { "clientToSessionId": { - ["client4" as AttendeeId]: { + ["client4"]: { "rev": 0, "timestamp": 700, "value": createSpecificAttendeeId("attendeeId-4"), @@ -42,6 +54,16 @@ const attendee4SystemWorkspaceDatastore = { }, } as const satisfies SystemWorkspaceDatastore; +const attendee5SystemWorkspaceDatastore = { + "clientToSessionId": { + ["client5"]: { + "rev": 0, + "timestamp": 800, + "value": createSpecificAttendeeId("attendeeId-5"), + }, + }, +} as const satisfies SystemWorkspaceDatastore; + describe("Presence", () => { describe("protocol handling", () => { let runtime: MockEphemeralRuntime; @@ -83,7 +105,7 @@ describe("Presence", () => { prepareConnectedPresence(runtime, "attendeeId-2", "client2", clock, logger); }); - describe("responds to ClientJoin", () => { + describe("handles ClientJoin", () => { let processSignal: ProcessSignalFunction; beforeEach(() => { @@ -99,7 +121,79 @@ describe("Presence", () => { clock.tick(10); }); - it("with broadcast immediately when preferred responder", () => { + function joinClients( + updateProviders: ClientConnectionId[], + delayToJoinClient5: number | undefined, + ): void { + // Join client4 + processSignal( + [], + { + type: "Pres:ClientJoin", + content: { + sendTimestamp: clock.now - 50, + avgLatency: 50, + data: { + "system:presence": attendee4SystemWorkspaceDatastore, + }, + updateProviders, + }, + clientId: "client4", + }, + false, + ); + + if (delayToJoinClient5 !== undefined) { + // Advance clock to delay joining client5 + clock.tick(delayToJoinClient5); + + // Join client5 + processSignal( + [], + { + type: "Pres:ClientJoin", + content: { + sendTimestamp: clock.now - 50, + avgLatency: 50, + data: { + "system:presence": attendee5SystemWorkspaceDatastore, + }, + updateProviders, + }, + clientId: "client5", + }, + false, + ); + } + } + + function processClient0ResponseForClient4(): void { + // Setup + // client 0 handles client 4 join + processSignal( + [], + { + type: "Pres:DatastoreUpdate", + content: { + "avgLatency": 10, + "data": { + "system:presence": { + "clientToSessionId": { + ...attendee0SystemWorkspaceDatastore.clientToSessionId, + ...attendee4SystemWorkspaceDatastore.clientToSessionId, + }, + }, + }, + "isComplete": true, + "sendTimestamp": clock.now - 10, + }, + clientId: "client0", + }, + false, + ); + } + + it("when preferred responder ... with broadcast immediately", () => { // Setup logger.registerExpectedEvent({ eventName: "Presence:JoinResponse", @@ -153,67 +247,164 @@ describe("Presence", () => { assertFinalExpectations(runtime, logger); }); - it("with broadcast after delay when NOT preferred responder", () => { - // #region Part 1 (no response) - // Act - processSignal( - [], - { - type: "Pres:ClientJoin", - content: { - sendTimestamp: clock.now - 20, - avgLatency: 0, - data: { - "system:presence": attendee4SystemWorkspaceDatastore, + describe("when NOT preferred responder", () => { + it("and no other responses ... with broadcast after delay", () => { + // Setup + const responseOrder = 2; + // 3 * named length (client0 and client1) + quorum sequence order (third -> 2) + const responderIndex = 3 * 2 + responseOrder; + const updateTime = + clock.now + + broadcastJoinResponseDelaysMs.namedResponder + + broadcastJoinResponseDelaysMs.backupResponderBase + + broadcastJoinResponseDelaysMs.backupResponderIncrement * responderIndex; + + // #region Part 1 (no response) + // Act + // join client 4 + joinClients(["client0", "client1"], undefined); + + clock.tick(updateTime - clock.now - 1); + // #endregion + + // #region Part 2 (response after delay) + // Setup + logger.registerExpectedEvent({ + eventName: "Presence:JoinResponse", + details: JSON.stringify({ + type: "broadcastAll", + requestor: "client4", + role: "secondary", + order: 2, + }), + }); + runtime.signalsExpected.push([ + { + type: "Pres:DatastoreUpdate", + content: { + "avgLatency": 10, + "data": { + "system:presence": { + "clientToSessionId": { + ...attendee4SystemWorkspaceDatastore.clientToSessionId, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, + }, + }, + }, + "isComplete": true, + "sendTimestamp": clock.now + 1, }, - updateProviders: ["client0", "client1"], }, - clientId: "client4", - }, - false, - ); - // #endregion + ]); - // #region Part 2 (response after delay) - // Setup - logger.registerExpectedEvent({ - eventName: "Presence:JoinResponse", - details: JSON.stringify({ - type: "broadcastAll", - requestor: "client4", - role: "secondary", - order: 2, - }), + // Act + clock.tick(1); + + // Verify + assertFinalExpectations(runtime, logger); + // #endregion }); - runtime.signalsExpected.push([ - { - type: "Pres:DatastoreUpdate", - content: { - "avgLatency": 10, - "data": { - "system:presence": { - "clientToSessionId": { - ...attendee4SystemWorkspaceDatastore.clientToSessionId, - [connectionId2]: { - "rev": 0, - "timestamp": initialTime, - "value": attendeeId2, + + it("and other has fully responded ... without broadcast", () => { + // Setup + // join client 4 + joinClients(["client0", "client1"], undefined); + + clock.tick(broadcastJoinResponseDelaysMs.namedResponder); + + // Act + // client 0 handles client 4 join + processClient0ResponseForClient4(); + + clock.runAll(); + + // Verify + assertFinalExpectations(runtime, logger); + }); + + // There is a hole in the protocol where all complete responses are considered + // to resolve all pending join requests. However a complete response might be + // crafted and sent prior to a recent join request and the recent joiner may + // not have all of the messages to fill in the gap. + it("(expect to fail) and other has partially responded ... with broadcast after delay", () => { + // Setup + const responseOrder = 2; + // 3 * named length (client0 and client1) + quorum sequence order (third -> 2) + const responderIndex = 3 * 2 + responseOrder; + const client4ResponseDelay = + broadcastJoinResponseDelaysMs.namedResponder + + broadcastJoinResponseDelaysMs.backupResponderBase + + broadcastJoinResponseDelaysMs.backupResponderIncrement * responderIndex; + const client4UpdateTime = clock.now + client4ResponseDelay; + const delayToJoinClient5 = client4ResponseDelay / 2; + const client5UpdateTime = client4UpdateTime + delayToJoinClient5; + + // join clients 4 and 5 + joinClients(["client0", "client1"], delayToJoinClient5); + + clock.tick(client4UpdateTime - clock.now - 1); + + // client 0 handles client 4 join + processClient0ResponseForClient4(); + + clock.tick(client5UpdateTime - clock.now - 1); + + logger.registerExpectedEvent({ + eventName: "Presence:JoinResponse", + details: JSON.stringify({ + type: "broadcastAll", + requestor: "client5", + role: "secondary", + order: 2, + }), + }); + runtime.signalsExpected.push([ + { + type: "Pres:DatastoreUpdate", + content: { + "avgLatency": 10, + "data": { + "system:presence": { + "clientToSessionId": { + ...attendee0SystemWorkspaceDatastore.clientToSessionId, + ...attendee4SystemWorkspaceDatastore.clientToSessionId, + ...attendee5SystemWorkspaceDatastore.clientToSessionId, + [connectionId2]: { + "rev": 0, + "timestamp": initialTime, + "value": attendeeId2, + }, }, }, }, + "isComplete": true, + "sendTimestamp": clock.now + 1, }, - "isComplete": true, - "sendTimestamp": clock.now + 180, }, - }, - ]); + ]); - // Act - clock.tick(200); + // Act + clock.tick(1); - // Verify - assertFinalExpectations(runtime, logger); - // #endregion + // Verify + // TODO: Once protocol is fixed, remove assert.throws and signalsExpected check+reset. + assert.throws( + () => { + assertFinalExpectations(runtime, logger); + }, + { + name: "Error", + message: /Expected Events not found/, + }, + ); + assert.equal(runtime.signalsExpected.length, 1); + runtime.signalsExpected.length = 0; + // #endregion + }); }); }); diff --git a/packages/framework/presence/src/test/schemaValidation/protocol.spec.ts b/packages/framework/presence/src/test/schemaValidation/protocol.spec.ts index c51bcb79dd58..6b28b7564ed2 100644 --- a/packages/framework/presence/src/test/schemaValidation/protocol.spec.ts +++ b/packages/framework/presence/src/test/schemaValidation/protocol.spec.ts @@ -3,6 +3,8 @@ * Licensed under the MIT License. */ +import { strict as assert } from "node:assert"; + import { EventAndErrorTrackingLogger } from "@fluidframework/test-utils/internal"; import { describe, it, after, afterEach, before, beforeEach } from "mocha"; import { useFakeTimers, type SinonFakeTimers } from "sinon"; @@ -35,15 +37,6 @@ interface Point3D { z: number; } -const attendeeUpdate = { - "clientToSessionId": { - "client1": { - "rev": 0, - "timestamp": 0, - "value": attendeeId1, - }, - }, -} as const; const latestUpdate = { "latest": { [attendeeId1]: { @@ -77,9 +70,8 @@ const latestMapUpdate = { describe("Presence", () => { describe("Runtime schema validation", () => { const afterCleanUp: (() => void)[] = []; - const initialTime = 1000; - - type UpdateContent = typeof latestUpdate & typeof latestMapUpdate; + const initialTime = 500; + const testStartTime = 1010; let clock: SinonFakeTimers; let logger: EventAndErrorTrackingLogger; @@ -87,24 +79,6 @@ describe("Presence", () => { let processSignal: ProcessSignalFunction; let runtime: MockEphemeralRuntime; - function processUpdates(valueManagerUpdates: Record): void { - const updates = { "system:presence": attendeeUpdate, ...valueManagerUpdates }; - - processSignal( - [], - { - type: "Pres:DatastoreUpdate", - content: { - sendTimestamp: clock.now - 10, - avgLatency: 20, - data: updates, - }, - clientId: "client1", - }, - false, - ); - } - before(async () => { clock = useFakeTimers(); }); @@ -123,14 +97,37 @@ describe("Presence", () => { logger, )); - // Pass a little time (to mimic reality) - clock.tick(10); + // Note that while the initialTime was set to 500, the prepareConnectedPresence call advances + // it. Set a consistent start time for all tests. + const deltaToStart = testStartTime - clock.now; + assert(deltaToStart >= 10); + clock.tick(deltaToStart - 10); // Process remote client update signal (attendeeId-1 is then part of local client's known session). - const workspace = { - "s:name:testWorkspace": { ...latestUpdate, ...latestMapUpdate }, - }; - processUpdates(workspace); + processSignal( + [], + { + type: "Pres:DatastoreUpdate", + content: { + sendTimestamp: deltaToStart - 20, + avgLatency: 20, + data: { + "system:presence": { + "clientToSessionId": { + "client1": { + "rev": 0, + "timestamp": initialTime + 40, + "value": attendeeId1, + }, + }, + }, + "s:name:testWorkspace": { ...latestUpdate, ...latestMapUpdate }, + }, + }, + clientId: "client1", + }, + false, + ); // Pass a little time (to mimic reality) clock.tick(10); @@ -162,7 +159,8 @@ describe("Presence", () => { // Check Join response without active validators const attendeeId4 = createSpecificAttendeeId("attendeeId-4"); const connectionId4 = "client4"; - const newAttendeeSignal = generateBasicClientJoin(clock.now - 50, { + const client4JoinTime = clock.now - 50; + const newAttendeeSignal = generateBasicClientJoin(client4JoinTime, { averageLatency: 50, attendeeId: attendeeId4, clientConnectionId: connectionId4, @@ -185,7 +183,7 @@ describe("Presence", () => { }, [connectionId1]: { "rev": 0, - "timestamp": 0, + "timestamp": initialTime + 40, "value": attendeeId1, }, }, @@ -194,7 +192,7 @@ describe("Presence", () => { "latest": { [attendeeId1]: { "rev": 1, - "timestamp": -20, + "timestamp": initialTime + 10, "value": toOpaqueJson({ x: 1, y: 1, z: 1 }), }, }, @@ -204,12 +202,12 @@ describe("Presence", () => { "items": { "key1": { "rev": 1, - "timestamp": -20, + "timestamp": initialTime + 10, "value": toOpaqueJson({ a: 1, b: 1 }), }, "key2": { "rev": 1, - "timestamp": -20, + "timestamp": initialTime + 10, "value": toOpaqueJson({ b: 1, d: 1 }), }, }, @@ -258,7 +256,7 @@ describe("Presence", () => { // robust data and may change. [connectionId4]: { "rev": 0, - "timestamp": initialTime - 20, + "timestamp": client4JoinTime, "value": attendeeId4, }, }, diff --git a/packages/framework/presence/src/test/schemaValidation/valueManagers.spec.ts b/packages/framework/presence/src/test/schemaValidation/valueManagers.spec.ts index d9fd90148557..acf6910ba413 100644 --- a/packages/framework/presence/src/test/schemaValidation/valueManagers.spec.ts +++ b/packages/framework/presence/src/test/schemaValidation/valueManagers.spec.ts @@ -565,14 +565,14 @@ describe("Presence", () => { { "type": "Pres:DatastoreUpdate", "content": { - "sendTimestamp": 1030, + "sendTimestamp": clock.now, "avgLatency": 10, "data": { "system:presence": { "clientToSessionId": { [connectionId2]: { "rev": 0, - "timestamp": 1000, + "timestamp": initialTime, "value": attendeeId2, }, }, @@ -584,12 +584,12 @@ describe("Presence", () => { "items": { "key1": { "rev": 0, - "timestamp": 1030, + "timestamp": clock.now, "value": toOpaqueJson({ "x": 0, "y": 0, "z": 0 }), }, "key2": { "rev": 0, - "timestamp": 1030, + "timestamp": clock.now, "value": toOpaqueJson({ "x": 0, "y": 0, "z": 0 }), }, }, @@ -650,14 +650,14 @@ describe("Presence", () => { { "type": "Pres:DatastoreUpdate", "content": { - "sendTimestamp": 1030, + "sendTimestamp": clock.now, "avgLatency": 10, "data": { "system:presence": { "clientToSessionId": { [connectionId2]: { "rev": 0, - "timestamp": 1000, + "timestamp": initialTime, "value": attendeeId2, }, }, @@ -669,7 +669,7 @@ describe("Presence", () => { "items": { "key1": { "rev": 1, - "timestamp": 1030, + "timestamp": clock.now, "value": toOpaqueJson({ "x": 0, "y": 1, "z": 2 }), }, }, diff --git a/packages/framework/presence/src/test/testUtils.ts b/packages/framework/presence/src/test/testUtils.ts index aed6d1228a66..3e8340795840 100644 --- a/packages/framework/presence/src/test/testUtils.ts +++ b/packages/framework/presence/src/test/testUtils.ts @@ -16,6 +16,7 @@ import type { SinonFakeTimers } from "sinon"; import { createPresenceManager } from "../presenceManager.js"; import type { InboundClientJoinMessage, + InboundDatastoreUpdateMessage, OutboundClientJoinMessage, SignalMessages, } from "../protocol.js"; @@ -121,6 +122,26 @@ export function generateBasicClientJoin( }; } +/** + * Expected delays for broadcasting join responses + */ +export const broadcastJoinResponseDelaysMs = { + /** + * The delay in milliseconds before a join response is sent to any client. + */ + namedResponder: 0, + /** + * The delay in milliseconds all backup responders wait before sending + * a join response to allow others to respond first. + */ + backupResponderBase: 20, + /** + * The additional delay in milliseconds a backup responder waits per + * ordering before sending a join response. + */ + backupResponderIncrement: 20, +} as const; + /** * Function signature for sending a signal to the presence manager. */ @@ -155,18 +176,21 @@ export function prepareConnectedPresence( const quorumClientIds = [...runtime.quorum.getMembers().keys()].filter( (quorumClientId) => quorumClientId !== clientConnectionId, ); - if (quorumClientIds.length > 3) { - quorumClientIds.length = 3; + const updateProviders = quorumClientIds; + if (updateProviders.length > 3) { + updateProviders.length = 3; } const expectedClientJoin: OutboundClientJoinMessage & Partial> = generateBasicClientJoin(clock.now, { attendeeId, clientConnectionId, - updateProviders: quorumClientIds, + updateProviders, }); delete expectedClientJoin.clientId; - runtime.signalsExpected.push([expectedClientJoin]); + if (updateProviders.length > 0) { + runtime.signalsExpected.push([expectedClientJoin]); + } const presence = createPresenceManager(runtime, attendeeId as AttendeeId); @@ -200,8 +224,30 @@ export function prepareConnectedPresence( // Pass a little time (to mimic reality) clock.tick(10); - // Return the join signal - processSignal([], { ...expectedClientJoin, clientId: clientConnectionId }, true); + if (updateProviders.length > 0) { + // Return the join signal + processSignal([], { ...expectedClientJoin, clientId: clientConnectionId }, true); + + // Pass time (to mimic likely response) + clock.tick(broadcastJoinResponseDelaysMs.namedResponder + 20); + + // Send a fake join response + // There are no other attendees in the session (not realistic) but this + // convinces the presence manager that it now has full knowledge, which + // enables it to respond to other's join requests accurately. + processSignal( + [], + { + type: "Pres:DatastoreUpdate", + content: { + ...expectedClientJoin.content, + isComplete: true, + }, + clientId: updateProviders[0], + } satisfies InboundDatastoreUpdateMessage, + false, + ); + } return { presence, diff --git a/packages/runtime/test-runtime-utils/src/mocks.ts b/packages/runtime/test-runtime-utils/src/mocks.ts index aa9a068b08b9..3ffd2a0d158b 100644 --- a/packages/runtime/test-runtime-utils/src/mocks.ts +++ b/packages/runtime/test-runtime-utils/src/mocks.ts @@ -708,7 +708,10 @@ export class MockQuorumClients implements IQuorumClients, EventEmitter { } getMembers(): Map { - return this.members; + // Implementation always generates a new Map. + // Mock should as well in case any callers rely on being able to modify + // the returned Map. + return new Map(this.members); } getMember(clientId: string): ISequencedClient | undefined { return this.getMembers().get(clientId);