@@ -29,7 +29,8 @@ const depsTypes = (TYPES) => [
2929 TYPES . DataInserterFactory ,
3030 TYPES . Progress ,
3131 TYPES . SyncSchema ,
32- TYPES . SyncInterrupter
32+ TYPES . SyncInterrupter ,
33+ TYPES . Authenticator
3334]
3435class SyncQueue extends EventEmitter {
3536 constructor (
@@ -39,7 +40,8 @@ class SyncQueue extends EventEmitter {
3940 dataInserterFactory ,
4041 progress ,
4142 syncSchema ,
42- syncInterrupter
43+ syncInterrupter ,
44+ authenticator
4345 ) {
4446 super ( )
4547
@@ -50,6 +52,7 @@ class SyncQueue extends EventEmitter {
5052 this . progress = progress
5153 this . syncSchema = syncSchema
5254 this . syncInterrupter = syncInterrupter
55+ this . authenticator = authenticator
5356 this . name = this . TABLES_NAMES . SYNC_QUEUE
5457
5558 this . methodCollMap = this . _filterMethodCollMap (
@@ -85,8 +88,14 @@ class SyncQueue extends EventEmitter {
8588 } = params ?? { }
8689
8790 if (
88- ! Number . isInteger ( ownerUserId ) &&
89- ! isOwnerScheduler
91+ (
92+ ! Number . isInteger ( ownerUserId ) &&
93+ ! isOwnerScheduler
94+ ) ||
95+ (
96+ Number . isInteger ( ownerUserId ) &&
97+ isOwnerScheduler
98+ )
9099 ) {
91100 throw new SyncQueueOwnerSettingError ( )
92101 }
@@ -96,17 +105,36 @@ class SyncQueue extends EventEmitter {
96105 : [ syncColls ]
97106 checkCollPermission ( _syncColls , this . ALLOWED_COLLS )
98107
99- const ownerUserIdFilter = Number . isInteger ( ownerUserId )
100- ? { ownerUserId }
101- : { }
102- const isOwnerSchedulerFilter = isOwnerScheduler
103- ? { isOwnerScheduler : 1 }
104- : { }
108+ const signedInUserIds = this . #getSignedInUserIds( )
109+
110+ if (
111+ isOwnerScheduler &&
112+ signedInUserIds . length === 0
113+ ) {
114+ return
115+ }
116+
117+ const subQueryFilter = Number . isInteger ( ownerUserId )
118+ ? {
119+ $or : {
120+ $eq : {
121+ isOwnerScheduler : 1 ,
122+ ownerUserId
123+ }
124+ }
125+ }
126+ : {
127+ $or : {
128+ $eq : { isOwnerScheduler : 1 } ,
129+ $in : { ownerUserId : signedInUserIds }
130+ }
131+ }
105132
106133 const allSyncs = await this . _getAll ( {
107- state : [ NEW_JOB_STATE , ERROR_JOB_STATE ] ,
108- ...ownerUserIdFilter ,
109- ...isOwnerSchedulerFilter
134+ filter : {
135+ $in : { state : [ NEW_JOB_STATE , ERROR_JOB_STATE , LOCKED_JOB_STATE ] }
136+ } ,
137+ subQuery : { filter : subQueryFilter }
110138 } )
111139 const hasALLInDB = allSyncs . some ( item => {
112140 return item . collName === this . ALLOWED_COLLS . ALL
@@ -272,13 +300,16 @@ class SyncQueue extends EventEmitter {
272300 : { }
273301
274302 const futureSyncs = await this . _getAll ( {
275- state : [
276- NEW_JOB_STATE ,
277- LOCKED_JOB_STATE ,
278- ERROR_JOB_STATE
279- ] ,
280- ...ownerUserIdFilter
281- } , { limit : 10 } )
303+ filter : {
304+ state : [
305+ NEW_JOB_STATE ,
306+ LOCKED_JOB_STATE ,
307+ ERROR_JOB_STATE
308+ ] ,
309+ ...ownerUserIdFilter
310+ } ,
311+ limit : 10
312+ } )
282313
283314 const allSyncs = (
284315 ! Array . isArray ( futureSyncs ) ||
@@ -335,17 +366,20 @@ class SyncQueue extends EventEmitter {
335366 return ( 1 / multipliersSum ) * currMultipliers
336367 }
337368
338- _getAll ( filter , opts ) {
369+ _getAll ( params ) {
339370 const {
371+ filter,
372+ subQuery,
340373 sort = this . _sort ,
341- limit = null
342- } = opts ?? { }
374+ limit
375+ } = params ?? { }
343376
344377 return this . dao . getElemsInCollBy (
345378 this . name ,
346379 {
347- sort,
348380 filter,
381+ subQuery,
382+ sort,
349383 limit
350384 }
351385 )
@@ -441,6 +475,13 @@ class SyncQueue extends EventEmitter {
441475
442476 this . emit ( 'progress' , progress )
443477 }
478+
479+ #getSignedInUserIds ( ) {
480+ const sessions = this . authenticator . getUserSessions ( )
481+
482+ return [ ...sessions ]
483+ . map ( ( [ token , session ] ) => session . _id )
484+ }
444485}
445486
446487decorateInjectable ( SyncQueue , depsTypes )
0 commit comments