From 66a76bdbd2493c2a70bc9656f6b12ca947dc3559 Mon Sep 17 00:00:00 2001 From: George Berezhnoy Date: Thu, 10 Apr 2025 15:12:30 +0200 Subject: [PATCH] ensure operatoins are processed synchrnously --- .../ot-server/src/DocumentManager.spec.ts | 101 +++++++++++++++--- packages/ot-server/src/DocumentManager.ts | 60 +++++++---- packages/ot-server/src/OTServer.ts | 6 +- 3 files changed, 130 insertions(+), 37 deletions(-) diff --git a/packages/ot-server/src/DocumentManager.spec.ts b/packages/ot-server/src/DocumentManager.spec.ts index 4e1b9c3..4e1c0c2 100644 --- a/packages/ot-server/src/DocumentManager.spec.ts +++ b/packages/ot-server/src/DocumentManager.spec.ts @@ -25,10 +25,10 @@ function createOperation( } describe('DocumentManager', () => { - it('should process consequential operations', () => { + it('should process consequential operations', async () => { const manager = new DocumentManager('document'); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0', @@ -48,7 +48,7 @@ describe('DocumentManager', () => { 0 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -57,7 +57,7 @@ describe('DocumentManager', () => { 1 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -66,7 +66,7 @@ describe('DocumentManager', () => { 2 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -95,10 +95,10 @@ describe('DocumentManager', () => { }); }); - it('should process concurrent operations', () => { + it('should process concurrent operations', async () => { const manager = new DocumentManager('document'); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0', @@ -118,7 +118,7 @@ describe('DocumentManager', () => { 0 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -127,7 +127,7 @@ describe('DocumentManager', () => { 1 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -156,10 +156,10 @@ describe('DocumentManager', () => { }); }); - it('should process older operations', () => { + it('should process older operations', async () => { const manager = new DocumentManager('document'); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0', @@ -179,7 +179,7 @@ describe('DocumentManager', () => { 0 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -188,7 +188,7 @@ describe('DocumentManager', () => { 1 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -197,7 +197,7 @@ describe('DocumentManager', () => { 2 ) ); - manager.process( + await manager.process( createOperation( OperationType.Insert, 'doc@0:block@0:data@text:[0,0]', @@ -225,4 +225,77 @@ describe('DocumentManager', () => { properties: {}, }); }); + + it('should correctly process async operations', async () => { + const manager = new DocumentManager('document'); + + void manager.process( + createOperation( + OperationType.Insert, + 'doc@0:block@0', + { + payload: [{ + name: 'paragraph', + data: { + text: { + $t: 't', + value: '', + fragments: [], + }, + }, + }], + }, + 'user', + 0 + ) + ); + void manager.process( + createOperation( + OperationType.Insert, + 'doc@0:block@0:data@text:[0,0]', + { payload: 'A' }, + 'user', + 1 + ) + ); + void manager.process( + createOperation( + OperationType.Insert, + 'doc@0:block@0:data@text:[0,0]', + { payload: 'A' }, + 'user', + 2 + ) + ); + /** + * Waiting for the last operation so expect is executed after it is processed + */ + await manager.process( + createOperation( + OperationType.Insert, + 'doc@0:block@0:data@text:[0,0]', + { payload: 'A' }, + 'user', + 3 + ) + ); + + expect(manager.currentModelState()).toEqual({ + identifier: 'document', + blocks: [ + { + name: 'paragraph', + tunes: {}, + data: { + text: { + $t: 't', + value: 'AAA', + fragments: [], + }, + }, + }, + ], + properties: {}, + }); + }); }); diff --git a/packages/ot-server/src/DocumentManager.ts b/packages/ot-server/src/DocumentManager.ts index 973465b..19f7fdc 100644 --- a/packages/ot-server/src/DocumentManager.ts +++ b/packages/ot-server/src/DocumentManager.ts @@ -20,6 +20,11 @@ export class DocumentManager { */ #model: EditorJSModel; + /** + * Promise resolving with currently processed operation + */ + #operationInProcessing: Promise | null = null; + /** * DocumentManager constructor function * @param identifier - identifier of the document to manage @@ -37,38 +42,53 @@ export class DocumentManager { /** * Process new operation + * - awaits previous operation to finish processing + * - processes the new one + * @param operation - operation from the client to process + */ + public async process(operation: Operation): Promise { + await this.#operationInProcessing; + + return this.#processNextOperation(operation); + } + + /** + * Return serialised current state of the document + */ + public currentModelState(): EditorDocumentSerialized { + return this.#model.serialized; + } + + /** + * Process next operation * - Transform relative to operations in stack if needed * - Puts operation to the operations array * - Updates models state - * @todo ensure the operations are processed consequently - * @param operation - operation from the client to process + * @param operation - operation to process */ - public process(operation: Operation): Operation | null { - if (operation.rev! > this.#currentRev) { - console.error('Operation rejected due to incorrect revision %o', operation); + #processNextOperation(operation: Operation): Promise { + this.#operationInProcessing = new Promise((resolve) => { + if (operation.rev! > this.#currentRev) { + console.error('Operation rejected due to incorrect revision %o', operation); - return null; - } + return resolve(null); + } - const conflictingOps = this.#operations.filter(op => op.rev! >= operation.rev!); - const transformedOp = conflictingOps.reduce((result, op) => result.transform(op), operation); + const conflictingOps = this.#operations.filter(op => op.rev! >= operation.rev!); + const transformedOp = conflictingOps.reduce((result, op) => result.transform(op), operation); - transformedOp.rev = this.#currentRev; + transformedOp.rev = this.#currentRev; - this.#currentRev += 1; + this.#currentRev += 1; - this.#operations.push(transformedOp); + this.#operations.push(transformedOp); - this.#applyOperationToModel(transformedOp); + this.#applyOperationToModel(transformedOp); - return transformedOp; - } + resolve(transformedOp); + }); - /** - * Return serialised current state of the document - */ - public currentModelState(): EditorDocumentSerialized { - return this.#model.serialized; + return this.#operationInProcessing; } /** diff --git a/packages/ot-server/src/OTServer.ts b/packages/ot-server/src/OTServer.ts index d044888..23a7a77 100644 --- a/packages/ot-server/src/OTServer.ts +++ b/packages/ot-server/src/OTServer.ts @@ -61,7 +61,7 @@ export class OTServer { return; case MessageType.Operation: - this.#onOperation(ws, message.payload as SerializedOperation); + void this.#onOperation(ws, message.payload as SerializedOperation); return; } @@ -104,7 +104,7 @@ export class OTServer { * @param ws - client websocket * @param payload - operation payload */ - #onOperation(ws: WebSocket, payload: SerializedOperation): void { + async #onOperation(ws: WebSocket, payload: SerializedOperation): Promise { const operation = Operation.from(payload); const documentId = operation.index.documentId; @@ -121,7 +121,7 @@ export class OTServer { const manager = this.#managers.get(documentId)!; const clients = this.#clients.get(documentId)!; - const processedOperation = manager.process(operation); + const processedOperation = await manager.process(operation); if (processedOperation === null) { ws.close(BAD_REQUEST_CODE, 'Operation couldn\'t be processed');