Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified bin/worker.js
100644 → 100755
Empty file.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
"socket.io-client": "^4.8.0",
"toobusy-js": "^0.5.1",
"y-protocols": "^1.0.6",
"yjs": "^13.6.18"
"yjs": "^13.6.27"
},
"optionalDependencies": {
"minio": "^7.1.3",
Expand Down
27 changes: 18 additions & 9 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions src/api.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Y from 'yjs'
import * as redis from 'redis'
import { createClient, defineScript, commandOptions } from 'redis'
import * as map from 'lib0/map'
import * as decoding from 'lib0/decoding'
import * as awarenessProtocol from 'y-protocols/awareness'
Expand Down Expand Up @@ -97,6 +97,7 @@ export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwar
}

export class Api {
redis
/**
* @param {import('./storage.js').AbstractStorage} store
* @param {string=} prefix
Expand Down Expand Up @@ -138,11 +139,11 @@ export class Api {
redis.call("EXPIRE", KEYS[1], ${ROOM_STREAM_TTL})
`

this.redis = redis.createClient({
this.redis = createClient({
url,
// scripting: https://github.com/redis/node-redis/#lua-scripts
scripts: {
addMessage: redis.defineScript({
addMessage: defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT: addScript,
/**
Expand All @@ -159,7 +160,7 @@ export class Api {
return x
}
}),
xDelIfEmpty: redis.defineScript({
xDelIfEmpty: defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT: `
if redis.call("XLEN", KEYS[1]) == 0 then
Expand Down Expand Up @@ -193,7 +194,7 @@ export class Api {
return []
}
const reads = await this.redis.xRead(
redis.commandOptions({ returnBuffers: true }),
commandOptions({ returnBuffers: true }),
streams,
{ BLOCK: 1000, COUNT: 1000 }
)
Expand Down Expand Up @@ -241,7 +242,7 @@ export class Api {
* @param {string} docid
*/
async getDoc (room, docid) {
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
const docMessages = ms.get(room)?.get(docid) || null
if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`)
const docstate = await this.store.retrieveDoc(room, docid)
Expand Down Expand Up @@ -289,7 +290,7 @@ export class Api {
* @param {string} docid
*/
async getRedisLastId (room, docid) {
const ms = extractMessagesFromStreamReply(await this.redis.xRead(redis.commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
const ms = extractMessagesFromStreamReply(await this.redis.xRead(commandOptions({ returnBuffers: true }), { key: computeRedisRoomStreamName(room, docid, this.prefix), id: '0' }), this.prefix)
const docMessages = ms.get(room)?.get(docid) || null
return docMessages?.lastId.toString() || '0'
}
Expand Down Expand Up @@ -370,6 +371,7 @@ export class Api {
// call YDOC_UPDATE_CALLBACK here
const formData = new FormData()
// @todo only convert ydoc to updatev2 once
// @ts-ignore
formData.append('ydoc', new Blob([Y.encodeStateAsUpdateV2(ydoc)]))
// @todo should add a timeout to fetch (see fetch signal abortcontroller)
const res = await fetch(new URL(room, ydocUpdateCallback), { body: formData, method: 'PUT' })
Expand Down
11 changes: 7 additions & 4 deletions src/server.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import * as env from 'lib0/environment'
import * as logging from 'lib0/logging'
import * as jwt from 'lib0/crypto/jwt'
import * as ecdsa from 'lib0/crypto/ecdsa'
import * as json from 'lib0/json'
Expand Down Expand Up @@ -50,8 +49,12 @@ export const createYSocketIOServer = async ({
}
})

httpServer.listen(port, undefined, undefined, () => {
logging.print(logging.GREEN, '[y-redis] Listening to port ', port)
})
httpServer.listen(port, undefined, undefined)

const oriDestroy = server.destroy
server.destroy = async () => {
await oriDestroy.bind(server)()
await new Promise((resolve) => httpServer.close(resolve))
}
return server
}
78 changes: 57 additions & 21 deletions src/y-socket-io/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ import { io } from 'socket.io-client'
*
* @prop {Record<string, unknown>=} auth
* (Optional) Add the authentication data
*
* @prop {ClientSocket=} socket
* (Optional) Supply custom socket.io client socket. If supplied, `socketIoOptions` will be ignored.
*/

/**
Expand Down Expand Up @@ -138,7 +141,8 @@ export class SocketIOProvider extends Observable {
awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined,
resyncInterval = -1,
disableBc = false,
auth = {}
auth = {},
socket
} = {},
socketIoOptions = undefined
) {
Expand All @@ -157,14 +161,17 @@ export class SocketIOProvider extends Observable {
this.disableBc = disableBc
this._socketIoOptions = socketIoOptions

this.socket = io(`${this.url}/yjs|${roomName}`, {
autoConnect: false,
transports: ['websocket'],
forceNew: true,
auth,
...socketIoOptions
})
this._socketIoOptions = socketIoOptions
if (socket) {
this.socket = socket
} else {
this.socket = io(`${this.url}/yjs|${roomName}`, {
autoConnect: false,
transports: ['websocket'],
forceNew: true,
auth,
...socketIoOptions
})
}

this.doc.on('update', this.onUpdateDoc)

Expand Down Expand Up @@ -333,19 +340,24 @@ export class SocketIOProvider extends Observable {
)
}
if (resyncInterval > 0) {
this.resyncInterval = setInterval(() => {
if (this.socket.disconnected) return
this.socket.emit(
'sync-step-1',
Y.encodeStateVector(this.doc),
(/** @type {Uint8Array} */ update) => {
Y.applyUpdate(this.doc, new Uint8Array(update), this)
}
)
}, resyncInterval)
this.resyncInterval = setInterval(() => this.resync(), resyncInterval)
}
}

/**
* Resynchronize the document with the server by firing `sync-step-1`.
*/
resync () {
if (this.socket.disconnected) return
this.socket.emit(
'sync-step-1',
Y.encodeStateVector(this.doc),
(/** @type {Uint8Array} */ update) => {
Y.applyUpdate(this.doc, new Uint8Array(update), this)
}
)
}

/**
* Disconnect provider's socket
* @type {() => void}
Expand Down Expand Up @@ -406,6 +418,11 @@ export class SocketIOProvider extends Observable {
super.destroy()
}

/**
* @type {number}
* @private
*/
_updateRetries = 0
/**
* This function is executed when the document is updated, if the instance that
* emit the change is not this, it emit the changes by socket and broadcast channel.
Expand All @@ -414,9 +431,28 @@ export class SocketIOProvider extends Observable {
* @param {SocketIOProvider} origin The SocketIOProvider instance that emits the change.
* @readonly
*/
onUpdateDoc = (update, origin) => {
onUpdateDoc = async (update, origin) => {
if (this._updateRetries > 3) {
this._updateRetries = 0
this.disconnect()
this.connect()
return
}

if (origin !== this) {
this.socket.emit('sync-update', update)
/** @type {boolean} */
const ack = await Promise.race([
new Promise((resolve) => this.socket.emit('sync-update', update, () => resolve(true))),
new Promise((resolve) => setTimeout(() => resolve(false), 3000))
])
if (!ack) {
this._updateRetries++
if (this.socket.disconnected) return
this.onUpdateDoc(update, origin)
return
} else {
this._updateRetries = 0
}
if (this.bcconnected) {
bc.publish(
this._broadcastChannel,
Expand Down
15 changes: 11 additions & 4 deletions src/y-socket-io/y-socket-io.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { User } from './user.js'
import { createModuleLogger } from 'lib0/logging'
import toobusy from 'toobusy-js'
import { promiseWithResolvers } from './utils.js'
import { ClientClosedError } from 'redis'

const logSocketIO = createModuleLogger('@y/socket-io/server')
const PERSIST_INTERVAL = number.parseInt(env.getConf('y-socket-io-server-persist-interval') || '3000')
Expand All @@ -22,6 +23,7 @@ const REVALIDATE_TIMEOUT = number.parseInt(env.getConf('y-socket-io-server-reval
const WORKER_DISABLED = env.getConf('y-worker-disabled') === 'true'
const DEFAULT_CLEAR_TIMEOUT = number.parseInt(env.getConf('y-socket-io-default-clear-timeout') || '30000')
const WORKER_HEALTH_CHECK_INTERVAL = number.parseInt(env.getConf('y-socket-io-worker-health-check-interval') || '5000')
const NEVER_REJECT_CONNECTION = env.getConf('y-socket-io-never-reject-connection') === 'true'

process.on('SIGINT', function () {
// calling .shutdown allows your process to exit normally
Expand Down Expand Up @@ -232,7 +234,7 @@ export class YSocketIO {
assert(this.client)
assert(this.subscriber)
const namespace = this.getNamespaceString(socket.nsp)
if (toobusy()) {
if (!NEVER_REJECT_CONNECTION && toobusy()) {
logSocketIO(`warning server too busy, rejecting connection: ${namespace}`)
// wait a bit to prevent client reconnect too fast
await promise.wait(100)
Expand Down Expand Up @@ -328,8 +330,8 @@ export class YSocketIO {

/** @type {unknown} */
let prevMsg = null
socket.on('sync-update', (/** @type {ArrayBuffer} */ update) => {
if (isDeepStrictEqual(update, prevMsg)) return
socket.on('sync-update', (/** @type {ArrayBuffer} */ update, /** @type {() => void} */ ack) => {
if (isDeepStrictEqual(update, prevMsg)) return ack()
assert(this.client)
const namespace = this.getNamespaceString(socket.nsp)
const message = Buffer.from(update.slice(0, update.byteLength))
Expand All @@ -339,6 +341,7 @@ export class YSocketIO {
'index',
Buffer.from(this.toRedis('sync-update', message))
)
.then(() => ack())
.catch(console.error)
prevMsg = update
})
Expand Down Expand Up @@ -392,6 +395,7 @@ export class YSocketIO {
this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT)
if (this.namespaceDocMap.has(ns)) this.debouncedPersist(ns, true)
}
logSocketIO(`disconnecting socket in ${ns}, ${nsp?.sockets.size || 0} remaining`)
}
})
socket.onAnyOutgoing(async (ev) => {
Expand Down Expand Up @@ -562,7 +566,10 @@ export class YSocketIO {

await this.client.trimRoomStream(namespace, 'index')
} catch (e) {
console.error(e)
// suppress redis client closed error
if (!(e instanceof ClientClosedError)) {
console.error(e)
}
}
},
timeoutInterval
Expand Down