diff --git a/bin/worker.js b/bin/worker.js old mode 100644 new mode 100755 diff --git a/package.json b/package.json index c85e744..a44e44e 100644 --- a/package.json +++ b/package.json @@ -68,7 +68,7 @@ "socket.io-client": "^4.8.0", "toobusy-js": "^0.5.1", "y-protocols": "^1.0.6", - "yjs": "^13.6.18" + "yjs": "^13.6.27" }, "optionalDependencies": { "minio": "^7.1.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 13f8261..dd26d29 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -25,10 +25,10 @@ importers: version: 0.5.1 y-protocols: specifier: ^1.0.6 - version: 1.0.6(yjs@13.6.18) + version: 1.0.6(yjs@13.6.27) yjs: - specifier: ^13.6.18 - version: 13.6.18 + specifier: ^13.6.27 + version: 13.6.27 optionalDependencies: minio: specifier: ^7.1.3 @@ -1285,6 +1285,11 @@ packages: resolution: {integrity: sha512-+bT2uH4E5LGE7h/n3evcS/sQlJXCpIp6ym8OWJ5eV6+67Dsql/LaaT7qJBAt2rzfoa/5QBGBhxDix1dMt2kQKQ==} engines: {node: '>= 0.8.0'} + lib0@0.2.114: + resolution: {integrity: sha512-gcxmNFzA4hv8UYi8j43uPlQ7CGcyMJ2KQb5kZASw6SnAKAf10hK12i2fjrS3Cl/ugZa5Ui6WwIu1/6MIXiHttQ==} + engines: {node: '>=16'} + hasBin: true + lib0@0.2.93: resolution: {integrity: sha512-M5IKsiFJYulS+8Eal8f+zAqf5ckm1vffW0fFDxfgxJ+uiVopvDdd3PxJmz0GsVi3YNO7QCFSq0nAsiDmNhLj9Q==} engines: {node: '>=16'} @@ -2013,8 +2018,8 @@ packages: resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==} engines: {node: '>=12'} - yjs@13.6.18: - resolution: {integrity: sha512-GBTjO4QCmv2HFKFkYIJl7U77hIB1o22vSCSQD1Ge8ZxWbIbn8AltI4gyXbtL+g5/GJep67HCMq3Y5AmNwDSyEg==} + yjs@13.6.27: + resolution: {integrity: sha512-OIDwaflOaq4wC6YlPBy2L6ceKeKuF7DeTxx+jPzv1FHn9tCZ0ZwSRnUBxD05E3yed46fv/FWJbvR+Ud7x0L7zw==} engines: {node: '>=16.0.0', npm: '>=8.0.0'} yocto-queue@0.1.0: @@ -3331,6 +3336,10 @@ snapshots: prelude-ls: 1.2.1 type-check: 0.4.0 + lib0@0.2.114: + dependencies: + isomorphic.js: 0.2.5 + lib0@0.2.93: dependencies: isomorphic.js: 0.2.5 @@ -4142,10 +4151,10 @@ snapshots: xmlhttprequest-ssl@2.1.1: {} - y-protocols@1.0.6(yjs@13.6.18): + y-protocols@1.0.6(yjs@13.6.27): dependencies: lib0: 0.2.93 - yjs: 13.6.18 + yjs: 13.6.27 y18n@5.0.8: {} @@ -4163,8 +4172,8 @@ snapshots: y18n: 5.0.8 yargs-parser: 21.1.1 - yjs@13.6.18: + yjs@13.6.27: dependencies: - lib0: 0.2.93 + lib0: 0.2.114 yocto-queue@0.1.0: {} diff --git a/src/api.js b/src/api.js index 73618b7..d6e783a 100644 --- a/src/api.js +++ b/src/api.js @@ -1,5 +1,5 @@ import * as Y from 'yjs' -import * as redis from 'redis' +import { createClient, defineScript, commandOptions } from 'redis' import * as map from 'lib0/map' import * as decoding from 'lib0/decoding' import * as awarenessProtocol from 'y-protocols/awareness' @@ -97,6 +97,7 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar } export class Api { + redis /** * @param {import('./storage.js').AbstractStorage} store * @param {string=} prefix @@ -138,11 +139,11 @@ export class Api { redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL}) ` - this.redis = redis.createClient({ + this.redis = createClient({ url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { - addMessage: redis.defineScript({ + addMessage: defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: addScript, /** @@ -159,7 +160,7 @@ export class Api { return x } }), - xDelIfEmpty: redis.defineScript({ + xDelIfEmpty: defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: ` if redis.call("XLEN", KEYS[1]) == 0 then @@ -193,7 +194,7 @@ export class Api { return [] } const reads = await this.redis.xRead( - redis.commandOptions({ returnBuffers: true }), + commandOptions({ returnBuffers: true }), streams, { BLOCK: 1000, COUNT: 1000 } ) @@ -241,7 +242,7 @@ export class Api { * @param {string} docid */ async getDoc (room, docid) { - const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) + const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) const docMessages = ms.get(room)?.get(docid) || null if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`) const docstate = await this.store.retrieveDoc(room, docid) @@ -289,7 +290,7 @@ export class Api { * @param {string} docid */ async getRedisLastId (room, docid) { - const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) + const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) const docMessages = ms.get(room)?.get(docid) || null return docMessages?.lastId.toString() || '0' } @@ -370,6 +371,7 @@ export class Api { // call YDOC_UPDATE_CALLBACK here const formData = new FormData() // @todo only convert ydoc to updatev2 once + // @ts-ignore formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)])) // @todo should add a timeout to fetch (see fetch signal abortcontroller) const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' }) diff --git a/src/server.js b/src/server.js index 2dee4a9..7e5cd16 100644 --- a/src/server.js +++ b/src/server.js @@ -1,5 +1,4 @@ import * as env from 'lib0/environment' -import * as logging from 'lib0/logging' import * as jwt from 'lib0/crypto/jwt' import * as ecdsa from 'lib0/crypto/ecdsa' import * as json from 'lib0/json' @@ -50,8 +49,12 @@ export const createYSocketIOServer = async ({ } }) - httpServer.listen(port, undefined, undefined, () => { - logging.print(logging.GREEN, '[y-redis] Listening to port ', port) - }) + httpServer.listen(port, undefined, undefined) + + const oriDestroy = server.destroy + server.destroy = async () => { + await oriDestroy.bind(server)() + await new Promise((resolve) => httpServer.close(resolve)) + } return server } diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index a84f140..13ab0b6 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -37,6 +37,9 @@ import { io } from 'socket.io-client' * * @prop {Record=} auth * (Optional) Add the authentication data + * + * @prop {ClientSocket=} socket + * (Optional) Supply custom socket.io client socket. If supplied, `socketIoOptions` will be ignored. */ /** @@ -138,7 +141,8 @@ export class SocketIOProvider extends Observable { awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined, resyncInterval = -1, disableBc = false, - auth = {} + auth = {}, + socket } = {}, socketIoOptions = undefined ) { @@ -157,14 +161,17 @@ export class SocketIOProvider extends Observable { this.disableBc = disableBc this._socketIoOptions = socketIoOptions - this.socket = io(`${this.url}/yjs|${roomName}`, { - autoConnect: false, - transports: ['websocket'], - forceNew: true, - auth, - ...socketIoOptions - }) - this._socketIoOptions = socketIoOptions + if (socket) { + this.socket = socket + } else { + this.socket = io(`${this.url}/yjs|${roomName}`, { + autoConnect: false, + transports: ['websocket'], + forceNew: true, + auth, + ...socketIoOptions + }) + } this.doc.on('update', this.onUpdateDoc) @@ -333,19 +340,24 @@ export class SocketIOProvider extends Observable { ) } if (resyncInterval > 0) { - this.resyncInterval = setInterval(() => { - if (this.socket.disconnected) return - this.socket.emit( - 'sync-step-1', - Y.encodeStateVector(this.doc), - (/** @type {Uint8Array} */ update) => { - Y.applyUpdate(this.doc, new Uint8Array(update), this) - } - ) - }, resyncInterval) + this.resyncInterval = setInterval(() => this.resync(), resyncInterval) } } + /** + * Resynchronize the document with the server by firing `sync-step-1`. + */ + resync () { + if (this.socket.disconnected) return + this.socket.emit( + 'sync-step-1', + Y.encodeStateVector(this.doc), + (/** @type {Uint8Array} */ update) => { + Y.applyUpdate(this.doc, new Uint8Array(update), this) + } + ) + } + /** * Disconnect provider's socket * @type {() => void} @@ -406,6 +418,11 @@ export class SocketIOProvider extends Observable { super.destroy() } + /** + * @type {number} + * @private + */ + _updateRetries = 0 /** * This function is executed when the document is updated, if the instance that * emit the change is not this, it emit the changes by socket and broadcast channel. @@ -414,9 +431,28 @@ export class SocketIOProvider extends Observable { * @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change. * @readonly */ - onUpdateDoc = (update, origin) => { + onUpdateDoc = async (update, origin) => { + if (this._updateRetries > 3) { + this._updateRetries = 0 + this.disconnect() + this.connect() + return + } + if (origin !== this) { - this.socket.emit('sync-update', update) + /** @type {boolean} */ + const ack = await Promise.race([ + new Promise((resolve) => this.socket.emit('sync-update', update, () => resolve(true))), + new Promise((resolve) => setTimeout(() => resolve(false), 3000)) + ]) + if (!ack) { + this._updateRetries++ + if (this.socket.disconnected) return + this.onUpdateDoc(update, origin) + return + } else { + this._updateRetries = 0 + } if (this.bcconnected) { bc.publish( this._broadcastChannel, diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 8f395f5..11bcf73 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -14,6 +14,7 @@ import { User } from './user.js' import { createModuleLogger } from 'lib0/logging' import toobusy from 'toobusy-js' import { promiseWithResolvers } from './utils.js' +import { ClientClosedError } from 'redis' const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') @@ -22,6 +23,7 @@ const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-reval const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000') const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000') +const NEVER_REJECT_CONNECTION = env.getConf('y-socket-io-never-reject-connection') === 'true' process.on('SIGINT', function () { // calling .shutdown allows your process to exit normally @@ -232,7 +234,7 @@ export class YSocketIO { assert(this.client) assert(this.subscriber) const namespace = this.getNamespaceString(socket.nsp) - if (toobusy()) { + if (!NEVER_REJECT_CONNECTION && toobusy()) { logSocketIO(`warning server too busy, rejecting connection: ${namespace}`) // wait a bit to prevent client reconnect too fast await promise.wait(100) @@ -328,8 +330,8 @@ export class YSocketIO { /** @type {unknown} */ let prevMsg = null - socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => { - if (isDeepStrictEqual(update, prevMsg)) return + socket.on('sync-update', (/** @type {ArrayBuffer} */ update, /** @type {() => void} */ ack) => { + if (isDeepStrictEqual(update, prevMsg)) return ack() assert(this.client) const namespace = this.getNamespaceString(socket.nsp) const message = Buffer.from(update.slice(0, update.byteLength)) @@ -339,6 +341,7 @@ export class YSocketIO { 'index', Buffer.from(this.toRedis('sync-update', message)) ) + .then(() => ack()) .catch(console.error) prevMsg = update }) @@ -392,6 +395,7 @@ export class YSocketIO { this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) if (this.namespaceDocMap.has(ns)) this.debouncedPersist(ns, true) } + logSocketIO(`disconnecting socket in ${ns}, ${nsp?.sockets.size || 0} remaining`) } }) socket.onAnyOutgoing(async (ev) => { @@ -562,7 +566,10 @@ export class YSocketIO { await this.client.trimRoomStream(namespace, 'index') } catch (e) { - console.error(e) + // suppress redis client closed error + if (!(e instanceof ClientClosedError)) { + console.error(e) + } } }, timeoutInterval