Skip to content

Commit 7baf59c

Browse files
authored
improvement(client-presence): Consistent event ordering and state results (microsoft#23797)
## Description For consistent data reads, events from value manager updates should be placed in a queue to be processed directly after the incoming signal is processed. Then any data explorations a customer may trigger from event will get current values. This change makes every workspace return a list of "post update actions" that must be executed after every updated is processed from the incoming signal. This makes sure that if a client gets receives event from one update, all other updates within the same datastore update message will be consistently reflected. Tests: ``` Presence events are fired with consistent and final state when states workspace value is updated where - 'latest' update comes before 'latestMap' update in single workspace - 'latestMap' update comes before 'latest' update in single workspace - workspace 1 update comes before workspace 2 update in multiple workspaces - workspace 2 update comes before workspace 1 update in multiple workspaces map item is removed and 'latest' value updated - in a single workspace - in multiple workspaces and map item is updated - with removal first - with update first Notifications update - comes before states workspace update - comes after states workspace update - comes in between states workspaces - within a states workspace ``` Fixes [AB#29542](https://dev.azure.com/fluidframework/235294da-091d-4c29-84fc-cdfc3d90890b/_workitems/edit/29542)
1 parent 49f868e commit 7baf59c

File tree

8 files changed

+730
-48
lines changed

8 files changed

+730
-48
lines changed

packages/framework/presence/src/internalTypes.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,10 @@ export interface ValueManager<
4646
> {
4747
// Most value managers should provide value - implement Required<ValueManager<...>>
4848
readonly value?: TValueState;
49-
update(client: ISessionClient, received: number, value: TValueState): void;
49+
update(client: ISessionClient, received: number, value: TValueState): PostUpdateAction[];
5050
}
51+
52+
/**
53+
* @internal
54+
*/
55+
export type PostUpdateAction = () => void;

packages/framework/presence/src/latestMapValueManager.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import type { IEmitter } from "@fluidframework/core-interfaces/internal";
99

1010
import type { BroadcastControls, BroadcastControlSettings } from "./broadcastControls.js";
1111
import { OptionalBroadcastControl } from "./broadcastControls.js";
12-
import type { ValueManager } from "./internalTypes.js";
12+
import type { PostUpdateAction, ValueManager } from "./internalTypes.js";
1313
import { objectEntries, objectKeys } from "./internalUtils.js";
1414
import type {
1515
LatestValueClientData,
@@ -421,7 +421,7 @@ class LatestMapValueManagerImpl<
421421
client: SpecificSessionClient<SpecificSessionClientId>,
422422
_received: number,
423423
value: InternalTypes.MapValueState<T, string | number>,
424-
): void {
424+
): PostUpdateAction[] {
425425
const allKnownStates = this.datastore.knownValues(this.key);
426426
const clientSessionId: SpecificSessionClientId = client.sessionId;
427427
const currentState = (allKnownStates.states[clientSessionId] ??=
@@ -441,7 +441,7 @@ class LatestMapValueManagerImpl<
441441
}
442442

443443
if (updatedItemKeys.length === 0) {
444-
return;
444+
return [];
445445
}
446446

447447
// Store updates
@@ -452,30 +452,36 @@ class LatestMapValueManagerImpl<
452452
client,
453453
items: new Map<Keys, LatestValueData<T>>(),
454454
};
455+
const postUpdateActions: PostUpdateAction[] = [];
455456
for (const key of updatedItemKeys) {
456457
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
457458
const item = value.items[key]!;
458459
const hadPriorValue = currentState.items[key]?.value;
459460
currentState.items[key] = item;
460461
const metadata = { revision: item.rev, timestamp: item.timestamp };
461462
if (item.value !== undefined) {
462-
this.events.emit("itemUpdated", {
463+
const itemValue = item.value;
464+
const updatedItem = {
463465
client,
464466
key,
465-
value: item.value,
467+
value: itemValue,
466468
metadata,
467-
});
468-
allUpdates.items.set(key, { value: item.value, metadata });
469+
};
470+
postUpdateActions.push(() => this.events.emit("itemUpdated", updatedItem));
471+
allUpdates.items.set(key, { value: itemValue, metadata });
469472
} else if (hadPriorValue !== undefined) {
470-
this.events.emit("itemRemoved", {
471-
client,
472-
key,
473-
metadata,
474-
});
473+
postUpdateActions.push(() =>
474+
this.events.emit("itemRemoved", {
475+
client,
476+
key,
477+
metadata,
478+
}),
479+
);
475480
}
476481
}
477482
this.datastore.update(this.key, clientSessionId, currentState);
478-
this.events.emit("updated", allUpdates);
483+
postUpdateActions.push(() => this.events.emit("updated", allUpdates));
484+
return postUpdateActions;
479485
}
480486
}
481487

packages/framework/presence/src/latestValueManager.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { shallowCloneObject } from "@fluidframework/core-utils/internal";
99

1010
import type { BroadcastControls, BroadcastControlSettings } from "./broadcastControls.js";
1111
import { OptionalBroadcastControl } from "./broadcastControls.js";
12-
import type { ValueManager } from "./internalTypes.js";
12+
import type { PostUpdateAction, ValueManager } from "./internalTypes.js";
1313
import { objectEntries } from "./internalUtils.js";
1414
import type { LatestValueClientData, LatestValueData } from "./latestValueTypes.js";
1515
import type { ISessionClient } from "./presence.js";
@@ -156,19 +156,22 @@ class LatestValueManagerImpl<T, Key extends string>
156156
client: ISessionClient,
157157
_received: number,
158158
value: InternalTypes.ValueRequiredState<T>,
159-
): void {
159+
): PostUpdateAction[] {
160160
const allKnownStates = this.datastore.knownValues(this.key);
161161
const clientSessionId = client.sessionId;
162162
const currentState = allKnownStates.states[clientSessionId];
163163
if (currentState !== undefined && currentState.rev >= value.rev) {
164-
return;
164+
return [];
165165
}
166166
this.datastore.update(this.key, clientSessionId, value);
167-
this.events.emit("updated", {
168-
client,
169-
value: value.value,
170-
metadata: { revision: value.rev, timestamp: value.timestamp },
171-
});
167+
return [
168+
() =>
169+
this.events.emit("updated", {
170+
client,
171+
value: value.value,
172+
metadata: { revision: value.rev, timestamp: value.timestamp },
173+
}),
174+
];
172175
}
173176
}
174177

packages/framework/presence/src/notificationsManager.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import { createEmitter } from "@fluid-internal/client-utils";
77
import type { Listeners, Listenable, Off } from "@fluidframework/core-interfaces";
88

9-
import type { ValueManager } from "./internalTypes.js";
9+
import type { PostUpdateAction, ValueManager } from "./internalTypes.js";
1010
import type { ISessionClient } from "./presence.js";
1111
import { datastoreFromHandle, type StateDatastore } from "./stateDatastore.js";
1212
import { brandIVM } from "./valueManager.js";
@@ -231,23 +231,27 @@ class NotificationsManagerImpl<
231231
client: ISessionClient,
232232
_received: number,
233233
value: InternalTypes.ValueRequiredState<InternalTypes.NotificationType>,
234-
): void {
234+
): PostUpdateAction[] {
235+
const postUpdateActions: PostUpdateAction[] = [];
235236
const eventName = value.value.name as keyof Listeners<NotificationSubscriptions<T>>;
236237
if (this.notificationsInternal.hasListeners(eventName)) {
237238
// Without schema validation, we don't know that the args are the correct type.
238239
// For now we assume the user is sending the correct types and there is no corruption along the way.
239240
const args = [client, ...value.value.args] as Parameters<
240241
NotificationSubscriptions<T>[typeof eventName]
241242
>;
242-
this.notificationsInternal.emit(eventName, ...args);
243+
postUpdateActions.push(() => this.notificationsInternal.emit(eventName, ...args));
243244
} else {
244-
this.events.emit(
245-
"unattendedNotification",
246-
value.value.name,
247-
client,
248-
...value.value.args,
245+
postUpdateActions.push(() =>
246+
this.events.emit(
247+
"unattendedNotification",
248+
value.value.name,
249+
client,
250+
...value.value.args,
251+
),
249252
);
250253
}
254+
return postUpdateActions;
251255
}
252256
}
253257

packages/framework/presence/src/presenceDatastoreManager.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import type { ITelemetryLoggerExt } from "@fluidframework/telemetry-utils/intern
99

1010
import type { ClientConnectionId } from "./baseTypes.js";
1111
import type { BroadcastControlSettings } from "./broadcastControls.js";
12-
import type { IEphemeralRuntime } from "./internalTypes.js";
12+
import type { IEphemeralRuntime, PostUpdateAction } from "./internalTypes.js";
1313
import { objectEntries } from "./internalUtils.js";
1414
import type { ClientSessionId, ISessionClient } from "./presence.js";
1515
import type {
@@ -383,16 +383,18 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
383383
this.refreshBroadcastRequested = false;
384384
}
385385
}
386-
386+
const postUpdateActions: PostUpdateAction[] = [];
387387
for (const [workspaceAddress, remoteDatastore] of Object.entries(message.content.data)) {
388388
// Direct to the appropriate Presence Workspace, if present.
389389
const workspace = this.workspaces.get(workspaceAddress);
390390
if (workspace) {
391-
workspace.internal.processUpdate(
392-
received,
393-
timeModifier,
394-
remoteDatastore,
395-
message.clientId,
391+
postUpdateActions.push(
392+
...workspace.internal.processUpdate(
393+
received,
394+
timeModifier,
395+
remoteDatastore,
396+
message.clientId,
397+
),
396398
);
397399
} else {
398400
// All broadcast state is kept even if not currently registered, unless a value
@@ -409,6 +411,9 @@ export class PresenceDatastoreManagerImpl implements PresenceDatastoreManager {
409411
}
410412
}
411413
}
414+
for (const action of postUpdateActions) {
415+
action();
416+
}
412417
}
413418

414419
/**

packages/framework/presence/src/presenceStates.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import type { ClientConnectionId } from "./baseTypes.js";
99
import type { BroadcastControlSettings } from "./broadcastControls.js";
1010
import { RequiredBroadcastControl } from "./broadcastControls.js";
1111
import type { InternalTypes } from "./exposedInternalTypes.js";
12-
import type { ClientRecord } from "./internalTypes.js";
12+
import type { ClientRecord, PostUpdateAction } from "./internalTypes.js";
1313
import type { RecordEntryTypes } from "./internalUtils.js";
1414
import { getOrCreateRecord, objectEntries } from "./internalUtils.js";
1515
import type { ClientSessionId, ISessionClient } from "./presence.js";
@@ -133,7 +133,7 @@ export interface PresenceStatesInternal {
133133
timeModifier: number,
134134
remoteDatastore: ValueUpdateRecord,
135135
senderConnectionId: ClientConnectionId,
136-
): void;
136+
): PostUpdateAction[];
137137
}
138138

139139
function isValueDirectory<
@@ -408,7 +408,8 @@ class PresenceStatesImpl<TSchema extends PresenceStatesSchema>
408408
received: number,
409409
timeModifier: number,
410410
remoteDatastore: ValueUpdateRecord,
411-
): void {
411+
): PostUpdateAction[] {
412+
const postUpdateActions: PostUpdateAction[] = [];
412413
for (const [key, remoteAllKnownState] of Object.entries(remoteDatastore)) {
413414
const brandedIVM = this.nodes[key];
414415
if (brandedIVM === undefined) {
@@ -418,10 +419,11 @@ class PresenceStatesImpl<TSchema extends PresenceStatesSchema>
418419
const node = unbrandIVM(brandedIVM);
419420
for (const [clientSessionId, value] of objectEntries(remoteAllKnownState)) {
420421
const client = this.runtime.lookupClient(clientSessionId);
421-
node.update(client, received, value);
422+
postUpdateActions.push(...node.update(client, received, value));
422423
}
423424
}
424425
}
426+
return postUpdateActions;
425427
}
426428
}
427429

packages/framework/presence/src/systemWorkspace.ts

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { assert } from "@fluidframework/core-utils/internal";
99

1010
import type { ClientConnectionId } from "./baseTypes.js";
1111
import type { InternalTypes } from "./exposedInternalTypes.js";
12+
import type { PostUpdateAction } from "./internalTypes.js";
1213
import type {
1314
ClientSessionId,
1415
IPresence,
@@ -135,9 +136,9 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
135136
};
136137
},
137138
senderConnectionId: ClientConnectionId,
138-
): void {
139+
): PostUpdateAction[] {
139140
const audienceMembers = this.audience.getMembers();
140-
const joiningAttendees = new Set<SessionClient>();
141+
const postUpdateActions: PostUpdateAction[] = [];
141142
for (const [clientConnectionId, value] of Object.entries(
142143
remoteDatastore.clientToSessionId,
143144
)) {
@@ -153,7 +154,7 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
153154
);
154155
// If the attendee is joining the session, add them to the list of joining attendees to be announced later.
155156
if (isJoining) {
156-
joiningAttendees.add(attendee);
157+
postUpdateActions.push(() => this.events.emit("attendeeJoined", attendee));
157158
}
158159

159160
const knownSessionId: InternalTypes.ValueRequiredState<ClientSessionId> | undefined =
@@ -165,10 +166,7 @@ class SystemWorkspaceImpl implements PresenceStatesInternal, SystemWorkspace {
165166
}
166167
}
167168

168-
// TODO: reorganize processUpdate and caller to process actions after all updates are processed.
169-
for (const announcedAttendee of joiningAttendees) {
170-
this.events.emit("attendeeJoined", announcedAttendee);
171-
}
169+
return postUpdateActions;
172170
}
173171

174172
public onConnectionAdded(clientConnectionId: ClientConnectionId): void {

0 commit comments

Comments
 (0)