Skip to content

Commit 5067d60

Browse files
committed
wip: tidying and potential perf
1 parent 8fb0383 commit 5067d60

File tree

5 files changed

+44
-52
lines changed

5 files changed

+44
-52
lines changed

meteor/server/collections/collection.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,8 @@ export interface AsyncOnlyReadOnlyMongoCollection<DBInterface extends { _id: Pro
249249
observeChanges(
250250
selector: MongoQuery<DBInterface> | DBInterface['_id'],
251251
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
252-
options?: FindOptions<DBInterface>
252+
findOptions?: FindOptions<DBInterface>,
253+
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
253254
): Promise<Meteor.LiveQueryHandle>
254255

255256
/**

meteor/server/collections/implementations/asyncCollection.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
137137
async observeChanges(
138138
selector: MongoQuery<DBInterface> | DBInterface['_id'],
139139
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
140-
options?: FindOptions<DBInterface>
140+
findOptions?: FindOptions<DBInterface>,
141+
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
141142
): Promise<Meteor.LiveQueryHandle> {
142143
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
143144
if (span) {
@@ -148,8 +149,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
148149
}
149150
try {
150151
const res = await this._collection
151-
.find((selector ?? {}) as any, options as any)
152-
.observeChangesAsync(callbacks)
152+
.find((selector ?? {}) as any, findOptions as any)
153+
.observeChangesAsync(callbacks, callbackOptions)
153154
if (span) span.end()
154155
return res
155156
} catch (e) {

meteor/server/publications/ingestStatus/publication.ts

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -54,35 +54,36 @@ async function setupIngestRundownStatusPublicationObservers(
5454
const contentObserver = await RundownContentObserver.create(rundownIds, cache)
5555

5656
const innerQueries = [
57-
cache.Playlists.find({}).observeChanges({
58-
added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
59-
changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
60-
removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
61-
}),
62-
cache.Rundowns.find({}).observe({
63-
added: (doc) => {
64-
triggerUpdate({ invalidateRundownIds: [doc._id] })
65-
contentObserver.checkPlaylistIds()
66-
},
67-
changed: (doc) => {
68-
triggerUpdate({ invalidateRundownIds: [doc._id] })
69-
contentObserver.checkPlaylistIds()
57+
cache.Playlists.find({}).observeChanges(
58+
{
59+
added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
60+
changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
61+
removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
7062
},
71-
removed: (doc) => {
72-
triggerUpdate({ invalidateRundownIds: [doc._id] })
73-
contentObserver.checkPlaylistIds()
63+
{ nonMutatingCallbacks: true }
64+
),
65+
cache.Rundowns.find({}).observeChanges(
66+
{
67+
added: (docId) => {
68+
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
69+
contentObserver.checkPlaylistIds()
70+
},
71+
changed: (docId) => {
72+
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
73+
contentObserver.checkPlaylistIds()
74+
},
75+
removed: (docId) => {
76+
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
77+
contentObserver.checkPlaylistIds()
78+
},
7479
},
75-
}),
80+
{ nonMutatingCallbacks: true }
81+
),
7682
cache.Parts.find({}).observe({
7783
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
7884
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
7985
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
8086
}),
81-
// cache.Segments.find({}).observe({
82-
// added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
83-
// changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
84-
// removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
85-
// }),
8687
cache.PartInstances.find({}).observe({
8788
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
8889
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
@@ -116,9 +117,6 @@ async function manipulateIngestRundownStatusPublicationData(
116117
): Promise<void> {
117118
// Prepare data for publication:
118119

119-
// We know that `collection` does diffing when 'commiting' all of the changes we have made
120-
// meaning that for anything we will call `replace()` on, we can `remove()` it first for no extra cost
121-
122120
if (updateProps?.newCache !== undefined) {
123121
state.contentCache = updateProps.newCache ?? undefined
124122
}

meteor/server/publications/ingestStatus/reactiveContentCache.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import type { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/di
55
import type { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances'
66
import type { NrcsIngestDataCacheObj } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
77
import type { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids'
8-
// import type { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment'
98
import type { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
109
import type { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
1110

@@ -41,13 +40,6 @@ export const partFieldSpecifier = literal<MongoFieldSpecifierOnesStrict<Pick<DBP
4140
ingestNotifyPartReady: 1,
4241
})
4342

44-
// export type SegmentFields = '_id' | 'rundownId' | 'externalId'
45-
// export const segmentFieldSpecifier = literal<MongoFieldSpecifierOnesStrict<Pick<DBSegment, SegmentFields>>>({
46-
// _id: 1,
47-
// rundownId: 1,
48-
// externalId: 1,
49-
// })
50-
5143
export type PartInstanceFields = '_id' | 'rundownId' | 'segmentId' | 'part'
5244
export const partInstanceFieldSpecifier = literal<
5345
MongoFieldSpecifierOnesStrict<Pick<PartInstance, PartInstanceFields>>
@@ -65,7 +57,6 @@ export interface ContentCache {
6557
Rundowns: ReactiveCacheCollection<Pick<DBRundown, RundownFields>>
6658
NrcsIngestData: ReactiveCacheCollection<NrcsIngestDataCacheObj>
6759
Parts: ReactiveCacheCollection<Pick<DBPart, PartFields>>
68-
// Segments: ReactiveCacheCollection<Pick<DBSegment, SegmentFields>>
6960
PartInstances: ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>
7061
}
7162

@@ -77,7 +68,6 @@ export function createReactiveContentCache(rundownIds: RundownId[]): ContentCach
7768
Rundowns: new ReactiveCacheCollection<Pick<DBRundown, RundownFields>>('rundowns'),
7869
NrcsIngestData: new ReactiveCacheCollection<NrcsIngestDataCacheObj>('nrcsIngestData'), // TODO - is this needed?
7970
Parts: new ReactiveCacheCollection<Pick<DBPart, PartFields>>('parts'),
80-
// Segments: new ReactiveCacheCollection<Pick<DBSegment, SegmentFields>>('segments'),
8171
PartInstances: new ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>('partInstances'),
8272
}
8373

meteor/server/publications/ingestStatus/rundownContentObserver.ts

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,19 +73,11 @@ export class RundownContentObserver {
7373
cache.Rundowns.link(),
7474
{
7575
projection: rundownFieldSpecifier,
76+
},
77+
{
78+
nonMutatingCallbacks: true,
7679
}
7780
),
78-
// Segments.observeChanges(
79-
// {
80-
// rundownId: {
81-
// $in: rundownIds,
82-
// },
83-
// },
84-
// cache.Segments.link(),
85-
// {
86-
// projection: segmentFieldSpecifier,
87-
// }
88-
// ),
8981
Parts.observeChanges(
9082
{
9183
rundownId: {
@@ -95,6 +87,9 @@ export class RundownContentObserver {
9587
cache.Parts.link(),
9688
{
9789
projection: partFieldSpecifier,
90+
},
91+
{
92+
nonMutatingCallbacks: true,
9893
}
9994
),
10095
PartInstances.observeChanges(
@@ -104,15 +99,22 @@ export class RundownContentObserver {
10499
orphaned: { $exists: false },
105100
},
106101
cache.PartInstances.link(),
107-
{ fields: partInstanceFieldSpecifier }
102+
{ fields: partInstanceFieldSpecifier },
103+
{
104+
nonMutatingCallbacks: true,
105+
}
108106
),
109107
NrcsIngestDataCache.observeChanges(
110108
{
111109
rundownId: {
112110
$in: rundownIds,
113111
},
114112
},
115-
cache.NrcsIngestData.link()
113+
cache.NrcsIngestData.link(),
114+
{},
115+
{
116+
nonMutatingCallbacks: true,
117+
}
116118
),
117119

118120
observer.#playlistIdObserver,

0 commit comments

Comments
 (0)