Skip to content

Commit 15415d3

Browse files
(op bunching) Add processMessagesCore to SharedObject implementations (#25745)
`processMessagesCore` was added to `SharedObject` in #23836. This change adds `processMessagesCore` to implementations of `SharedObject`. This is a precursor to #25176 where `processCore` will be removed from `SharedObject`. Some of the boiler plate added here will be cleaned up with that change.
1 parent 0168edd commit 15415d3

File tree

26 files changed

+833
-259
lines changed

26 files changed

+833
-259
lines changed

experimental/PropertyDDS/packages/property-dds/src/propertyTree.ts

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import {
2525
MessageType,
2626
ISequencedDocumentMessage,
2727
} from "@fluidframework/driver-definitions/internal";
28-
import { ISummaryTreeWithStats } from "@fluidframework/runtime-definitions/internal";
28+
import {
29+
ISummaryTreeWithStats,
30+
type IRuntimeMessageCollection,
31+
type ISequencedMessageEnvelope,
32+
} from "@fluidframework/runtime-definitions/internal";
2933
import { SummaryTreeBuilder } from "@fluidframework/runtime-utils/internal";
3034
import { SharedObject, IFluidSerializer } from "@fluidframework/shared-object-base/internal";
3135
import axios from "axios";
@@ -359,29 +363,39 @@ export class SharedPropertyTree extends SharedObject {
359363
this._root._reportDirtinessToView();
360364
}
361365

362-
/**
363-
* Process an operation
364-
*
365-
* @param message - the message to prepare
366-
* @param local - whether the message was sent by the local client
367-
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
368-
* For messages from a remote client, this will be undefined.
369-
*/
370366
protected processCore(
371367
message: ISequencedDocumentMessage,
372368
local: boolean,
373369
localOpMetadata: unknown,
374-
) {
370+
): void {
371+
this.processMessage(message, {
372+
contents: message.contents,
373+
localOpMetadata,
374+
clientSequenceNumber: message.clientSequenceNumber,
375+
});
376+
}
377+
378+
/**
379+
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
380+
*/
381+
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
382+
const { envelope, messagesContent } = messagesCollection;
383+
for (const messageContent of messagesContent) {
384+
this.processMessage(envelope, messageContent.contents);
385+
}
386+
}
387+
388+
private processMessage(messageEnvelope: ISequencedMessageEnvelope, contents: unknown) {
375389
if (
376-
message.type === MessageType.Operation &&
377-
message.sequenceNumber > this.skipSequenceNumber
390+
messageEnvelope.type === MessageType.Operation &&
391+
messageEnvelope.sequenceNumber > this.skipSequenceNumber
378392
) {
379393
const change: IPropertyTreeMessage = this.decodeMessage(
380-
cloneDeep(message.contents as IPropertyTreeMessage),
394+
cloneDeep(contents as IPropertyTreeMessage),
381395
);
382396
const content: IRemotePropertyTreeMessage = {
383397
...change,
384-
sequenceNumber: message.sequenceNumber,
398+
sequenceNumber: messageEnvelope.sequenceNumber,
385399
};
386400
switch (content.op) {
387401
case OpKind.ChangeSet:
@@ -722,12 +736,13 @@ export class SharedPropertyTree extends SharedObject {
722736

723737
// eslint-disable-next-line @typescript-eslint/prefer-for-of
724738
for (let i = 0; i < missingDeltas.length; i++) {
725-
if (missingDeltas[i].sequenceNumber < commitMetadata.sequenceNumber) {
739+
const missingDelta = missingDeltas[i];
740+
if (missingDelta.sequenceNumber < commitMetadata.sequenceNumber) {
726741
// TODO: Don't spy on the DeltaManager's private internals.
727742
// This is trying to mimic what DeltaManager does in processInboundMessage, but there's no guarantee that
728743
// private implementation won't change.
729744
const remoteChange: IPropertyTreeMessage = JSON.parse(
730-
missingDeltas[i].contents as string,
745+
missingDelta.contents as string,
731746
).contents.contents.content.contents;
732747
const { changeSet } = (
733748
await axios.get(
@@ -746,7 +761,7 @@ export class SharedPropertyTree extends SharedObject {
746761
this.addRemoteChange(remoteChange);
747762
}
748763
} else {
749-
this.processCore(missingDeltas[i], false, {});
764+
this.processMessage(missingDelta, missingDelta.contents);
750765
}
751766
}
752767

experimental/dds/ot/ot/src/ot.ts

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,13 @@ import {
1010
IFluidDataStoreRuntime,
1111
IChannelStorageService,
1212
} from "@fluidframework/datastore-definitions/internal";
13-
import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
14-
import { ISummaryTreeWithStats } from "@fluidframework/runtime-definitions/internal";
13+
import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
14+
import {
15+
ISummaryTreeWithStats,
16+
type IRuntimeMessageCollection,
17+
type IRuntimeMessagesContent,
18+
type ISequencedMessageEnvelope,
19+
} from "@fluidframework/runtime-definitions/internal";
1520
import {
1621
IFluidSerializer,
1722
SharedObject,
@@ -103,17 +108,47 @@ export abstract class SharedOT<TState, TOp> extends SharedObject {
103108

104109
protected onDisconnect() {}
105110

106-
protected processCore(message: ISequencedDocumentMessage, local: boolean) {
111+
protected processCore(
112+
message: ISequencedDocumentMessage,
113+
local: boolean,
114+
localOpMetadata: unknown,
115+
): void {
116+
this.processMessage(
117+
message,
118+
{
119+
contents: message.contents,
120+
localOpMetadata,
121+
clientSequenceNumber: message.clientSequenceNumber,
122+
},
123+
local,
124+
);
125+
}
126+
127+
/**
128+
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
129+
*/
130+
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
131+
const { envelope, local, messagesContent } = messagesCollection;
132+
for (const messageContent of messagesContent) {
133+
this.processMessage(envelope, messageContent, local);
134+
}
135+
}
136+
137+
private processMessage(
138+
messageEnvelope: ISequencedMessageEnvelope,
139+
messagesContent: IRuntimeMessagesContent,
140+
local: boolean,
141+
) {
107142
// Discard any sequenced ops that are now below the minimum sequence number.
108143
const minSeq = this.deltaManager.minimumSequenceNumber;
109144
while (this.sequencedOps[0]?.seq < minSeq) {
110145
this.sequencedOps.shift();
111146
}
112147

113-
let remoteOp = message.contents;
114-
const messageSeq = message.sequenceNumber;
115-
const remoteRefSeq = message.referenceSequenceNumber;
116-
const remoteClient = message.clientId;
148+
let remoteOp = messagesContent.contents;
149+
const messageSeq = messageEnvelope.sequenceNumber;
150+
const remoteRefSeq = messageEnvelope.referenceSequenceNumber;
151+
const remoteClient = messageEnvelope.clientId;
117152

118153
// Adjust the incoming sequenced op to account for prior sequenced ops that the
119154
// sender hadn't yet seen at the time they sent the op.

experimental/dds/tree/api-report/experimental-tree.alpha.api.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,6 +669,7 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
669669
mergeEditsFrom(other: SharedTree, edits: Iterable<Edit<InternalizedChange>>, stableIdRemapper?: (id: StableNodeId) => StableNodeId): EditId[];
670670
protected onDisconnect(): void;
671671
protected processCore(message: unknown, local: boolean): void;
672+
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void;
672673
protected registerCore(): void;
673674
// (undocumented)
674675
protected reSubmitCore(op: unknown, localOpMetadata?: StashedLocalOpMetadata): void;

experimental/dds/tree/src/SharedTree.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ import {
1515
IChannelStorageService,
1616
} from '@fluidframework/datastore-definitions/internal';
1717
import { ISequencedDocumentMessage } from '@fluidframework/driver-definitions/internal';
18-
import { ISummaryTreeWithStats, ITelemetryContext } from '@fluidframework/runtime-definitions/internal';
18+
import {
19+
ISummaryTreeWithStats,
20+
ITelemetryContext,
21+
type IRuntimeMessageCollection,
22+
type ISequencedMessageEnvelope,
23+
} from '@fluidframework/runtime-definitions/internal';
1924
import {
2025
IFluidSerializer,
2126
ISharedObjectEvents,
@@ -998,11 +1003,24 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
9981003
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processCore}
9991004
*/
10001005
protected processCore(message: unknown, local: boolean): void {
1001-
const typedMessage = message as Omit<ISequencedDocumentMessage, 'contents'> & {
1002-
contents: SharedTreeOp_0_0_2 | SharedTreeOp;
1003-
};
1004-
this.cachingLogViewer.setMinimumSequenceNumber(typedMessage.minimumSequenceNumber);
1005-
const op = typedMessage.contents;
1006+
const typedMessage = message as ISequencedDocumentMessage;
1007+
this.processMessage(typedMessage, typedMessage.contents);
1008+
}
1009+
1010+
/**
1011+
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
1012+
*/
1013+
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
1014+
const { envelope, messagesContent } = messagesCollection;
1015+
for (const messageContent of messagesContent) {
1016+
this.processMessage(envelope, messageContent.contents);
1017+
}
1018+
}
1019+
1020+
private processMessage(messageEnvelope: ISequencedMessageEnvelope, contents: unknown): void {
1021+
const typedContents = contents as SharedTreeOp_0_0_2 | SharedTreeOp;
1022+
this.cachingLogViewer.setMinimumSequenceNumber(messageEnvelope.minimumSequenceNumber);
1023+
const op = typedContents;
10061024
if (op.version === undefined) {
10071025
// Back-compat: some legacy documents may contain trailing ops with an unstamped version; normalize them.
10081026
(op as { version: WriteFormat | undefined }).version = WriteFormat.v0_0_2;
@@ -1024,7 +1042,7 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
10241042
if (op.version === WriteFormat.v0_1_1) {
10251043
this.internStringsFromEdit(edit);
10261044
}
1027-
this.processSequencedEdit(edit, typedMessage);
1045+
this.processSequencedEdit(edit, messageEnvelope);
10281046
}
10291047
} else if (type === SharedTreeOpType.Update) {
10301048
this.processVersionUpdate(op.version);
@@ -1071,7 +1089,7 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
10711089
}
10721090
}
10731091

1074-
private processSequencedEdit(edit: Edit<ChangeInternal>, message: ISequencedDocumentMessage): void {
1092+
private processSequencedEdit(edit: Edit<ChangeInternal>, messageEnvelope: ISequencedMessageEnvelope): void {
10751093
const { id: editId } = edit;
10761094
const wasLocalEdit = this.editLog.isLocalEdit(editId);
10771095

@@ -1086,9 +1104,9 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
10861104
}
10871105

10881106
if (wasLocalEdit) {
1089-
this.editLog.addSequencedEdit(edit, message);
1107+
this.editLog.addSequencedEdit(edit, messageEnvelope);
10901108
} else {
1091-
this.applyEditLocally(edit, message);
1109+
this.applyEditLocally(edit, messageEnvelope);
10921110
}
10931111
}
10941112

@@ -1311,10 +1329,10 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
13111329
}
13121330
}
13131331

1314-
private applyEditLocally(edit: Edit<ChangeInternal>, message: ISequencedDocumentMessage | undefined): void {
1315-
const isSequenced = message !== undefined;
1332+
private applyEditLocally(edit: Edit<ChangeInternal>, messageEnvelope: ISequencedMessageEnvelope | undefined): void {
1333+
const isSequenced = messageEnvelope !== undefined;
13161334
if (isSequenced) {
1317-
this.editLog.addSequencedEdit(edit, message);
1335+
this.editLog.addSequencedEdit(edit, messageEnvelope);
13181336
} else {
13191337
this.editLog.addLocalEdit(edit);
13201338
}

packages/dds/cell/src/cell.ts

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ import { readAndParse } from "@fluidframework/driver-utils/internal";
1818
import type {
1919
ISummaryTreeWithStats,
2020
AttributionKey,
21+
IRuntimeMessageCollection,
22+
ISequencedMessageEnvelope,
23+
IRuntimeMessagesContent,
2124
} from "@fluidframework/runtime-definitions/internal";
2225
import type { IFluidSerializer } from "@fluidframework/shared-object-base/internal";
2326
import {
@@ -173,10 +176,10 @@ export class SharedCell<T = any>
173176
* Set the Op-based attribution through the SequencedDocumentMessage,
174177
* or set the local/detached attribution.
175178
*/
176-
private setAttribution(message?: ISequencedDocumentMessage): void {
179+
private setAttribution(messageEnvelope?: ISequencedMessageEnvelope): void {
177180
if (this.options?.attribution?.track ?? false) {
178-
this.attribution = message
179-
? { type: "op", seq: message.sequenceNumber }
181+
this.attribution = messageEnvelope
182+
? { type: "op", seq: messageEnvelope.sequenceNumber }
180183
: this.isAttached()
181184
? { type: "local" }
182185
: { type: "detached", id: 0 };
@@ -242,20 +245,38 @@ export class SharedCell<T = any>
242245
}
243246
}
244247

245-
/**
246-
* Process a cell operation (op).
247-
*
248-
* @param message - The message to prepare.
249-
* @param local - Whether or not the message was sent by the local client.
250-
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
251-
* For messages from a remote client, this will be `undefined`.
252-
*/
253248
protected processCore(
254249
message: ISequencedDocumentMessage,
255250
local: boolean,
256251
localOpMetadata: unknown,
257252
): void {
258-
const cellOpMetadata = localOpMetadata as ICellLocalOpMetadata;
253+
this.processMessage(
254+
message,
255+
{
256+
contents: message.contents,
257+
localOpMetadata,
258+
clientSequenceNumber: message.clientSequenceNumber,
259+
},
260+
local,
261+
);
262+
}
263+
264+
/**
265+
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
266+
*/
267+
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
268+
const { envelope, local, messagesContent } = messagesCollection;
269+
for (const messageContent of messagesContent) {
270+
this.processMessage(envelope, messageContent, local);
271+
}
272+
}
273+
274+
private processMessage(
275+
messageEnvelope: ISequencedMessageEnvelope,
276+
messageContent: IRuntimeMessagesContent,
277+
local: boolean,
278+
): void {
279+
const cellOpMetadata = messageContent.localOpMetadata as ICellLocalOpMetadata;
259280
if (this.messageId !== this.messageIdObserved) {
260281
// We are waiting for an ACK on our change to this cell - we will ignore all messages until we get it.
261282
if (local) {
@@ -273,16 +294,16 @@ export class SharedCell<T = any>
273294
// We got an ACK. Update messageIdObserved.
274295
this.messageIdObserved = cellOpMetadata.pendingMessageId;
275296
// update the attributor
276-
this.setAttribution(message);
297+
this.setAttribution(messageEnvelope);
277298
}
278299
return;
279300
}
280301

281302
// eslint-disable-next-line @typescript-eslint/no-unsafe-enum-comparison
282-
if (message.type === MessageType.Operation && !local) {
283-
const op = message.contents as ICellOperation;
303+
if (messageEnvelope.type === MessageType.Operation && !local) {
304+
const op = messageContent.contents as ICellOperation;
284305
// update the attributor
285-
this.setAttribution(message);
306+
this.setAttribution(messageEnvelope);
286307
this.applyInnerOp(op);
287308
}
288309
}

0 commit comments

Comments
 (0)