Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 87 additions & 14 deletions packages/ot-server/src/DocumentManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ function createOperation<T extends OperationType>(
}

describe('DocumentManager', () => {
it('should process consequential operations', () => {
it('should process consequential operations', async () => {
const manager = new DocumentManager('document');

manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0',
Expand All @@ -48,7 +48,7 @@ describe('DocumentManager', () => {
0
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand All @@ -57,7 +57,7 @@ describe('DocumentManager', () => {
1
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand All @@ -66,7 +66,7 @@ describe('DocumentManager', () => {
2
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand Down Expand Up @@ -95,10 +95,10 @@ describe('DocumentManager', () => {
});
});

it('should process concurrent operations', () => {
it('should process concurrent operations', async () => {
const manager = new DocumentManager('document');

manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0',
Expand All @@ -118,7 +118,7 @@ describe('DocumentManager', () => {
0
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand All @@ -127,7 +127,7 @@ describe('DocumentManager', () => {
1
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand Down Expand Up @@ -156,10 +156,10 @@ describe('DocumentManager', () => {
});
});

it('should process older operations', () => {
it('should process older operations', async () => {
const manager = new DocumentManager('document');

manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0',
Expand All @@ -179,7 +179,7 @@ describe('DocumentManager', () => {
0
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand All @@ -188,7 +188,7 @@ describe('DocumentManager', () => {
1
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand All @@ -197,7 +197,7 @@ describe('DocumentManager', () => {
2
)
);
manager.process(
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
Expand Down Expand Up @@ -225,4 +225,77 @@ describe('DocumentManager', () => {
properties: {},
});
});

it('should correctly process async operations', async () => {
const manager = new DocumentManager('document');

void manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0',
{
payload: [{
name: 'paragraph',
data: {
text: {
$t: 't',
value: '',
fragments: [],
},
},
}],
},
'user',
0
)
);
void manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
{ payload: 'A' },
'user',
1
)
);
void manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
{ payload: 'A' },
'user',
2
)
);
/**
* Waiting for the last operation so expect is executed after it is processed
*/
await manager.process(
createOperation(
OperationType.Insert,
'doc@0:block@0:data@text:[0,0]',
{ payload: 'A' },
'user',
3
)
);

expect(manager.currentModelState()).toEqual({
identifier: 'document',
blocks: [
{
name: 'paragraph',
tunes: {},
data: {
text: {
$t: 't',
value: 'AAA',
fragments: [],
},
},
},
],
properties: {},
});
});
});
60 changes: 40 additions & 20 deletions packages/ot-server/src/DocumentManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
*/
#model: EditorJSModel;

/**
* Promise resolving with currently processed operation
*/
#operationInProcessing: Promise<Operation | null> | null = null;

/**
* DocumentManager constructor function
* @param identifier - identifier of the document to manage
Expand All @@ -37,38 +42,53 @@

/**
* Process new operation
* - awaits previous operation to finish processing
* - processes the new one
* @param operation - operation from the client to process
*/
public async process(operation: Operation): Promise<Operation | null> {
await this.#operationInProcessing;

return this.#processNextOperation(operation);
}

/**
* Return serialised current state of the document
*/
public currentModelState(): EditorDocumentSerialized {
return this.#model.serialized;
}

/**
* Process next operation
* - Transform relative to operations in stack if needed
* - Puts operation to the operations array
* - Updates models state
* @todo ensure the operations are processed consequently
* @param operation - operation from the client to process
* @param operation - operation to process
*/
public process(operation: Operation): Operation | null {
if (operation.rev! > this.#currentRev) {
console.error('Operation rejected due to incorrect revision %o', operation);
#processNextOperation(operation: Operation): Promise<Operation | null> {
this.#operationInProcessing = new Promise((resolve) => {
if (operation.rev! > this.#currentRev) {
console.error('Operation rejected due to incorrect revision %o', operation);

Check warning on line 72 in packages/ot-server/src/DocumentManager.ts

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement

return null;
}
return resolve(null);

Check warning on line 74 in packages/ot-server/src/DocumentManager.ts

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🧾 Statement is not covered

Warning! Not covered statement
}

Check warning on line 75 in packages/ot-server/src/DocumentManager.ts

View workflow job for this annotation

GitHub Actions / Coverage annotations (🧪 jest-coverage-report-action)

🌿 Branch is not covered

Warning! Not covered branch

const conflictingOps = this.#operations.filter(op => op.rev! >= operation.rev!);
const transformedOp = conflictingOps.reduce((result, op) => result.transform(op), operation);
const conflictingOps = this.#operations.filter(op => op.rev! >= operation.rev!);
const transformedOp = conflictingOps.reduce((result, op) => result.transform(op), operation);

transformedOp.rev = this.#currentRev;
transformedOp.rev = this.#currentRev;

this.#currentRev += 1;
this.#currentRev += 1;

this.#operations.push(transformedOp);
this.#operations.push(transformedOp);

this.#applyOperationToModel(transformedOp);
this.#applyOperationToModel(transformedOp);

return transformedOp;
}
resolve(transformedOp);
});

/**
* Return serialised current state of the document
*/
public currentModelState(): EditorDocumentSerialized {
return this.#model.serialized;
return this.#operationInProcessing;
}

/**
Expand Down
6 changes: 3 additions & 3 deletions packages/ot-server/src/OTServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class OTServer {

return;
case MessageType.Operation:
this.#onOperation(ws, message.payload as SerializedOperation);
void this.#onOperation(ws, message.payload as SerializedOperation);

return;
}
Expand Down Expand Up @@ -104,7 +104,7 @@ export class OTServer {
* @param ws - client websocket
* @param payload - operation payload
*/
#onOperation(ws: WebSocket, payload: SerializedOperation): void {
async #onOperation(ws: WebSocket, payload: SerializedOperation): Promise<void> {
const operation = Operation.from(payload);
const documentId = operation.index.documentId;

Expand All @@ -121,7 +121,7 @@ export class OTServer {
const manager = this.#managers.get(documentId)!;
const clients = this.#clients.get(documentId)!;

const processedOperation = manager.process(operation);
const processedOperation = await manager.process(operation);

if (processedOperation === null) {
ws.close(BAD_REQUEST_CODE, 'Operation couldn\'t be processed');
Expand Down