diff --git a/.editorconfig b/.editorconfig index b5fe0785..f0c0ee43 100644 --- a/.editorconfig +++ b/.editorconfig @@ -8,6 +8,9 @@ charset = utf-8 insert_final_newline = true trim_trailing_whitespace = true +[*.xml] +indent_size = 2 + [*.yml] indent_size = 2 diff --git a/.vscode/launch.json b/.vscode/launch.json index f30c2f46..6db8a5eb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -4,6 +4,13 @@ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 "version": "0.2.0", "configurations": [ + { + "type": "node", + "request": "attach", + "name": "Attach to Service Process", + "port": "${input:attachPort}", + + }, { "type": "node", "request": "launch", @@ -85,5 +92,13 @@ "sourceMaps": true, "outFiles": [] } + ], + "inputs": [ + { + "id": "attachPort", + "type": "promptString", + "description": "What port should the application use for debugging?", + "default": "23698" + } ] } diff --git a/packages/open-collaboration-service-process/Readme.md b/packages/open-collaboration-service-process/Readme.md index e69de29b..324d1b3d 100644 --- a/packages/open-collaboration-service-process/Readme.md +++ b/packages/open-collaboration-service-process/Readme.md @@ -0,0 +1,76 @@ +# Open Collaboration Service Process + +Open Collaboration Tools is a collection of open source tools, libraries and extensions for live-sharing of IDE contents, designed to boost remote teamwork with open technologies. For more information about this project, please [read the announcement](https://www.typefox.io/blog/open-collaboration-tools-announcement/). + +This package is a standalone Node.js application, which helps to simplify integration of OCT with non-TypeScript environments, by providing a stdin/stdout based [JSON-RPC](https://www.jsonrpc.org/) API. + +It takes over encryption, session lifecycle management, and includes Yjs integration for collision-free real-time editing of documents, +so client applications do not need to implement these complex features themselves. + +The artifact needs to be cjs to be able to be bundled and made into an SEA because of socket.io's dynamic requires. + +## Usage + +### Starting the Service Process + +Start the process by either using [Node.js](https://nodejs.org) to call `process.js` or use a prebuilt executable: + +```sh +node ./lib/process.js --server-address http://localhost:8100 --auth-token +``` + +- `--server-address` (**required**): The address of the collaboration server to connect to (e.g., `https://api.open-collab.tools`). +- `--auth-token` (**optional**): The authentication token to use for the session, if saved by the application from a previous login + +### Communication Protocol + +All communication happens via JSON-RPC 2.0 messages over stdin/stdout. +See [messages.ts](src/messages.ts) for service process specific awareness or lifecycle messages. Other messages follow the open-collaboration-protocol. + +For specific examples see `service.process.test.ts` or the [IntellIJ integration](https://github.com/eclipse-oct/oct-intellij) + +### Sending and Receiving Binary Data + +- For efficient document and file transfer, binary data is supported. +- Use the `BinaryData` type for parameters or results that contain binary content. +- Binary data is encoded as base64-encoded [MessagePack](https://msgpack.org/) in the `data` field of a `BinaryData` object. + +#### Example: Sending Binary Data + +```json +{ + "jsonrpc": "2.0", + "id": 1, + "method": "awareness/getDocumentContent", + "params": ["path/to/file"] +} +``` + +The response will be: + +```json +{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "type": "binaryData", + "data": "" + } +} +``` + +Or sending Binary Data as a Parameter: + +```json +{ + "jsonrpc": "2.0", + "id": 2, + "method": "fileSystem/writeFile", + "params": [ + { + "type": "binaryData", + "data": "" + } + ] +} +``` diff --git a/packages/open-collaboration-service-process/package.json b/packages/open-collaboration-service-process/package.json index 5b39d056..2fbd4e9c 100644 --- a/packages/open-collaboration-service-process/package.json +++ b/packages/open-collaboration-service-process/package.json @@ -30,7 +30,7 @@ "scripts": { "start": "node lib/process.js", "start:direct": "tsx src/process.ts", - "build": "esbuild ./lib/process.js --bundle --platform=node --format=esm --outfile=lib/bundle.js", + "build": "esbuild ./src/process.ts --bundle --platform=node --format=cjs --outfile=lib/bundle.cjs", "create:executable": "npm run build && shx mkdir -p bin && node --experimental-sea-config sea-config.json && node scripts/sea-build.mjs" }, "dependencies": { diff --git a/packages/open-collaboration-service-process/scripts/sea-build.mjs b/packages/open-collaboration-service-process/scripts/sea-build.mjs index 82ed8396..17e24ec4 100644 --- a/packages/open-collaboration-service-process/scripts/sea-build.mjs +++ b/packages/open-collaboration-service-process/scripts/sea-build.mjs @@ -8,7 +8,7 @@ import { inject } from 'postject' * The resulting executable can be run as a standalone application without the need of nodejs being installed */ -var EXECUTABLE_NAME = 'oct-servcice-process' +var EXECUTABLE_NAME = 'oct-service-process' if (process.platform === 'win32') { EXECUTABLE_NAME = EXECUTABLE_NAME + '.exe' diff --git a/packages/open-collaboration-service-process/src/collaboration-instance.ts b/packages/open-collaboration-service-process/src/collaboration-instance.ts index f8482ce2..07e97106 100644 --- a/packages/open-collaboration-service-process/src/collaboration-instance.ts +++ b/packages/open-collaboration-service-process/src/collaboration-instance.ts @@ -5,14 +5,13 @@ // ****************************************************************************** import * as types from 'open-collaboration-protocol'; import { DisposableCollection, Deferred } from 'open-collaboration-protocol'; -import { LOCAL_ORIGIN, OpenCollaborationYjsProvider } from 'open-collaboration-yjs'; +import { LOCAL_ORIGIN, OpenCollaborationYjsProvider, YjsNormalizedTextDocument, YTextChange } from 'open-collaboration-yjs'; import * as Y from 'yjs'; -import { Mutex } from 'async-mutex'; import * as awarenessProtocol from 'y-protocols/awareness'; -import { BinaryResponse, ClientTextSelection, JoinSessionRequest, OCPBroadCast, OCPNotification, OCPRequest, OnInitNotification, TextDocumentInsert, toEncodedOCPMessage, UpdateDocumentContent, UpdateTextSelection } from './messages.js'; +import { BinaryData, BinaryResponse, ClientTextSelection, EditorOpenedNotification, fromBinaryMessage, GetDocumentContent, JoinSessionRequest, OnInitNotification, PeerJoinedNotification, PeerLeftNotification, SessionClosedNotification, TextDocumentInsert, toBinaryMessage, UpdateDocumentContent, UpdateTextSelection } from './messages.js'; import { MessageConnection } from 'vscode-jsonrpc'; -export class CollaborationInstance implements types.Disposable{ +export class CollaborationInstance implements types.Disposable { protected peers = new Map(); protected hostInfo = new Deferred(); @@ -21,14 +20,23 @@ export class CollaborationInstance implements types.Disposable{ protected yjsProvider?: OpenCollaborationYjsProvider; protected YjsDoc: Y.Doc; protected yjsAwareness; - private yjsMutex = new Mutex(); protected connectionDisposables: DisposableCollection = new DisposableCollection(); + private yjsDocuments = new Map(); + protected identity = new Deferred(); - constructor(public currentConnection: types.ProtocolBroadcastConnection, protected communicationHandler: MessageConnection, protected host: boolean, workspace?: types.Workspace) { - if(host && !workspace) { + private encoder = new TextEncoder(); + + isDisposed = false; + + constructor(public octConnection: types.ProtocolBroadcastConnection, protected clientConnection: MessageConnection, protected isHost: boolean, workspace?: types.Workspace) { + process.on('beforeExit', () => { + this.leaveRoom(); + }); + + if(isHost && !workspace) { throw new Error('Host must provide workspace'); } this.YjsDoc = new Y.Doc(); @@ -45,46 +53,54 @@ export class CollaborationInstance implements types.Disposable{ this.yjsAwareness.destroy(); }}); - this.yjsProvider = new OpenCollaborationYjsProvider(currentConnection, this.YjsDoc, this.yjsAwareness, { + this.yjsProvider = new OpenCollaborationYjsProvider(octConnection, this.YjsDoc, this.yjsAwareness, { resyncTimer: 10_000 }); this.yjsProvider.connect(); - this.connectionDisposables.push(currentConnection.onReconnect(() => { + this.connectionDisposables.push(octConnection.onReconnect(() => { this.yjsProvider?.connect(); })); - currentConnection.onDisconnect(() => { + octConnection.onDisconnect(() => { + this.dispose(); + }); + + octConnection.room.onClose(() => { this.dispose(); }); - currentConnection.onRequest(async (origin, method, ...params) => { - const result = await this.communicationHandler.sendRequest(OCPRequest, toEncodedOCPMessage({ - method, - params - })); - return BinaryResponse.is(result) ? types.Encoding.decode(Uint8Array.from(Buffer.from(result.data, 'base64'))) : result; + octConnection.onRequest(async (origin, method, ...params) => { + const result = await this.clientConnection.sendRequest(method, ...this.convertBinaryParams(params), origin); + return BinaryData.is(result) ? fromBinaryMessage(result.data) : result; }); - currentConnection.onNotification((origin, method, ...params) => { - this.communicationHandler.sendNotification(OCPNotification, toEncodedOCPMessage({method, params})); + octConnection.onNotification((origin, method, ...params) => { + this.clientConnection.sendNotification(method, ...this.convertBinaryParams(params), origin); }); - currentConnection.onBroadcast((origin, method, ...params) => { - this.communicationHandler.sendNotification(OCPBroadCast, toEncodedOCPMessage({method, params})); + octConnection.onBroadcast((origin, method, ...params) => { + this.clientConnection.sendNotification(method, ...this.convertBinaryParams(params), origin); }); - currentConnection.peer.onJoinRequest(async (_, user) => { - const accepted = await this.communicationHandler.sendRequest(JoinSessionRequest, user); + octConnection.peer.onJoinRequest(async (_, user) => { + const accepted = await this.clientConnection.sendRequest(JoinSessionRequest, user); return accepted ? { workspace: workspace! } : undefined; }); - currentConnection.peer.onInfo((_, peer) => { + octConnection.peer.onInfo((_, peer) => { this.yjsAwareness.setLocalStateField('peer', peer.id); this.identity.resolve(peer); }); - currentConnection.room.onJoin(async (_, peer) => { - if (host && workspace) { + octConnection.editor.onOpen(async (peerId, documentPath) => { + if(!this.YjsDoc.share.has(documentPath)) { + this.registerYjsObject('text', documentPath, ''); + } + this.clientConnection.sendNotification(EditorOpenedNotification, documentPath, peerId); + }); + + octConnection.room.onJoin(async (_, peer) => { + if (isHost && workspace) { // Only initialize the user if we are the host const initData: types.InitData = { protocol: types.VERSION, @@ -97,76 +113,95 @@ export class CollaborationInstance implements types.Disposable{ folders: workspace.folders ?? [] } }; - currentConnection.peer.init(peer.id, initData); + octConnection.peer.init(peer.id, initData); } + this.clientConnection.sendNotification(PeerJoinedNotification, peer); }); - currentConnection.peer.onInit((_, initData) => { + octConnection.room.onLeave(async (_, peer) => { + this.peers.delete(peer.id); + this.clientConnection.sendNotification(PeerLeftNotification, peer); + }); + + octConnection.peer.onInit((_, initData) => { this.peers.set(initData.host.id, initData.host); this.hostInfo.resolve(initData.host); for (const guest of initData.guests) { this.peers.set(guest.id, guest); } - this.communicationHandler.sendNotification(OnInitNotification, initData); + this.clientConnection.sendNotification(OnInitNotification, initData); + }); + + clientConnection.onRequest(GetDocumentContent, async (documentPath) => { + let fileContent: types.FileData | undefined = undefined; + if(this.YjsDoc.share.has(documentPath)) { + const text = this.YjsDoc.getText(documentPath); + fileContent = { + content: this.encoder.encode(text.toString()), + } as types.FileData; + + } else { + fileContent = await octConnection.fs.readFile((await this.hostInfo.promise).id, documentPath); + } + + return { + type: 'binaryData', + data: toBinaryMessage(fileContent), + method: GetDocumentContent.method, + } as BinaryResponse; + }); } async registerYjsObject(type: string, documentPath: string, text: string) { if(type === 'text') { - const yjsText = this.YjsDoc.getText(documentPath); - if (this.host) { - this.YjsDoc.transact(() => { - yjsText.delete(0, yjsText.length); - yjsText.insert(0, text); - }); + const normalizedDocument = this.getNormalizedDocument(documentPath); + if (this.isHost) { + normalizedDocument.update({changes: text}); } else { - this.currentConnection.editor.open((await this.hostInfo.promise).id, documentPath); + this.octConnection.editor.open((await this.hostInfo.promise).id, documentPath); } - const observer = (textEvent: Y.YTextEvent) => { - if (textEvent.transaction.local) { - // Ignore own events or if the document is already in sync - return; - } - const edits: TextDocumentInsert[] = []; - let index = 0; - textEvent.delta.forEach(delta => { - if (typeof delta.retain === 'number') { - index += delta.retain; - } else if (typeof delta.insert === 'string') { - edits.push({ - startOffset: index, - text: delta.insert, - }); - index += delta.insert.length; - } else if (typeof delta.delete === 'number') { - edits.push({ - startOffset: index, - endOffset: index + delta.delete, - text: '', - }); - } - }); - this.communicationHandler.sendNotification(UpdateDocumentContent, documentPath, edits); - }; - yjsText.observe(observer); } } + private getNormalizedDocument(path: string): YjsNormalizedTextDocument { + let yjsDocument = this.yjsDocuments.get(path); + if (!yjsDocument) { + yjsDocument = new YjsNormalizedTextDocument(this.YjsDoc.getText(path), async changes => { + this.clientConnection.sendNotification(UpdateDocumentContent, path, changes.map(change => { + const start = yjsDocument!.normalizedOffset(change.start); + const end = yjsDocument!.normalizedOffset(change.end); + return { + startOffset: start, + endOffset: end, + text: change.text + } as TextDocumentInsert; + })); + }); + this.yjsDocuments.set(path, yjsDocument); + } + return yjsDocument; + } + updateYjsObjectContent(documentPath: string, changes: TextDocumentInsert[]) { if (changes.length === 0) { return; } - this.yjsMutex.runExclusive(async () => { - const yjsText = this.YjsDoc.getText(documentPath); - this.YjsDoc.transact(() => { - for(const change of changes) { - if(change.endOffset) { - yjsText.delete(change.startOffset, change.endOffset - change.startOffset); - } - yjsText.insert(change.startOffset, change.text); - } - }); - }); + if (documentPath) { + + const normalizedDocument = this.getNormalizedDocument(documentPath); + const textChanges: YTextChange[] = []; + for (const change of changes) { + const start = change.startOffset; + const end = change.endOffset ?? change.startOffset; + textChanges.push({ + start, + end, + text: change.text + }); + } + normalizedDocument.update({ changes: textChanges }); + } } private selectionState: Map = new Map(); @@ -176,17 +211,23 @@ export class CollaborationInstance implements types.Disposable{ const currentSelections: Map = new Map(); - for (const [clientID, state] of states.entries()) { - if (types.ClientTextSelection.is(state.selection)) { - const selections = state.selection.textSelections.map(s => ({ - peer: state.peer, - start: s.start.assoc, - end: s.end.assoc, - isReversed: s.direction === types.SelectionDirection.RightToLeft - })); - currentSelections.has(state.peer) ? - currentSelections.get(state.peer)!.push(...selections) : - currentSelections.set(clientID.toString(), selections); + for (const [clientId, state] of states.entries()) { + if (types.ClientTextSelection.is(state.selection) && clientId !== this.yjsAwareness.clientID) { + const normalizedDocument = this.getNormalizedDocument(state.selection.path); + + const selections = state.selection.textSelections.map(s => { + const start = Y.createAbsolutePositionFromRelativePosition(s.start, this.YjsDoc)?.index ?? 0; + const end = Y.createAbsolutePositionFromRelativePosition(s.end, this.YjsDoc)?.index; + return { + peer: state.peer, + start: normalizedDocument.normalizedOffset(start), + end: normalizedDocument.normalizedOffset(end ?? start), + isReversed: s.direction === types.SelectionDirection.RightToLeft + }; + }); + currentSelections.has(state.selection.path) ? + currentSelections.get(state.selection.path)!.push(...selections) : + currentSelections.set(state.selection.path, selections); } } @@ -201,7 +242,7 @@ export class CollaborationInstance implements types.Disposable{ this.selectionState = currentSelections; for (const document of documentUpdates) { - this.communicationHandler.sendNotification(UpdateTextSelection, document, this.selectionState.get(document) ?? []); + this.clientConnection.sendNotification(UpdateTextSelection, document, this.selectionState.get(document) ?? []); } } @@ -216,7 +257,7 @@ export class CollaborationInstance implements types.Disposable{ types.SelectionDirection.RightToLeft : types.SelectionDirection.LeftToRight, start: Y.createRelativePositionFromTypeIndex(ytext, clientSelection.start), - end: Y.createRelativePositionFromTypeIndex(ytext, clientSelection.end) + end: Y.createRelativePositionFromTypeIndex(ytext, clientSelection.end ?? clientSelection.start) }); } const textSelection: types.ClientTextSelection = { @@ -234,8 +275,25 @@ export class CollaborationInstance implements types.Disposable{ this.yjsAwareness.setLocalStateField('selection', selection); } + async leaveRoom(): Promise { + await this.octConnection.room.leave(); + await new Promise(resolve => setTimeout(resolve, 100)); + this.dispose(); + } + + private convertBinaryParams(params: unknown[]): unknown[] { + return params.map(param => BinaryData.shouldConvert(param) ? { type: 'binaryData', data: toBinaryMessage(param) } as BinaryData: param); + } + dispose(): void { + if (this.isDisposed) { + return; + } + this.isDisposed = true; + this.clientConnection.sendNotification(SessionClosedNotification); + this.octConnection.dispose(); this.yjsProvider?.dispose(); + this.YjsDoc.destroy(); this.connectionDisposables.dispose(); } } diff --git a/packages/open-collaboration-service-process/src/message-handler.ts b/packages/open-collaboration-service-process/src/message-handler.ts index 87558b13..b4f96743 100644 --- a/packages/open-collaboration-service-process/src/message-handler.ts +++ b/packages/open-collaboration-service-process/src/message-handler.ts @@ -4,10 +4,10 @@ // terms of the MIT License, which is available in the project root. // ****************************************************************************** -import type * as types from 'open-collaboration-protocol'; -import { CloseSessionRequest, CreateRoomRequest, fromEncodedOCPMessage, InternalError, JoinRoomRequest, - LoginRequest, OCPBroadCast, OCPNotification, OCPRequest, OpenDocument, - SessionData, UpdateDocumentContent, UpdateTextSelection } from './messages.js'; +import * as types from 'open-collaboration-protocol'; +import { BinaryData, BinaryResponse, CloseSessionRequest, CreateRoomRequest, fromBinaryMessage, InternalError, JoinRoomRequest, + LoginRequest, OpenDocument, + SessionData, toBinaryMessage, UpdateDocumentContent, UpdateTextSelection } from './messages.js'; import { CollaborationInstance } from './collaboration-instance.js'; import { MessageConnection } from 'vscode-jsonrpc'; @@ -17,60 +17,106 @@ export class MessageHandler { protected lastRequestId = 0; - constructor(private connectionProvider: types.ConnectionProvider, private communicationHandler: MessageConnection) { - communicationHandler.onRequest(LoginRequest, async () => this.login()); - communicationHandler.onRequest(JoinRoomRequest, this.joinRoom.bind(this)); - communicationHandler.onRequest(CreateRoomRequest, this.createRoom.bind(this)); - communicationHandler.onRequest(CloseSessionRequest, () => this.currentCollaborationInstance?.currentConnection.dispose()); - communicationHandler.onNotification(OpenDocument, (p1, p2, p3) => this.currentCollaborationInstance?.registerYjsObject(p1, p2, p3)); - communicationHandler.onNotification(UpdateTextSelection, (p1, p2) => this.currentCollaborationInstance?.updateYjsObjectSelection(p1, p2)); - communicationHandler.onNotification(UpdateDocumentContent, (p1, p2) => this.currentCollaborationInstance?.updateYjsObjectContent(p1, p2)); - communicationHandler.onError(([error]) => communicationHandler.sendNotification(InternalError, {message: error.message, stack: error.stack})); - - communicationHandler.onRequest(OCPRequest, (rawMessage) => { - const message = typeof rawMessage === 'string' ? fromEncodedOCPMessage(rawMessage) : rawMessage; - return this.currentCollaborationInstance?.currentConnection.sendRequest(message.method, message.target, ...message.params); + constructor(private octConnectionProvider: types.ConnectionProvider, private clientCommunication: MessageConnection) { + clientCommunication.onRequest(LoginRequest, async () => this.login()); + clientCommunication.onRequest(JoinRoomRequest, this.joinRoom.bind(this)); + clientCommunication.onRequest(CreateRoomRequest, this.createRoom.bind(this)); + clientCommunication.onRequest(CloseSessionRequest, async () => { + if(this.currentCollaborationInstance && !this.currentCollaborationInstance.isDisposed) { + await this.currentCollaborationInstance?.leaveRoom(); + this.currentCollaborationInstance = undefined; + } }); - communicationHandler.onNotification(OCPNotification, async (rawMessage) => { - const message = typeof rawMessage === 'string' ? fromEncodedOCPMessage(rawMessage) : rawMessage; - this.currentCollaborationInstance?.currentConnection.sendNotification(message.method, message.target, ...message.params); + clientCommunication.onNotification(OpenDocument, (p1, p2, p3) => this.currentCollaborationInstance?.registerYjsObject(p1, p2, p3)); + clientCommunication.onNotification(UpdateTextSelection, (p1, p2) => this.currentCollaborationInstance?.updateYjsObjectSelection(p1, p2)); + clientCommunication.onNotification(UpdateDocumentContent, (p1, p2) => this.currentCollaborationInstance?.updateYjsObjectContent(p1, p2)); + clientCommunication.onError(([error]) => clientCommunication.sendNotification(InternalError, {message: error.message, stack: error.stack})); + + clientCommunication.onRequest(async (method, params) => { + if(!types.isArray(params) || params.length === 0 || typeof params[params.length - 1] !== 'string') { + throw new Error(`Invalid parameters for non service process specific request with method: ${method}, missing target`); + } + + const target = params[params.length - 1] as string; + const messageParams = params.slice(0, params.length - 1).map((param) => { + if(BinaryData.is(param)) { + return fromBinaryMessage(param.data); + } + return param; + }); + + const result = await this.currentCollaborationInstance?.octConnection.sendRequest(method, target, ...messageParams); + + return BinaryData.shouldConvert(result) ? { + type: 'binaryData', + method, + data: toBinaryMessage(result), + } as BinaryResponse : result; }); - communicationHandler.onNotification(OCPBroadCast, async (rawMessage) => { - const message = typeof rawMessage === 'string' ? fromEncodedOCPMessage(rawMessage) : rawMessage; - this.currentCollaborationInstance?.currentConnection.sendBroadcast(message.method, ...message.params); + clientCommunication.onNotification(async (method, params) => { + if(!types.isArray(params) || params.length === 0 || typeof params[params.length - 1] !== 'string') { + throw new Error(`Invalid parameters for non service process specific notification or broadcast with method: ${method}, missing target or 'broadcast'`); + } + + const metaDataParam = params[params.length - 1]; + + const messageParams = params.slice(0, params.length - 1).map((param) => { + if(BinaryData.is(param)) { + return fromBinaryMessage(param.data); + } + return param; + }); + + if(metaDataParam === 'broadcast') { + this.currentCollaborationInstance?.octConnection.sendBroadcast(method, ...messageParams); + } else { + this.currentCollaborationInstance?.octConnection.sendNotification(method, metaDataParam, ...messageParams); + } }); } async login(): Promise { - const authToken = await this.connectionProvider.login({ }); - return authToken; + try { + const authToken = await this.octConnectionProvider.login({ }); + return authToken; + } catch (error) { + throw new Error(`Failed to login: ${error}`); + } } async joinRoom(roomId: string): Promise { - const resp = await this.connectionProvider.joinRoom({ roomId }); - this.onConnection(await this.connectionProvider.connect(resp.roomToken), false); - return { - roomId: resp.roomId, - roomToken: resp.roomToken, - authToken: resp.loginToken ?? this.connectionProvider.authToken, - workspace: resp.workspace - }; + try { + const resp = await this.octConnectionProvider.joinRoom({ roomId }); + this.onConnection(await this.octConnectionProvider.connect(resp.roomToken), false); + return { + roomId: resp.roomId, + roomToken: resp.roomToken, + authToken: resp.loginToken ?? this.octConnectionProvider.authToken, + workspace: resp.workspace + }; + } catch (error) { + throw new Error(`Failed to join room: ${error}`); + } } async createRoom(workspace: types.Workspace): Promise { - const resp = await this.connectionProvider.createRoom({}); - this.onConnection(await this.connectionProvider.connect(resp.roomToken), true, workspace); - return { - roomId: resp.roomId, - roomToken: resp.roomToken, - authToken: resp.loginToken ?? this.connectionProvider.authToken, - workspace, - }; + try { + const resp = await this.octConnectionProvider.createRoom({}); + this.onConnection(await this.octConnectionProvider.connect(resp.roomToken), true, workspace); + return { + roomId: resp.roomId, + roomToken: resp.roomToken, + authToken: resp.loginToken ?? this.octConnectionProvider.authToken, + workspace, + }; + } catch (error) { + throw new Error(`Failed to create room: ${error}`); + } } onConnection(connection: types.ProtocolBroadcastConnection, host: boolean, workspace?: types.Workspace) { this.currentCollaborationInstance?.dispose(); - this.currentCollaborationInstance = new CollaborationInstance(connection, this.communicationHandler, host, workspace); + this.currentCollaborationInstance = new CollaborationInstance(connection, this.clientCommunication, host, workspace); } dispose() { diff --git a/packages/open-collaboration-service-process/src/messages.ts b/packages/open-collaboration-service-process/src/messages.ts index 3165a511..5fae763b 100644 --- a/packages/open-collaboration-service-process/src/messages.ts +++ b/packages/open-collaboration-service-process/src/messages.ts @@ -5,6 +5,7 @@ // ****************************************************************************** import * as types from 'open-collaboration-protocol'; import { Encoding } from 'open-collaboration-protocol'; +import { isTypedArray } from 'util/types'; import { NotificationType, NotificationType2, NotificationType3, RequestType } from 'vscode-jsonrpc'; export function isOCPMessage(message: unknown): message is OCPMessage { @@ -17,20 +18,6 @@ export interface OCPMessage { target?: string } -// ***************************** generic messages ***************************** -// all params can be either msgpack encoded base64 strings of OCPMessages or just directly OCPMessages -export const OCPRequest = new RequestType('request'); -export const OCPNotification = new NotificationType('notification'); -export const OCPBroadCast = new NotificationType('broadcast'); - -export function fromEncodedOCPMessage(encoded: string): OCPMessage { - return Encoding.decode(Uint8Array.from(Buffer.from(encoded, 'base64'))) as OCPMessage; -} - -export function toEncodedOCPMessage(message: OCPMessage): string { - return Buffer.from(Encoding.encode(message)).toString('base64'); -} - // ***************************** To service process ***************************** export namespace ToServiceMessages { @@ -40,6 +27,7 @@ export namespace ToServiceMessages { export const CREATE_ROOM = 'room/createRoom'; export const CLOSE_SESSION = 'room/closeSession'; export const OPEN_DOCUMENT = 'awareness/openDocument'; + export const GET_DOCUMENT_CONTENT = 'awareness/getDocumentContent'; export const UPDATE_TEXT_SELECTION = 'awareness/updateTextSelection'; export const UPDATE_DOCUMENT_CONTENT = 'awareness/updateDocument'; } @@ -81,7 +69,7 @@ export interface TextDocumentInsert { export interface ClientTextSelection { start: number, - end: number, + end?: number, isReversed: boolean peer?: string } @@ -92,6 +80,12 @@ export interface ClientTextSelection { */ export const OpenDocument = new NotificationType3(ToServiceMessages.OPEN_DOCUMENT); +/** + * params: [documentPath] + * resp params: [documentContent as message pack encoded base64 string of types.FileContent] + */ +export const GetDocumentContent = new RequestType(ToServiceMessages.GET_DOCUMENT_CONTENT); + /** * params: [documentPath, selections] */ @@ -119,6 +113,11 @@ export const Authentication = new NotificationType2( */ export const OnInitNotification = new NotificationType('init'); +/** + * params : [documentPath, peerId] + */ +export const EditorOpenedNotification = new NotificationType2('editorOpened'); + /** * A request to the application to allow a user to join the current session * params: [user] @@ -126,18 +125,52 @@ export const OnInitNotification = new NotificationType('init'); */ export const JoinSessionRequest = new RequestType(ToServiceMessages.JOIN_SESSION_REQUEST); +export const PeerJoinedNotification = new NotificationType('peerJoined'); +export const PeerLeftNotification = new NotificationType('peerLeft'); + +export const SessionClosedNotification = new NotificationType('sessionClosed'); + /** * params: [error message, stack trace] */ export const InternalError = new NotificationType<{message: string, stack?: string}>('error'); -export namespace BinaryResponse { - export function is(message: unknown): message is BinaryResponse { - return types.isObject(message) && types.isString(message.data) && message.type === 'binaryResponse'; +// ***************************** binary encoding of message parameters ***************************** +// always use a BinaryData object to send binary data +export function fromBinaryMessage(encoded: string): unknown { + return Encoding.decode(Uint8Array.from(Buffer.from(encoded, 'base64'))); +} + +export function toBinaryMessage(message: unknown): string { + return Buffer.from(Encoding.encode(message)).toString('base64'); +} +export namespace BinaryData { + export function is(message: unknown): message is BinaryData { + return types.isObject(message) && types.isString(message.data) && message.type === 'binaryData'; + } + + export function shouldConvert(message: unknown): boolean { + if (typeof message !== 'object' || message === null) { + return false; + } + + if(isTypedArray(message)) { + return true; + } + + if (Array.isArray(message)) { + return message.some((item) => shouldConvert(item)); + } + + return Object.keys(message).some((key) => shouldConvert((message as Record)[key])); } } -export interface BinaryResponse { - type: 'binaryResponse' +export interface BinaryData { + type: 'binaryData' data: string } + +export interface BinaryResponse extends BinaryData { + method?: string +} diff --git a/packages/open-collaboration-service-process/test/service.process.test.ts b/packages/open-collaboration-service-process/test/service.process.test.ts index a451fafd..39c651f7 100644 --- a/packages/open-collaboration-service-process/test/service.process.test.ts +++ b/packages/open-collaboration-service-process/test/service.process.test.ts @@ -6,9 +6,9 @@ import { ChildProcessWithoutNullStreams, spawn } from 'child_process'; import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, test } from 'vitest'; -import { Authentication, CreateRoomRequest, fromEncodedOCPMessage, JoinRoomRequest, JoinSessionRequest, OCPRequest, OnInitNotification, OpenDocument, toEncodedOCPMessage, UpdateDocumentContent, UpdateTextSelection } from 'open-collaboration-service-process'; -import { Deferred } from 'open-collaboration-protocol'; -import { createMessageConnection, MessageConnection, StreamMessageReader, StreamMessageWriter } from 'vscode-jsonrpc/node.js'; +import { Authentication, BinaryData, CreateRoomRequest, fromBinaryMessage, JoinRoomRequest, JoinSessionRequest, OnInitNotification, OpenDocument, toBinaryMessage, UpdateDocumentContent, UpdateTextSelection } from 'open-collaboration-service-process'; +import { Deferred, FileData } from 'open-collaboration-protocol'; +import { createMessageConnection, Message, MessageConnection, StreamMessageReader, StreamMessageWriter } from 'vscode-jsonrpc/node.js'; const SERVER_ADDRESS = 'http://localhost:8100'; class Client { @@ -27,7 +27,24 @@ class Client { this.communicationHandler = createMessageConnection( new StreamMessageReader(this.process.stdout), - new StreamMessageWriter(this.process.stdin)); + new StreamMessageWriter(this.process.stdin), undefined, {messageStrategy: { + handleMessage(message, next) { + // conversion of binary data to javascript objects + if (Message.isNotification(message) || Message.isRequest(message)) { + if (Array.isArray(message.params)) { + message.params = message.params?.map((param) => + BinaryData.is(param) ? fromBinaryMessage(param.data) : param); + } else { + message.params = BinaryData.is(message.params) ? fromBinaryMessage(message.params.data) as object : message.params; + } + } else if (Message.isResponse(message)) { + if (BinaryData.is(message.result)) { + message.result = fromBinaryMessage(message.result.data) as any; + } + } + next(message); + }, + }}); this.communicationHandler.listen(); } } @@ -85,17 +102,23 @@ describe('Service Process', () => { selectionArived.resolve(); }); - host.communicationHandler.onRequest(OCPRequest, ((rawMessage) => { - const message = typeof rawMessage === 'string' ? fromEncodedOCPMessage(rawMessage) : rawMessage; - if(message.method === 'fileSystem/stat') { - return {method: 'fileSystem/stat', params: [{ - type: 2, - mtime: 2132123, - ctime: 124112, - size: 1231, - }]}; - } - return 'error'; + host.communicationHandler.onRequest('fileSystem/stat', (() => { + return {method: 'fileSystem/stat', params: [{ + type: 2, + mtime: 2132123, + ctime: 124112, + size: 1231, + }]}; + })); + + host.communicationHandler.onRequest('fileSystem/readFile', ((path: string) => { + expect(path).toEqual('testFolder/test.txt'); + return { + type: 'binaryData', + data: toBinaryMessage({ + content: Uint8Array.from(new TextEncoder().encode('HELLO WORLD!')), + } as FileData), + } as BinaryData; })); // Setup guest message handlers @@ -120,9 +143,13 @@ describe('Service Process', () => { expect(hostId).toBeTruthy(); - const folderStat = await guest.communicationHandler.sendRequest(OCPRequest, toEncodedOCPMessage({ method: 'fileSystem/stat', params: ['testFolder'], target: hostId })); + const folderStat = await guest.communicationHandler.sendRequest('fileSystem/stat', 'testFolder', hostId); expect(folderStat).toBeDefined(); + // sending the file path as binary only for testing the conversion + const fileContent = await guest.communicationHandler.sendRequest('fileSystem/readFile', {type: 'binaryData', data: toBinaryMessage('testFolder/test.txt')} as BinaryData, hostId) as FileData; + expect(new TextDecoder().decode(fileContent.content)).toEqual('HELLO WORLD!'); + host.communicationHandler.sendNotification(OpenDocument, 'text', 'testFolder/test.txt', 'HELLO WORLD!'); guest.communicationHandler.sendNotification(OpenDocument, 'text', 'testFolder/test.txt', 'HELLO WORLD!'); @@ -134,7 +161,7 @@ describe('Service Process', () => { await updateArived.promise; - }, 2000000); + }, 60000); }); async function makeSimpleLoginRequest(token: string, username: string) {