Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 32 additions & 17 deletions experimental/PropertyDDS/packages/property-dds/src/propertyTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ import {
MessageType,
ISequencedDocumentMessage,
} from "@fluidframework/driver-definitions/internal";
import { ISummaryTreeWithStats } from "@fluidframework/runtime-definitions/internal";
import {
ISummaryTreeWithStats,
type IRuntimeMessageCollection,
type ISequencedMessageEnvelope,
} from "@fluidframework/runtime-definitions/internal";
import { SummaryTreeBuilder } from "@fluidframework/runtime-utils/internal";
import { SharedObject, IFluidSerializer } from "@fluidframework/shared-object-base/internal";
import axios from "axios";
Expand Down Expand Up @@ -359,29 +363,39 @@ export class SharedPropertyTree extends SharedObject {
this._root._reportDirtinessToView();
}

/**
* Process an operation
*
* @param message - the message to prepare
* @param local - whether the message was sent by the local client
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
* For messages from a remote client, this will be undefined.
*/
protected processCore(
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata: unknown,
) {
): void {
this.processMessage(message, {
contents: message.contents,
localOpMetadata,
clientSequenceNumber: message.clientSequenceNumber,
});
}

/**
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
*/
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
const { envelope, messagesContent } = messagesCollection;
for (const messageContent of messagesContent) {
this.processMessage(envelope, messageContent.contents);
}
}

private processMessage(messageEnvelope: ISequencedMessageEnvelope, contents: unknown) {
if (
message.type === MessageType.Operation &&
message.sequenceNumber > this.skipSequenceNumber
messageEnvelope.type === MessageType.Operation &&
messageEnvelope.sequenceNumber > this.skipSequenceNumber
) {
const change: IPropertyTreeMessage = this.decodeMessage(
cloneDeep(message.contents as IPropertyTreeMessage),
cloneDeep(contents as IPropertyTreeMessage),
);
const content: IRemotePropertyTreeMessage = {
...change,
sequenceNumber: message.sequenceNumber,
sequenceNumber: messageEnvelope.sequenceNumber,
};
switch (content.op) {
case OpKind.ChangeSet:
Expand Down Expand Up @@ -722,12 +736,13 @@ export class SharedPropertyTree extends SharedObject {

// eslint-disable-next-line @typescript-eslint/prefer-for-of
for (let i = 0; i < missingDeltas.length; i++) {
if (missingDeltas[i].sequenceNumber < commitMetadata.sequenceNumber) {
const missingDelta = missingDeltas[i];
if (missingDelta.sequenceNumber < commitMetadata.sequenceNumber) {
// TODO: Don't spy on the DeltaManager's private internals.
// This is trying to mimic what DeltaManager does in processInboundMessage, but there's no guarantee that
// private implementation won't change.
const remoteChange: IPropertyTreeMessage = JSON.parse(
missingDeltas[i].contents as string,
missingDelta.contents as string,
).contents.contents.content.contents;
const { changeSet } = (
await axios.get(
Expand All @@ -746,7 +761,7 @@ export class SharedPropertyTree extends SharedObject {
this.addRemoteChange(remoteChange);
}
} else {
this.processCore(missingDeltas[i], false, {});
this.processMessage(missingDelta, missingDelta.contents);
}
}

Expand Down
49 changes: 42 additions & 7 deletions experimental/dds/ot/ot/src/ot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ import {
IFluidDataStoreRuntime,
IChannelStorageService,
} from "@fluidframework/datastore-definitions/internal";
import { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
import { ISummaryTreeWithStats } from "@fluidframework/runtime-definitions/internal";
import type { ISequencedDocumentMessage } from "@fluidframework/driver-definitions/internal";
import {
ISummaryTreeWithStats,
type IRuntimeMessageCollection,
type IRuntimeMessagesContent,
type ISequencedMessageEnvelope,
} from "@fluidframework/runtime-definitions/internal";
import {
IFluidSerializer,
SharedObject,
Expand Down Expand Up @@ -103,17 +108,47 @@ export abstract class SharedOT<TState, TOp> extends SharedObject {

protected onDisconnect() {}

protected processCore(message: ISequencedDocumentMessage, local: boolean) {
protected processCore(
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata: unknown,
): void {
this.processMessage(
message,
{
contents: message.contents,
localOpMetadata,
clientSequenceNumber: message.clientSequenceNumber,
},
local,
);
}

/**
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
*/
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
const { envelope, local, messagesContent } = messagesCollection;
for (const messageContent of messagesContent) {
this.processMessage(envelope, messageContent, local);
}
}

private processMessage(
messageEnvelope: ISequencedMessageEnvelope,
messagesContent: IRuntimeMessagesContent,
local: boolean,
) {
// Discard any sequenced ops that are now below the minimum sequence number.
const minSeq = this.deltaManager.minimumSequenceNumber;
while (this.sequencedOps[0]?.seq < minSeq) {
this.sequencedOps.shift();
}

let remoteOp = message.contents;
const messageSeq = message.sequenceNumber;
const remoteRefSeq = message.referenceSequenceNumber;
const remoteClient = message.clientId;
let remoteOp = messagesContent.contents;
const messageSeq = messageEnvelope.sequenceNumber;
const remoteRefSeq = messageEnvelope.referenceSequenceNumber;
const remoteClient = messageEnvelope.clientId;

// Adjust the incoming sequenced op to account for prior sequenced ops that the
// sender hadn't yet seen at the time they sent the op.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
mergeEditsFrom(other: SharedTree, edits: Iterable<Edit<InternalizedChange>>, stableIdRemapper?: (id: StableNodeId) => StableNodeId): EditId[];
protected onDisconnect(): void;
protected processCore(message: unknown, local: boolean): void;
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void;
protected registerCore(): void;
// (undocumented)
protected reSubmitCore(op: unknown, localOpMetadata?: StashedLocalOpMetadata): void;
Expand Down
44 changes: 31 additions & 13 deletions experimental/dds/tree/src/SharedTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ import {
IChannelStorageService,
} from '@fluidframework/datastore-definitions/internal';
import { ISequencedDocumentMessage } from '@fluidframework/driver-definitions/internal';
import { ISummaryTreeWithStats, ITelemetryContext } from '@fluidframework/runtime-definitions/internal';
import {
ISummaryTreeWithStats,
ITelemetryContext,
type IRuntimeMessageCollection,
type ISequencedMessageEnvelope,
} from '@fluidframework/runtime-definitions/internal';
import {
IFluidSerializer,
ISharedObjectEvents,
Expand Down Expand Up @@ -998,11 +1003,24 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processCore}
*/
protected processCore(message: unknown, local: boolean): void {
const typedMessage = message as Omit<ISequencedDocumentMessage, 'contents'> & {
contents: SharedTreeOp_0_0_2 | SharedTreeOp;
};
this.cachingLogViewer.setMinimumSequenceNumber(typedMessage.minimumSequenceNumber);
const op = typedMessage.contents;
const typedMessage = message as ISequencedDocumentMessage;
this.processMessage(typedMessage, typedMessage.contents);
}

/**
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
*/
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
const { envelope, messagesContent } = messagesCollection;
for (const messageContent of messagesContent) {
this.processMessage(envelope, messageContent.contents);
}
}

private processMessage(messageEnvelope: ISequencedMessageEnvelope, contents: unknown): void {
const typedContents = contents as SharedTreeOp_0_0_2 | SharedTreeOp;
this.cachingLogViewer.setMinimumSequenceNumber(messageEnvelope.minimumSequenceNumber);
const op = typedContents;
if (op.version === undefined) {
// Back-compat: some legacy documents may contain trailing ops with an unstamped version; normalize them.
(op as { version: WriteFormat | undefined }).version = WriteFormat.v0_0_2;
Expand All @@ -1024,7 +1042,7 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
if (op.version === WriteFormat.v0_1_1) {
this.internStringsFromEdit(edit);
}
this.processSequencedEdit(edit, typedMessage);
this.processSequencedEdit(edit, messageEnvelope);
}
} else if (type === SharedTreeOpType.Update) {
this.processVersionUpdate(op.version);
Expand Down Expand Up @@ -1071,7 +1089,7 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
}
}

private processSequencedEdit(edit: Edit<ChangeInternal>, message: ISequencedDocumentMessage): void {
private processSequencedEdit(edit: Edit<ChangeInternal>, messageEnvelope: ISequencedMessageEnvelope): void {
const { id: editId } = edit;
const wasLocalEdit = this.editLog.isLocalEdit(editId);

Expand All @@ -1086,9 +1104,9 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
}

if (wasLocalEdit) {
this.editLog.addSequencedEdit(edit, message);
this.editLog.addSequencedEdit(edit, messageEnvelope);
} else {
this.applyEditLocally(edit, message);
this.applyEditLocally(edit, messageEnvelope);
}
}

Expand Down Expand Up @@ -1311,10 +1329,10 @@ export class SharedTree extends SharedObject<ISharedTreeEvents> implements NodeI
}
}

private applyEditLocally(edit: Edit<ChangeInternal>, message: ISequencedDocumentMessage | undefined): void {
const isSequenced = message !== undefined;
private applyEditLocally(edit: Edit<ChangeInternal>, messageEnvelope: ISequencedMessageEnvelope | undefined): void {
const isSequenced = messageEnvelope !== undefined;
if (isSequenced) {
this.editLog.addSequencedEdit(edit, message);
this.editLog.addSequencedEdit(edit, messageEnvelope);
} else {
this.editLog.addLocalEdit(edit);
}
Expand Down
53 changes: 37 additions & 16 deletions packages/dds/cell/src/cell.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import { readAndParse } from "@fluidframework/driver-utils/internal";
import type {
ISummaryTreeWithStats,
AttributionKey,
IRuntimeMessageCollection,
ISequencedMessageEnvelope,
IRuntimeMessagesContent,
} from "@fluidframework/runtime-definitions/internal";
import type { IFluidSerializer } from "@fluidframework/shared-object-base/internal";
import {
Expand Down Expand Up @@ -173,10 +176,10 @@ export class SharedCell<T = any>
* Set the Op-based attribution through the SequencedDocumentMessage,
* or set the local/detached attribution.
*/
private setAttribution(message?: ISequencedDocumentMessage): void {
private setAttribution(messageEnvelope?: ISequencedMessageEnvelope): void {
if (this.options?.attribution?.track ?? false) {
this.attribution = message
? { type: "op", seq: message.sequenceNumber }
this.attribution = messageEnvelope
? { type: "op", seq: messageEnvelope.sequenceNumber }
: this.isAttached()
? { type: "local" }
: { type: "detached", id: 0 };
Expand Down Expand Up @@ -242,20 +245,38 @@ export class SharedCell<T = any>
}
}

/**
* Process a cell operation (op).
*
* @param message - The message to prepare.
* @param local - Whether or not the message was sent by the local client.
* @param localOpMetadata - For local client messages, this is the metadata that was submitted with the message.
* For messages from a remote client, this will be `undefined`.
*/
protected processCore(
message: ISequencedDocumentMessage,
local: boolean,
localOpMetadata: unknown,
): void {
const cellOpMetadata = localOpMetadata as ICellLocalOpMetadata;
this.processMessage(
message,
{
contents: message.contents,
localOpMetadata,
clientSequenceNumber: message.clientSequenceNumber,
},
local,
);
}

/**
* {@inheritDoc @fluidframework/shared-object-base#SharedObject.processMessagesCore}
*/
protected processMessagesCore(messagesCollection: IRuntimeMessageCollection): void {
const { envelope, local, messagesContent } = messagesCollection;
for (const messageContent of messagesContent) {
this.processMessage(envelope, messageContent, local);
}
}

private processMessage(
messageEnvelope: ISequencedMessageEnvelope,
messageContent: IRuntimeMessagesContent,
local: boolean,
): void {
const cellOpMetadata = messageContent.localOpMetadata as ICellLocalOpMetadata;
if (this.messageId !== this.messageIdObserved) {
// We are waiting for an ACK on our change to this cell - we will ignore all messages until we get it.
if (local) {
Expand All @@ -273,16 +294,16 @@ export class SharedCell<T = any>
// We got an ACK. Update messageIdObserved.
this.messageIdObserved = cellOpMetadata.pendingMessageId;
// update the attributor
this.setAttribution(message);
this.setAttribution(messageEnvelope);
}
return;
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-enum-comparison
if (message.type === MessageType.Operation && !local) {
const op = message.contents as ICellOperation;
if (messageEnvelope.type === MessageType.Operation && !local) {
const op = messageContent.contents as ICellOperation;
// update the attributor
this.setAttribution(message);
this.setAttribution(messageEnvelope);
this.applyInnerOp(op);
}
}
Expand Down
Loading
Loading