@@ -13,12 +13,14 @@ import { isDeepStrictEqual } from 'util'
1313import { User } from './user.js'
1414import { createModuleLogger } from 'lib0/logging'
1515import toobusy from 'toobusy-js'
16+ import { promiseWithResolvers } from './utils.js'
1617
1718const logSocketIO = createModuleLogger ( '@y/socket-io/server' )
1819const PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-persist-interval' ) || '3000' )
1920const MAX_PERSIST_INTERVAL = number . parseInt ( env . getConf ( 'y-socket-io-server-max-persist-interval' ) || '30000' )
2021const REVALIDATE_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-server-revalidate-timeout' ) || '60000' )
2122const WORKER_DISABLED = env . getConf ( 'y-worker-disabled' ) === 'true'
23+ const DEFAULT_CLEAR_TIMEOUT = number . parseInt ( env . getConf ( 'y-socket-io-default-clear-timeout' ) || '30000' )
2224
2325process . on ( 'SIGINT' , function ( ) {
2426 // calling .shutdown allows your process to exit normally
@@ -137,11 +139,17 @@ export class YSocketIO {
137139 */
138140 namespacePersistentMap = new Map ( )
139141 /**
140- * @type {Map<string, () => void> }
142+ * @type {Map<string, { promise: Promise<void>, resolve: () => void } > }
141143 * @private
142144 * @readonly
143145 */
144146 awaitingPersistMap = new Map ( )
147+ /**
148+ * @type {Map<string, NodeJS.Timeout> }
149+ * @private
150+ * @readonly
151+ */
152+ awaitingCleanupNamespace = new Map ( )
145153
146154 /**
147155 * YSocketIO constructor.
@@ -213,6 +221,12 @@ export class YSocketIO {
213221 'index' ,
214222 redisPrefix
215223 )
224+ const prevAwaitCleanup = this . awaitingCleanupNamespace . get ( namespace )
225+ if ( prevAwaitCleanup ) {
226+ clearTimeout ( prevAwaitCleanup )
227+ this . cleanupNamespace ( namespace , stream )
228+ }
229+
216230 if ( ! this . namespaceMap . has ( namespace ) ) {
217231 this . namespaceMap . set ( namespace , socket . nsp )
218232 }
@@ -346,13 +360,9 @@ export class YSocketIO {
346360 if ( ! ns ) continue
347361 const nsp = this . namespaceMap . get ( ns )
348362 if ( nsp ?. sockets . size === 0 && stream ) {
349- this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
350- this . namespaceStreamMap . delete ( ns )
351- this . streamNamespaceMap . delete ( stream )
352- this . namespaceMap . delete ( ns )
353- this . namespaceDocMap . get ( ns ) ?. ydoc . destroy ( )
354- this . namespaceDocMap . delete ( ns )
355- this . namespacePersistentMap . delete ( ns )
363+ this . cleanupNamespace ( ns , stream , DEFAULT_CLEAR_TIMEOUT )
364+ const doc = this . namespaceDocMap . get ( ns )
365+ if ( doc ) this . debouncedPersist ( ns , doc . ydoc , true )
356366 }
357367 }
358368 } )
@@ -398,18 +408,13 @@ export class YSocketIO {
398408 * @param {Array<Uint8Array> } messages
399409 */
400410 redisMessageSubscriber = async ( stream , messages ) => {
411+ console . log ( '[DEBUG]' , { stream, messages } )
401412 const namespace = this . streamNamespaceMap . get ( stream )
402413 if ( ! namespace ) return
403414 const nsp = this . namespaceMap . get ( namespace )
404415 if ( ! nsp ) return
405416 if ( nsp . sockets . size === 0 && this . subscriber ) {
406- this . subscriber . unsubscribe ( stream , this . redisMessageSubscriber )
407- this . namespaceStreamMap . delete ( namespace )
408- this . streamNamespaceMap . delete ( stream )
409- this . namespaceMap . delete ( namespace )
410- this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
411- this . namespaceDocMap . delete ( namespace )
412- this . namespacePersistentMap . delete ( namespace )
417+ this . cleanupNamespace ( namespace , stream , DEFAULT_CLEAR_TIMEOUT )
413418 }
414419
415420 /** @type {Uint8Array[] } */
@@ -463,9 +468,9 @@ export class YSocketIO {
463468 const lastPersistCalledAt = this . namespacePersistentMap . get ( namespace ) ?? 0
464469 const now = Date . now ( )
465470 const shouldPersist = now - lastPersistCalledAt > MAX_PERSIST_INTERVAL
466- if ( changed || shouldPersist ) {
471+ if ( changed || shouldPersist || nsp . sockets . size === 0 ) {
467472 this . namespacePersistentMap . set ( namespace , now )
468- this . debouncedPersist ( namespace , doc . ydoc )
473+ this . debouncedPersist ( namespace , doc . ydoc , nsp . sockets . size === 0 )
469474 }
470475 this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
471476 this . namespaceDocMap . set ( namespace , doc )
@@ -474,47 +479,50 @@ export class YSocketIO {
474479 /**
475480 * @param {string } namespace
476481 * @param {Y.Doc } doc
482+ * @param {boolean= } immediate
477483 */
478- async debouncedPersist ( namespace , doc ) {
484+ debouncedPersist ( namespace , doc , immediate = false ) {
479485 this . debouncedPersistDocMap . set ( namespace , doc )
480- if ( this . debouncedPersistMap . has ( namespace ) ) return
486+ if ( this . debouncedPersistMap . has ( namespace ) ) {
487+ if ( ! immediate ) return
488+ clearTimeout ( this . debouncedPersistMap . get ( namespace ) || undefined )
489+ }
490+ const timeoutInterval = immediate
491+ ? 0
492+ : PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
481493 const timeout = setTimeout (
482494 async ( ) => {
483495 try {
484496 assert ( this . client )
485497 const doc = this . debouncedPersistDocMap . get ( namespace )
486498 logSocketIO ( `trying to persist ${ namespace } ` )
487499 if ( ! doc ) return
488- /** @type {Promise<void> | null } */
489- let workerPromise = null
490500 if ( this . client . persistWorker ) {
491- workerPromise = new Promise ( ( resolve ) => {
492- assert ( this . client ?. persistWorker )
493- this . awaitingPersistMap . set ( namespace , resolve )
494-
495- const docState = Y . encodeStateAsUpdateV2 ( doc )
496- const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
497- buf . set ( docState )
498- this . client . persistWorker . postMessage ( {
499- room : namespace ,
500- docstate : buf
501- } )
501+ /** @type { ReturnType<typeof promiseWithResolvers<void>> } */
502+ const { promise , resolve } = promiseWithResolvers ( )
503+ assert ( this . client ?. persistWorker )
504+ this . awaitingPersistMap . set ( namespace , { promise , resolve } )
505+
506+ const docState = Y . encodeStateAsUpdateV2 ( doc )
507+ const buf = new Uint8Array ( new SharedArrayBuffer ( docState . length ) )
508+ buf . set ( docState )
509+ this . client . persistWorker . postMessage ( {
510+ room : namespace ,
511+ docstate : buf
502512 } )
503- if ( workerPromise ) {
504- await workerPromise
505- }
513+ await promise
506514 } else {
507515 await this . client . store . persistDoc ( namespace , 'index' , doc )
508516 }
509- await this . client . trimRoomStream ( namespace , 'index' , true )
517+ await this . client . trimRoomStream ( namespace , 'index' )
510518 } catch ( e ) {
511519 console . error ( e )
512520 } finally {
513521 this . debouncedPersistDocMap . delete ( namespace )
514522 this . debouncedPersistMap . delete ( namespace )
515523 }
516524 } ,
517- PERSIST_INTERVAL + ( Math . random ( ) - 0.5 ) * PERSIST_INTERVAL
525+ timeoutInterval
518526 )
519527
520528 this . debouncedPersistMap . set ( namespace , timeout )
@@ -608,7 +616,45 @@ export class YSocketIO {
608616 registerPersistWorkerResolve ( ) {
609617 if ( ! this . client ?. persistWorker ) return
610618 this . client . persistWorker . on ( 'message' , ( { event, room } ) => {
611- if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. ( )
619+ if ( event === 'persisted' ) this . awaitingPersistMap . get ( room ) ?. resolve ( )
612620 } )
613621 }
622+
623+ /**
624+ * @param {string } namespace
625+ * @param {string } stream
626+ * @param {number= } removeAfterWait
627+ */
628+ cleanupNamespace ( namespace , stream , removeAfterWait ) {
629+ if ( ! removeAfterWait ) {
630+ this . awaitingCleanupNamespace . delete ( namespace )
631+ return this . cleanupNamespaceImpl ( namespace , stream )
632+ }
633+ if ( this . awaitingCleanupNamespace . has ( namespace ) ) return
634+
635+ const timer = setTimeout ( async ( ) => {
636+ const awaitingPersist = this . awaitingPersistMap . get ( namespace )
637+ if ( awaitingPersist ) await awaitingPersist . promise
638+ this . cleanupNamespaceImpl ( namespace , stream )
639+ this . awaitingCleanupNamespace . delete ( namespace )
640+ logSocketIO ( `no active connection, namespace: ${ namespace } cleared` )
641+ } , removeAfterWait )
642+ this . awaitingCleanupNamespace . set ( namespace , timer )
643+ }
644+
645+ /**
646+ * @param {string } namespace
647+ * @param {string } stream
648+ * @private
649+ */
650+ cleanupNamespaceImpl ( namespace , stream ) {
651+ this . subscriber ?. unsubscribe ( stream , this . redisMessageSubscriber )
652+ this . namespaceStreamMap . delete ( namespace )
653+ this . streamNamespaceMap . delete ( stream )
654+ this . namespaceMap . delete ( namespace )
655+ this . namespaceDocMap . get ( namespace ) ?. ydoc . destroy ( )
656+ this . namespaceDocMap . delete ( namespace )
657+ this . namespacePersistentMap . delete ( namespace )
658+ this . client ?. trimRoomStream ( namespace , 'index' , true )
659+ }
614660}
0 commit comments