Skip to content

Commit 650ba00

Browse files
committed
wip: more async of observeChanges and observe
1 parent 1d68ba3 commit 650ba00

File tree

9 files changed

+181
-38
lines changed

9 files changed

+181
-38
lines changed

meteor/server/__tests__/_testEnvironment.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ describe('Basic test of test environment', () => {
153153
const studios = await Studios.findFetchAsync({})
154154
expect(studios).toHaveLength(1)
155155

156-
const observer = Studios.observeChanges({ _id: protectString('abc') }, {})
156+
const observer = await Studios.observeChanges({ _id: protectString('abc') }, {})
157157
expect(observer).toBeTruthy()
158158

159159
await Studios.insertAsync({

meteor/server/publications/lib/__tests__/observerGroup.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ describe('ReactiveMongoObserverGroup', () => {
1010

1111
test('cleanup on stop', async () => {
1212
const handle: LiveQueryHandle = { stop: jest.fn() }
13-
const generator = jest.fn(async () => [handle])
13+
const generator = jest.fn(async () => [Promise.resolve(handle)])
1414

1515
const observerGroup = await ReactiveMongoObserverGroup(generator)
1616

@@ -39,7 +39,7 @@ describe('ReactiveMongoObserverGroup', () => {
3939

4040
test('restarting', async () => {
4141
const handle: LiveQueryHandle = { stop: jest.fn() }
42-
const generator = jest.fn(async () => [handle])
42+
const generator = jest.fn(async () => [Promise.resolve(handle)])
4343

4444
const observerGroup = await ReactiveMongoObserverGroup(generator)
4545

@@ -80,7 +80,7 @@ describe('ReactiveMongoObserverGroup', () => {
8080

8181
test('restart debounce', async () => {
8282
const handle: LiveQueryHandle = { stop: jest.fn() }
83-
const generator = jest.fn(async () => [handle])
83+
const generator = jest.fn(async () => [Promise.resolve(handle)])
8484

8585
const observerGroup = await ReactiveMongoObserverGroup(generator)
8686

meteor/server/publications/lib/__tests__/rundownsObserver.test.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ describe('RundownsObserver', () => {
2020
const playlistId = protectString<RundownPlaylistId>('playlist0')
2121

2222
const onChangedCleanup = jest.fn()
23-
const onChanged = jest.fn(() => onChangedCleanup)
23+
const onChanged = jest.fn(async () => onChangedCleanup)
2424

2525
// should not be any observers yet
2626
expect(RundownsMock.observers).toHaveLength(0)
2727

28-
const observer = new RundownsObserver(studioId, playlistId, onChanged)
28+
const observer = await RundownsObserver.create(studioId, playlistId, onChanged)
2929
try {
3030
// should now be an observer
3131
expect(RundownsMock.observers).toHaveLength(1)
@@ -73,12 +73,12 @@ describe('RundownsObserver', () => {
7373
const playlistId = protectString<RundownPlaylistId>('playlist0')
7474

7575
const onChangedCleanup = jest.fn()
76-
const onChanged = jest.fn<() => void, [RundownId[]]>(() => onChangedCleanup)
76+
const onChanged = jest.fn<Promise<() => void>, [RundownId[]]>(async () => onChangedCleanup)
7777

7878
// should not be any observers yet
7979
expect(RundownsMock.observers).toHaveLength(0)
8080

81-
const observer = new RundownsObserver(studioId, playlistId, onChanged)
81+
const observer = await RundownsObserver.create(studioId, playlistId, onChanged)
8282
try {
8383
// ensure starts correct
8484
await waitUntil(async () => {
@@ -127,12 +127,12 @@ describe('RundownsObserver', () => {
127127
const playlistId = protectString<RundownPlaylistId>('playlist0')
128128

129129
const onChangedCleanup = jest.fn()
130-
const onChanged = jest.fn<() => void, [RundownId[]]>(() => onChangedCleanup)
130+
const onChanged = jest.fn<Promise<() => void>, [RundownId[]]>(async () => onChangedCleanup)
131131

132132
// should not be any observers yet
133133
expect(RundownsMock.observers).toHaveLength(0)
134134

135-
const observer = new RundownsObserver(studioId, playlistId, onChanged)
135+
const observer = await RundownsObserver.create(studioId, playlistId, onChanged)
136136
try {
137137
// ensure starts correct
138138
// ensure starts correct
@@ -181,12 +181,12 @@ describe('RundownsObserver', () => {
181181
const playlistId = protectString<RundownPlaylistId>('playlist0')
182182

183183
const onChangedCleanup = jest.fn()
184-
const onChanged = jest.fn<() => void, [RundownId[]]>(() => onChangedCleanup)
184+
const onChanged = jest.fn<Promise<() => void>, [RundownId[]]>(async () => onChangedCleanup)
185185

186186
// should not be any observers yet
187187
expect(RundownsMock.observers).toHaveLength(0)
188188

189-
const observer = new RundownsObserver(studioId, playlistId, onChanged)
189+
const observer = await RundownsObserver.create(studioId, playlistId, onChanged)
190190
try {
191191
// ensure starts correct
192192
// ensure starts correct
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { Meteor } from 'meteor/meteor'
2+
3+
/**
4+
* Based on https://github.com/sindresorhus/p-debounce
5+
* With additional features:
6+
* - `cancel` method
7+
*/
8+
export class PromiseDebounce<TResult> {
9+
readonly #fn: () => Promise<TResult>
10+
readonly #wait: number
11+
12+
/** If an execution timeout has passed while */
13+
#isPending = false
14+
#timeout: number | undefined
15+
16+
#isExecuting = false
17+
#waitingListeners: Listener<TResult>[] = []
18+
19+
constructor(fn: () => Promise<TResult>, wait: number) {
20+
this.#fn = fn
21+
this.#wait = wait
22+
}
23+
24+
/**
25+
* Trigger an execution, and get the result.
26+
* @returns A promise that resolves with the result of the function
27+
*/
28+
async call(): Promise<TResult> {
29+
return new Promise<TResult>((resolve, reject) => {
30+
const listener: Listener<TResult> = { resolve, reject }
31+
this.#waitingListeners.push(listener)
32+
33+
// Trigger an execution
34+
this.trigger()
35+
})
36+
}
37+
38+
/**
39+
* Trigger an execution, but don't report the result.
40+
*/
41+
trigger(): void {
42+
// If an execution is 'imminent', don't do anything
43+
if (this.#isPending) return
44+
45+
// Clear an existing timeout
46+
if (this.#timeout) Meteor.clearTimeout(this.#timeout)
47+
48+
// Start a new one
49+
this.#timeout = Meteor.setTimeout(() => {
50+
this.#timeout = undefined
51+
52+
this.#executeFn()
53+
}, this.#wait)
54+
}
55+
56+
#executeFn(): void {
57+
// If an execution is still in progress, mark as pending and stop
58+
if (this.#isExecuting) {
59+
this.#isPending = true
60+
return
61+
}
62+
63+
// We have the clear to begin executing
64+
this.#isExecuting = true
65+
this.#isPending = false
66+
67+
// Collect up the listeners for this execution
68+
const listeners = this.#waitingListeners
69+
this.#waitingListeners = []
70+
71+
this.#fn()
72+
.then((result) => {
73+
for (const listener of listeners) {
74+
try {
75+
listener.resolve(result)
76+
} catch (e) {
77+
// TODO - error?
78+
}
79+
}
80+
})
81+
.catch((error) => {
82+
for (const listener of listeners) {
83+
try {
84+
listener.reject(error)
85+
} catch (e) {
86+
// TODO - error?
87+
}
88+
}
89+
})
90+
.finally(() => {
91+
this.#isExecuting = false
92+
93+
// If there is a pending execution, run that soon
94+
if (this.#isPending) {
95+
Meteor.setTimeout(() => this.#executeFn(), 0)
96+
}
97+
})
98+
}
99+
100+
/**
101+
* Cancel any waiting execution
102+
*/
103+
cancelWaiting(error?: Error): void {
104+
this.#isPending = false
105+
106+
if (this.#timeout) {
107+
Meteor.clearTimeout(this.#timeout)
108+
this.#timeout = undefined
109+
}
110+
111+
// Inform any listeners
112+
if (this.#waitingListeners.length > 0) {
113+
const listeners = this.#waitingListeners
114+
this.#waitingListeners = []
115+
116+
error = error ?? new Error('Cancelled')
117+
118+
for (const listener of listeners) {
119+
try {
120+
listener.reject(error)
121+
} catch (e) {
122+
// TODO - error?
123+
}
124+
}
125+
}
126+
}
127+
}
128+
129+
interface Listener<TResult> {
130+
resolve: (value: TResult) => void
131+
reject: (reason?: any) => void
132+
}

meteor/server/publications/lib/rundownsObserver.ts

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Meteor } from 'meteor/meteor'
22
import { RundownId, RundownPlaylistId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
3-
import _ from 'underscore'
43
import { Rundowns } from '../../collections'
4+
import { PromiseDebounce } from './debounce'
55

66
const REACTIVITY_DEBOUNCE = 20
77

@@ -12,30 +12,53 @@ type ChangedHandler = (rundownIds: RundownId[]) => Promise<() => void>
1212
* Note: Updates are debounced to avoid rapid updates firing
1313
*/
1414
export class RundownsObserver implements Meteor.LiveQueryHandle {
15-
#rundownsLiveQuery: Meteor.LiveQueryHandle
15+
#rundownsLiveQuery!: Meteor.LiveQueryHandle
1616
#rundownIds: Set<RundownId> = new Set<RundownId>()
1717
#changed: ChangedHandler | undefined
1818
#cleanup: (() => void) | undefined
1919

20-
constructor(studioId: StudioId, playlistId: RundownPlaylistId, onChanged: ChangedHandler) {
20+
readonly #triggerUpdateRundownContent = new PromiseDebounce<void>(async () => {
21+
if (!this.#changed) return
22+
this.#cleanup?.()
23+
24+
const changed = this.#changed
25+
this.#cleanup = await changed(this.rundownIds)
26+
}, REACTIVITY_DEBOUNCE)
27+
28+
private constructor(onChanged: ChangedHandler) {
2129
this.#changed = onChanged
22-
this.#rundownsLiveQuery = Rundowns.observe(
30+
}
31+
32+
static async create(
33+
studioId: StudioId,
34+
playlistId: RundownPlaylistId,
35+
onChanged: ChangedHandler
36+
): Promise<RundownsObserver> {
37+
const observer = new RundownsObserver(onChanged)
38+
39+
await observer.init(studioId, playlistId)
40+
41+
return observer
42+
}
43+
44+
private async init(studioId: StudioId, playlistId: RundownPlaylistId) {
45+
this.#rundownsLiveQuery = await Rundowns.observe(
2346
{
2447
playlistId,
2548
studioId,
2649
},
2750
{
2851
added: (doc) => {
2952
this.#rundownIds.add(doc._id)
30-
this.triggerUpdateRundownContent()
53+
this.#triggerUpdateRundownContent.trigger()
3154
},
3255
changed: (doc) => {
3356
this.#rundownIds.add(doc._id)
34-
this.triggerUpdateRundownContent()
57+
this.#triggerUpdateRundownContent.trigger()
3558
},
3659
removed: (doc) => {
3760
this.#rundownIds.delete(doc._id)
38-
this.triggerUpdateRundownContent()
61+
this.#triggerUpdateRundownContent.trigger()
3962
},
4063
},
4164
{
@@ -44,28 +67,16 @@ export class RundownsObserver implements Meteor.LiveQueryHandle {
4467
},
4568
}
4669
)
47-
this.triggerUpdateRundownContent()
70+
71+
this.#triggerUpdateRundownContent.trigger()
4872
}
4973

5074
public get rundownIds(): RundownId[] {
5175
return Array.from(this.#rundownIds)
5276
}
5377

54-
private innerUpdateRundownContent = () => {
55-
if (!this.#changed) return
56-
this.#cleanup?.()
57-
58-
const changed = this.#changed
59-
this.#cleanup = changed(this.rundownIds)
60-
}
61-
62-
private triggerUpdateRundownContent = _.debounce(
63-
Meteor.bindEnvironment(this.innerUpdateRundownContent),
64-
REACTIVITY_DEBOUNCE
65-
)
66-
6778
public stop = (): void => {
68-
this.triggerUpdateRundownContent.cancel()
79+
this.#triggerUpdateRundownContent.cancelWaiting()
6980
this.#rundownsLiveQuery.stop()
7081
this.#changed = undefined
7182
this.#cleanup?.()

meteor/server/publications/partInstancesUI/publication.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ async function setupUIPartInstancesPublicationObservers(
7474
)) as Pick<DBRundownPlaylist, RundownPlaylistFields> | undefined
7575
if (!playlist) throw new Error(`RundownPlaylist with activationId="${args.playlistActivationId}" not found!`)
7676

77-
const rundownsObserver = new RundownsObserver(playlist.studioId, playlist._id, async (rundownIds) => {
77+
const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => {
7878
logger.silly(`Creating new RundownContentObserver`)
7979

8080
const cache = createReactiveContentCache()

meteor/server/publications/partsUI/publication.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async function setupUIPartsPublicationObservers(
5959
})) as Pick<DBRundownPlaylist, RundownPlaylistFields> | undefined
6060
if (!playlist) throw new Error(`RundownPlaylist "${args.playlistId}" not found!`)
6161

62-
const rundownsObserver = new RundownsObserver(playlist.studioId, playlist._id, async (rundownIds) => {
62+
const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => {
6363
logger.silly(`Creating new RundownContentObserver`)
6464

6565
const cache = createReactiveContentCache()

meteor/server/publications/pieceContentStatusUI/rundown/publication.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ async function setupUIPieceContentStatusesPublicationObservers(
122122
})) as Pick<DBRundownPlaylist, RundownPlaylistFields> | undefined
123123
if (!playlist) throw new Error(`RundownPlaylist "${args.rundownPlaylistId}" not found!`)
124124

125-
const rundownsObserver = new RundownsObserver(playlist.studioId, playlist._id, async (rundownIds) => {
125+
const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => {
126126
logger.silly(`Creating new RundownContentObserver`)
127127

128128
// TODO - can this be done cheaper?

meteor/server/publications/segmentPartNotesUI/publication.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ async function setupUISegmentPartNotesPublicationObservers(
6666
})) as Pick<DBRundownPlaylist, RundownPlaylistFields> | undefined
6767
if (!playlist) throw new Error(`RundownPlaylist "${args.playlistId}" not found!`)
6868

69-
const rundownsObserver = new RundownsObserver(playlist.studioId, playlist._id, async (rundownIds) => {
69+
const rundownsObserver = await RundownsObserver.create(playlist.studioId, playlist._id, async (rundownIds) => {
7070
logger.silly(`Creating new RundownContentObserver`)
7171

7272
// TODO - can this be done cheaper?

0 commit comments

Comments
 (0)