From fc5fdb2a2044bf2fa994b7bf4ec06005188c0b6b Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Sat, 30 Aug 2025 16:25:31 +0800 Subject: [PATCH 1/9] fix: update yjs & api type --- bin/worker.js | 0 package.json | 2 +- src/api.js | 5 +++++ 3 files changed, 6 insertions(+), 1 deletion(-) mode change 100644 => 100755 bin/worker.js diff --git a/bin/worker.js b/bin/worker.js old mode 100644 new mode 100755 diff --git a/package.json b/package.json index c85e744..a44e44e 100644 --- a/package.json +++ b/package.json @@ -68,7 +68,7 @@ "socket.io-client": "^4.8.0", "toobusy-js": "^0.5.1", "y-protocols": "^1.0.6", - "yjs": "^13.6.18" + "yjs": "^13.6.27" }, "optionalDependencies": { "minio": "^7.1.3", diff --git a/src/api.js b/src/api.js index 73618b7..37fbe85 100644 --- a/src/api.js +++ b/src/api.js @@ -97,6 +97,8 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar } export class Api { + /** @type {import('@redis/client').RedisClientType & { addMessage: (key: string, message: Buffer) => Promise, xDelIfEmpty: (key: string) => Promise }} */ + redis /** * @param {import('./storage.js').AbstractStorage} store * @param {string=} prefix @@ -138,6 +140,7 @@ export class Api { redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL}) ` + /** @type {import('@redis/client').RedisClientType & { addMessage: (key: string, message: Buffer) => Promise, xDelIfEmpty: (key: string) => Promise }} */ this.redis = redis.createClient({ url, // scripting: https://github.com/redis/node-redis/#lua-scripts @@ -339,6 +342,7 @@ export class Api { const streamlen = await this.redis.xLen(task.stream) if (streamlen === 0) { await this.redis.multi() + // @ts-expect-error custom script on multi .xDelIfEmpty(task.stream) .xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id) .xDel(this.redisWorkerStreamName, task.id) @@ -370,6 +374,7 @@ export class Api { // call YDOC_UPDATE_CALLBACK here const formData = new FormData() // @todo only convert ydoc to updatev2 once + // @ts-ignore formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)])) // @todo should add a timeout to fetch (see fetch signal abortcontroller) const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' }) From 04a4bf25930246417c4201439eeda5dc891aa4c0 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Sat, 30 Aug 2025 20:08:48 +0800 Subject: [PATCH 2/9] fix: disconnect server on destory & silent log --- src/server.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/server.js b/src/server.js index 2dee4a9..3bc4e30 100644 --- a/src/server.js +++ b/src/server.js @@ -50,8 +50,12 @@ export const createYSocketIOServer = async ({ } }) - httpServer.listen(port, undefined, undefined, () => { - logging.print(logging.GREEN, '[y-redis] Listening to port ', port) - }) + httpServer.listen(port, undefined, undefined) + + const oriDestroy = server.destroy + server.destroy = async () => { + await oriDestroy.bind(server)() + await new Promise((resolve) => httpServer.close(resolve)) + } return server } From ded3f1defcb98cb93e02fd60660d2da9d1722798 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Mon, 1 Sep 2025 18:36:59 +0800 Subject: [PATCH 3/9] fix: suppress redis error --- src/y-socket-io/y-socket-io.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 8f395f5..5a52b07 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -14,6 +14,7 @@ import { User } from './user.js' import { createModuleLogger } from 'lib0/logging' import toobusy from 'toobusy-js' import { promiseWithResolvers } from './utils.js' +import { ClientClosedError } from 'redis' const logSocketIO = createModuleLogger('@y/socket-io/server') const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000') @@ -22,6 +23,7 @@ const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-reval const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true' const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000') const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000') +const NEVER_REJECT_CONNECTION = env.getConf('y-socket-io-never-reject-connection') === 'true' process.on('SIGINT', function () { // calling .shutdown allows your process to exit normally @@ -232,7 +234,7 @@ export class YSocketIO { assert(this.client) assert(this.subscriber) const namespace = this.getNamespaceString(socket.nsp) - if (toobusy()) { + if (!NEVER_REJECT_CONNECTION && toobusy()) { logSocketIO(`warning server too busy, rejecting connection: ${namespace}`) // wait a bit to prevent client reconnect too fast await promise.wait(100) @@ -392,6 +394,7 @@ export class YSocketIO { this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) if (this.namespaceDocMap.has(ns)) this.debouncedPersist(ns, true) } + logSocketIO(`disconnecting socket in ${ns}, ${nsp?.sockets.size || 0} remaining`) } }) socket.onAnyOutgoing(async (ev) => { @@ -562,7 +565,10 @@ export class YSocketIO { await this.client.trimRoomStream(namespace, 'index') } catch (e) { - console.error(e) + // suppress redis client closed error + if (!(e instanceof ClientClosedError)) { + console.error(e) + } } }, timeoutInterval From 2ca9f19d8afc456c356e2e603181ac3398d6e59d Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Tue, 2 Sep 2025 17:03:54 +0800 Subject: [PATCH 4/9] feat: allow supply socket client to provider --- src/y-socket-io/client.js | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index a84f140..1b87b06 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -37,6 +37,9 @@ import { io } from 'socket.io-client' * * @prop {Record=} auth * (Optional) Add the authentication data + * + * @prop {ClientSocket=} socket + * (Optional) Supply custom socket.io client socket. If supplied, `socketIoOptions` will be ignored. */ /** @@ -138,7 +141,8 @@ export class SocketIOProvider extends Observable { awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined, resyncInterval = -1, disableBc = false, - auth = {} + auth = {}, + socket, } = {}, socketIoOptions = undefined ) { @@ -157,13 +161,17 @@ export class SocketIOProvider extends Observable { this.disableBc = disableBc this._socketIoOptions = socketIoOptions - this.socket = io(`${this.url}/yjs|${roomName}`, { - autoConnect: false, - transports: ['websocket'], - forceNew: true, - auth, - ...socketIoOptions - }) + if (socket) { + this.socket = socket + } else { + this.socket = io(`${this.url}/yjs|${roomName}`, { + autoConnect: false, + transports: ['websocket'], + forceNew: true, + auth, + ...socketIoOptions + }) + } this._socketIoOptions = socketIoOptions this.doc.on('update', this.onUpdateDoc) From 05dda36e50b1cf071379234f485cf51d3fa8cbf8 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 3 Sep 2025 01:28:50 +0800 Subject: [PATCH 5/9] fix: remove duplicated assignment --- src/y-socket-io/client.js | 1 - 1 file changed, 1 deletion(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index 1b87b06..a5d511f 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -172,7 +172,6 @@ export class SocketIOProvider extends Observable { ...socketIoOptions }) } - this._socketIoOptions = socketIoOptions this.doc.on('update', this.onUpdateDoc) From 2a0c086f9c90eb35a0e99bb6c736849c069ed1a0 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Wed, 3 Sep 2025 19:25:07 +0800 Subject: [PATCH 6/9] feat: handle resync & missing updates --- src/y-socket-io/client.js | 53 ++++++++++++++++++++++++++-------- src/y-socket-io/y-socket-io.js | 5 ++-- 2 files changed, 44 insertions(+), 14 deletions(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index a5d511f..26317fa 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -340,19 +340,24 @@ export class SocketIOProvider extends Observable { ) } if (resyncInterval > 0) { - this.resyncInterval = setInterval(() => { - if (this.socket.disconnected) return - this.socket.emit( - 'sync-step-1', - Y.encodeStateVector(this.doc), - (/** @type {Uint8Array} */ update) => { - Y.applyUpdate(this.doc, new Uint8Array(update), this) - } - ) - }, resyncInterval) + this.resyncInterval = setInterval(() => this.resync(), resyncInterval) } } + /** + * Resynchronize the document with the server by firing `sync-step-1`. + */ + resync () { + if (this.socket.disconnected) return + this.socket.emit( + 'sync-step-1', + Y.encodeStateVector(this.doc), + (/** @type {Uint8Array} */ update) => { + Y.applyUpdate(this.doc, new Uint8Array(update), this) + } + ) + } + /** * Disconnect provider's socket * @type {() => void} @@ -413,6 +418,11 @@ export class SocketIOProvider extends Observable { super.destroy() } + /** + * @type {number} + * @private + */ + _updateRetries = 0 /** * This function is executed when the document is updated, if the instance that * emit the change is not this, it emit the changes by socket and broadcast channel. @@ -421,9 +431,28 @@ export class SocketIOProvider extends Observable { * @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change. * @readonly */ - onUpdateDoc = (update, origin) => { + onUpdateDoc = async (update, origin) => { + if (this._updateRetries > 3) { + this._updateRetries = 0 + this.disconnect() + this.connect() + return + } + if (origin !== this) { - this.socket.emit('sync-update', update) + /** @type {boolean} */ + const ack = await Promise.race([ + new Promise((res) => this.socket.emit('sync-update', update, () => res(true))), + new Promise((res) => setTimeout(() => res(false), 3000)), + ]) + if (!ack) { + this._updateRetries++ + if (this.socket.disconnected) return + this.onUpdateDoc(update, origin) + return + } else { + this._updateRetries = 0 + } if (this.bcconnected) { bc.publish( this._broadcastChannel, diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 5a52b07..11bcf73 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -330,8 +330,8 @@ export class YSocketIO { /** @type {unknown} */ let prevMsg = null - socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => { - if (isDeepStrictEqual(update, prevMsg)) return + socket.on('sync-update', (/** @type {ArrayBuffer} */ update, /** @type {() => void} */ ack) => { + if (isDeepStrictEqual(update, prevMsg)) return ack() assert(this.client) const namespace = this.getNamespaceString(socket.nsp) const message = Buffer.from(update.slice(0, update.byteLength)) @@ -341,6 +341,7 @@ export class YSocketIO { 'index', Buffer.from(this.toRedis('sync-update', message)) ) + .then(() => ack()) .catch(console.error) prevMsg = update }) From 2dd52257aeae74eef60d7e74c5f775d254a689e7 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 4 Sep 2025 21:55:45 +0800 Subject: [PATCH 7/9] fix: update lockfile --- pnpm-lock.yaml | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 13f8261..dd26d29 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -25,10 +25,10 @@ importers: version: 0.5.1 y-protocols: specifier: ^1.0.6 - version: 1.0.6(yjs@13.6.18) + version: 1.0.6(yjs@13.6.27) yjs: - specifier: ^13.6.18 - version: 13.6.18 + specifier: ^13.6.27 + version: 13.6.27 optionalDependencies: minio: specifier: ^7.1.3 @@ -1285,6 +1285,11 @@ packages: resolution: {integrity: sha512-+bT2uH4E5LGE7h/n3evcS/sQlJXCpIp6ym8OWJ5eV6+67Dsql/LaaT7qJBAt2rzfoa/5QBGBhxDix1dMt2kQKQ==} engines: {node: '>= 0.8.0'} + lib0@0.2.114: + resolution: {integrity: sha512-gcxmNFzA4hv8UYi8j43uPlQ7CGcyMJ2KQb5kZASw6SnAKAf10hK12i2fjrS3Cl/ugZa5Ui6WwIu1/6MIXiHttQ==} + engines: {node: '>=16'} + hasBin: true + lib0@0.2.93: resolution: {integrity: sha512-M5IKsiFJYulS+8Eal8f+zAqf5ckm1vffW0fFDxfgxJ+uiVopvDdd3PxJmz0GsVi3YNO7QCFSq0nAsiDmNhLj9Q==} engines: {node: '>=16'} @@ -2013,8 +2018,8 @@ packages: resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==} engines: {node: '>=12'} - yjs@13.6.18: - resolution: {integrity: sha512-GBTjO4QCmv2HFKFkYIJl7U77hIB1o22vSCSQD1Ge8ZxWbIbn8AltI4gyXbtL+g5/GJep67HCMq3Y5AmNwDSyEg==} + yjs@13.6.27: + resolution: {integrity: sha512-OIDwaflOaq4wC6YlPBy2L6ceKeKuF7DeTxx+jPzv1FHn9tCZ0ZwSRnUBxD05E3yed46fv/FWJbvR+Ud7x0L7zw==} engines: {node: '>=16.0.0', npm: '>=8.0.0'} yocto-queue@0.1.0: @@ -3331,6 +3336,10 @@ snapshots: prelude-ls: 1.2.1 type-check: 0.4.0 + lib0@0.2.114: + dependencies: + isomorphic.js: 0.2.5 + lib0@0.2.93: dependencies: isomorphic.js: 0.2.5 @@ -4142,10 +4151,10 @@ snapshots: xmlhttprequest-ssl@2.1.1: {} - y-protocols@1.0.6(yjs@13.6.18): + y-protocols@1.0.6(yjs@13.6.27): dependencies: lib0: 0.2.93 - yjs: 13.6.18 + yjs: 13.6.27 y18n@5.0.8: {} @@ -4163,8 +4172,8 @@ snapshots: y18n: 5.0.8 yargs-parser: 21.1.1 - yjs@13.6.18: + yjs@13.6.27: dependencies: - lib0: 0.2.93 + lib0: 0.2.114 yocto-queue@0.1.0: {} From 71beace8d1da65f85788b91aa8cef3abe68f4673 Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Thu, 4 Sep 2025 21:59:09 +0800 Subject: [PATCH 8/9] fix: lint error --- src/server.js | 1 - src/y-socket-io/client.js | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/server.js b/src/server.js index 3bc4e30..7e5cd16 100644 --- a/src/server.js +++ b/src/server.js @@ -1,5 +1,4 @@ import * as env from 'lib0/environment' -import * as logging from 'lib0/logging' import * as jwt from 'lib0/crypto/jwt' import * as ecdsa from 'lib0/crypto/ecdsa' import * as json from 'lib0/json' diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index 26317fa..13ab0b6 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -142,7 +142,7 @@ export class SocketIOProvider extends Observable { resyncInterval = -1, disableBc = false, auth = {}, - socket, + socket } = {}, socketIoOptions = undefined ) { @@ -442,8 +442,8 @@ export class SocketIOProvider extends Observable { if (origin !== this) { /** @type {boolean} */ const ack = await Promise.race([ - new Promise((res) => this.socket.emit('sync-update', update, () => res(true))), - new Promise((res) => setTimeout(() => res(false), 3000)), + new Promise((resolve) => this.socket.emit('sync-update', update, () => resolve(true))), + new Promise((resolve) => setTimeout(() => resolve(false), 3000)) ]) if (!ack) { this._updateRetries++ From 77069c1fe7215f95057395bb21546cbce850196b Mon Sep 17 00:00:00 2001 From: stanley2058 Date: Fri, 5 Sep 2025 00:00:58 +0800 Subject: [PATCH 9/9] fix: type error --- src/api.js | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/api.js b/src/api.js index 37fbe85..d6e783a 100644 --- a/src/api.js +++ b/src/api.js @@ -1,5 +1,5 @@ import * as Y from 'yjs' -import * as redis from 'redis' +import { createClient, defineScript, commandOptions } from 'redis' import * as map from 'lib0/map' import * as decoding from 'lib0/decoding' import * as awarenessProtocol from 'y-protocols/awareness' @@ -97,7 +97,6 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar } export class Api { - /** @type {import('@redis/client').RedisClientType & { addMessage: (key: string, message: Buffer) => Promise, xDelIfEmpty: (key: string) => Promise }} */ redis /** * @param {import('./storage.js').AbstractStorage} store @@ -140,12 +139,11 @@ export class Api { redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL}) ` - /** @type {import('@redis/client').RedisClientType & { addMessage: (key: string, message: Buffer) => Promise, xDelIfEmpty: (key: string) => Promise }} */ - this.redis = redis.createClient({ + this.redis = createClient({ url, // scripting: https://github.com/redis/node-redis/#lua-scripts scripts: { - addMessage: redis.defineScript({ + addMessage: defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: addScript, /** @@ -162,7 +160,7 @@ export class Api { return x } }), - xDelIfEmpty: redis.defineScript({ + xDelIfEmpty: defineScript({ NUMBER_OF_KEYS: 1, SCRIPT: ` if redis.call("XLEN", KEYS[1]) == 0 then @@ -196,7 +194,7 @@ export class Api { return [] } const reads = await this.redis.xRead( - redis.commandOptions({ returnBuffers: true }), + commandOptions({ returnBuffers: true }), streams, { BLOCK: 1000, COUNT: 1000 } ) @@ -244,7 +242,7 @@ export class Api { * @param {string} docid */ async getDoc (room, docid) { - const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) + const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) const docMessages = ms.get(room)?.get(docid) || null if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`) const docstate = await this.store.retrieveDoc(room, docid) @@ -292,7 +290,7 @@ export class Api { * @param {string} docid */ async getRedisLastId (room, docid) { - const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) + const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix) const docMessages = ms.get(room)?.get(docid) || null return docMessages?.lastId.toString() || '0' } @@ -342,7 +340,6 @@ export class Api { const streamlen = await this.redis.xLen(task.stream) if (streamlen === 0) { await this.redis.multi() - // @ts-expect-error custom script on multi .xDelIfEmpty(task.stream) .xAck(this.redisWorkerStreamName, this.redisWorkerGroupName, task.id) .xDel(this.redisWorkerStreamName, task.id)