Skip to content

Commit f559621

Browse files
committed
feat: rework mos statuses flow SOFIE-97
1 parent b64155a commit f559621

File tree

38 files changed

+1932
-729
lines changed

38 files changed

+1932
-729
lines changed

.github/workflows/node.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -500,16 +500,19 @@ jobs:
500500
- node-version: 22.x
501501
package-name: job-worker
502502
send-coverage: true
503-
# No tests for the gateways yet
503+
# No tests for some gateways yet
504504
# - node-version: 22.x
505505
# package-name: playout-gateway
506-
# - node-version: 22.x
507-
# package-name: mos-gateway
506+
# send-coverage: true
507+
- node-version: 22.x
508+
package-name: mos-gateway
509+
send-coverage: true
508510
- node-version: 22.x
509511
package-name: live-status-gateway
510512
send-coverage: true
511513
- node-version: 22.x
512514
package-name: webui
515+
send-coverage: true
513516
# manual meteor-lib as it only needs a couple of versions
514517
- node-version: 22.x
515518
package-name: meteor-lib

meteor/server/collections/collection.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,8 @@ export interface AsyncOnlyReadOnlyMongoCollection<DBInterface extends { _id: Pro
246246
observeChanges(
247247
selector: MongoQuery<DBInterface> | DBInterface['_id'],
248248
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
249-
options?: FindOptions<DBInterface>
249+
findOptions?: FindOptions<DBInterface>,
250+
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
250251
): Promise<Meteor.LiveQueryHandle>
251252

252253
/**

meteor/server/collections/implementations/asyncCollection.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
141141
async observeChanges(
142142
selector: MongoQuery<DBInterface> | DBInterface['_id'],
143143
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
144-
options?: FindOptions<DBInterface>
144+
findOptions?: FindOptions<DBInterface>,
145+
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
145146
): Promise<Meteor.LiveQueryHandle> {
146147
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
147148
if (span) {
@@ -152,8 +153,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
152153
}
153154
try {
154155
const res = await this._collection
155-
.find((selector ?? {}) as any, options as any)
156-
.observeChangesAsync(callbacks)
156+
.find((selector ?? {}) as any, findOptions as any)
157+
.observeChangesAsync(callbacks, callbackOptions)
157158
if (span) span.end()
158159
return res
159160
} catch (e) {

meteor/server/publications/_publications.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import './lib/lib'
33

44
import './buckets'
55
import './blueprintUpgradeStatus/publication'
6+
import './ingestStatus/publication'
67
import './packageManager/expectedPackages/publication'
78
import './packageManager/packageContainers'
89
import './packageManager/playoutContext'
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import type { RundownId, PartId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2+
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
3+
import type { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
4+
import {
5+
IngestRundownStatus,
6+
IngestPartPlaybackStatus,
7+
IngestRundownActiveStatus,
8+
IngestPartStatus,
9+
} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
10+
import type { ReadonlyDeep } from 'type-fest'
11+
import _ from 'underscore'
12+
import type { ContentCache, PartFields, PartInstanceFields, PlaylistFields } from './reactiveContentCache'
13+
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
14+
import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection'
15+
import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances'
16+
import { IngestPart } from '@sofie-automation/blueprints-integration'
17+
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
18+
19+
export function createIngestRundownStatus(
20+
cache: ReadonlyDeep<ContentCache>,
21+
rundownId: RundownId
22+
): IngestRundownStatus | null {
23+
const rundown = cache.Rundowns.findOne(rundownId)
24+
if (!rundown) return null
25+
26+
const newDoc: IngestRundownStatus = {
27+
_id: rundownId,
28+
externalId: rundown.externalId,
29+
30+
active: IngestRundownActiveStatus.INACTIVE,
31+
32+
segments: [],
33+
}
34+
35+
const playlist = cache.Playlists.findOne({
36+
_id: rundown.playlistId,
37+
activationId: { $exists: true },
38+
})
39+
40+
if (playlist) {
41+
newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE
42+
}
43+
44+
// Find the most important part instance for each part
45+
const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances)
46+
47+
const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
48+
for (const nrcsSegment of nrcsSegments) {
49+
const nrcsParts = cache.NrcsIngestData.find({
50+
rundownId,
51+
segmentId: nrcsSegment.segmentId,
52+
type: NrcsIngestCacheType.PART,
53+
}).fetch()
54+
55+
newDoc.segments.push({
56+
externalId: nrcsSegment.data.externalId,
57+
parts: _.compact(
58+
nrcsParts.map((nrcsPart) => {
59+
if (!nrcsPart.partId || !nrcsPart.segmentId) return null
60+
61+
const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId })
62+
const partInstance = partInstanceMap.get(nrcsPart.partId)
63+
64+
return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart)
65+
})
66+
),
67+
})
68+
}
69+
70+
return newDoc
71+
}
72+
73+
function findPartInstanceForEachPart(
74+
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
75+
rundownId: RundownId,
76+
partInstancesCache: ReadonlyDeep<ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>>
77+
) {
78+
const partInstanceMap = new Map<PartId, Pick<DBPartInstance, PartInstanceFields>>()
79+
if (!playlist) return partInstanceMap
80+
81+
for (const partInstance of partInstancesCache.find({}).fetch()) {
82+
if (partInstance.rundownId !== rundownId) continue
83+
// Ignore the next partinstance
84+
if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue
85+
86+
// The current part instance is the most important
87+
if (partInstance._id === playlist.currentPartInfo?.partInstanceId) {
88+
partInstanceMap.set(partInstance.part._id, partInstance)
89+
continue
90+
}
91+
92+
// Take the part with the highest takeCount
93+
const existingEntry = partInstanceMap.get(partInstance.part._id)
94+
if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) {
95+
partInstanceMap.set(partInstance.part._id, partInstance)
96+
}
97+
}
98+
99+
return partInstanceMap
100+
}
101+
102+
function createIngestPartStatus(
103+
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
104+
partInstance: Pick<PartInstance, PartInstanceFields> | undefined,
105+
part: Pick<DBPart, PartFields> | undefined,
106+
ingestPart: IngestPart
107+
): IngestPartStatus {
108+
// Determine the playback status from the PartInstance
109+
let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
110+
if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) {
111+
const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id
112+
113+
if (isCurrentPartInstance) {
114+
// If the current, it is playing
115+
playbackStatus = IngestPartPlaybackStatus.PLAY
116+
} else {
117+
// If not the current, but has been played, it is stopped
118+
playbackStatus = IngestPartPlaybackStatus.STOP
119+
}
120+
}
121+
122+
// Determine the ready status from the PartInstance or Part
123+
const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady
124+
const itemsReady = partInstance ? partInstance.part.ingestNotifyItemsReady : part?.ingestNotifyItemsReady
125+
126+
return {
127+
externalId: ingestPart.externalId,
128+
129+
isReady: isReady ?? null,
130+
itemsReady: itemsReady ?? {},
131+
132+
playbackStatus,
133+
}
134+
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2+
import { ReadonlyDeep } from 'type-fest'
3+
import {
4+
CustomPublishCollection,
5+
meteorCustomPublish,
6+
setUpCollectionOptimizedObserver,
7+
SetupObserversResult,
8+
TriggerUpdate,
9+
} from '../../lib/customPublication'
10+
import { logger } from '../../logging'
11+
import { ContentCache, createReactiveContentCache } from './reactiveContentCache'
12+
import { RundownsObserver } from '../lib/rundownsObserver'
13+
import { RundownContentObserver } from './rundownContentObserver'
14+
import {
15+
PeripheralDevicePubSub,
16+
PeripheralDevicePubSubCollectionsNames,
17+
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
18+
import { checkAccessAndGetPeripheralDevice } from '../../security/check'
19+
import { check } from '../../lib/check'
20+
import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
21+
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
22+
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
23+
import { createIngestRundownStatus } from './createIngestRundownStatus'
24+
25+
interface IngestRundownStatusArgs {
26+
readonly deviceId: PeripheralDeviceId
27+
}
28+
29+
export interface IngestRundownStatusState {
30+
contentCache: ReadonlyDeep<ContentCache>
31+
}
32+
33+
interface IngestRundownStatusUpdateProps {
34+
newCache: ContentCache
35+
36+
invalidateRundownIds: RundownId[]
37+
invalidatePlaylistIds: RundownPlaylistId[]
38+
}
39+
40+
async function setupIngestRundownStatusPublicationObservers(
41+
args: ReadonlyDeep<IngestRundownStatusArgs>,
42+
triggerUpdate: TriggerUpdate<IngestRundownStatusUpdateProps>
43+
): Promise<SetupObserversResult> {
44+
const rundownsObserver = await RundownsObserver.createForPeripheralDevice(args.deviceId, async (rundownIds) => {
45+
logger.silly(`Creating new RundownContentObserver`, rundownIds)
46+
47+
// TODO - can this be done cheaper?
48+
const cache = createReactiveContentCache(rundownIds)
49+
50+
// Push update
51+
triggerUpdate({ newCache: cache })
52+
53+
const contentObserver = await RundownContentObserver.create(rundownIds, cache)
54+
55+
const innerQueries = [
56+
cache.Playlists.find({}).observeChanges(
57+
{
58+
added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
59+
changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
60+
removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
61+
},
62+
{ nonMutatingCallbacks: true }
63+
),
64+
cache.Rundowns.find({}).observeChanges(
65+
{
66+
added: (docId) => {
67+
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
68+
contentObserver.checkPlaylistIds()
69+
},
70+
changed: (docId) => {
71+
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
72+
contentObserver.checkPlaylistIds()
73+
},
74+
removed: (docId) => {
75+
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
76+
contentObserver.checkPlaylistIds()
77+
},
78+
},
79+
{ nonMutatingCallbacks: true }
80+
),
81+
cache.Parts.find({}).observe({
82+
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
83+
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
84+
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
85+
}),
86+
cache.PartInstances.find({}).observe({
87+
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
88+
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
89+
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
90+
}),
91+
cache.NrcsIngestData.find({}).observe({
92+
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
93+
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
94+
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
95+
}),
96+
]
97+
98+
return () => {
99+
contentObserver.dispose()
100+
101+
for (const query of innerQueries) {
102+
query.stop()
103+
}
104+
}
105+
})
106+
107+
// Set up observers:
108+
return [rundownsObserver]
109+
}
110+
111+
async function manipulateIngestRundownStatusPublicationData(
112+
_args: IngestRundownStatusArgs,
113+
state: Partial<IngestRundownStatusState>,
114+
collection: CustomPublishCollection<IngestRundownStatus>,
115+
updateProps: Partial<ReadonlyDeep<IngestRundownStatusUpdateProps>> | undefined
116+
): Promise<void> {
117+
// Prepare data for publication:
118+
119+
if (updateProps?.newCache !== undefined) {
120+
state.contentCache = updateProps.newCache ?? undefined
121+
}
122+
123+
if (!state.contentCache) {
124+
// Remove all the notes
125+
collection.remove(null)
126+
127+
return
128+
}
129+
130+
const updateAll = !updateProps || !!updateProps?.newCache
131+
if (updateAll) {
132+
// Remove all the notes
133+
collection.remove(null)
134+
135+
const knownRundownIds = new Set(state.contentCache.RundownIds)
136+
137+
for (const rundownId of knownRundownIds) {
138+
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
139+
if (newDoc) collection.replace(newDoc)
140+
}
141+
} else {
142+
const regenerateForRundownIds = new Set(updateProps.invalidateRundownIds)
143+
144+
// Include anything where the playlist has changed
145+
if (updateProps.invalidatePlaylistIds && updateProps.invalidatePlaylistIds.length > 0) {
146+
const rundownsToUpdate = state.contentCache.Rundowns.find(
147+
{
148+
playlistId: { $in: updateProps.invalidatePlaylistIds },
149+
},
150+
{
151+
projection: {
152+
_id: 1,
153+
},
154+
}
155+
).fetch() as Pick<DBRundown, '_id'>[]
156+
157+
for (const rundown of rundownsToUpdate) {
158+
regenerateForRundownIds.add(rundown._id)
159+
}
160+
}
161+
162+
for (const rundownId of regenerateForRundownIds) {
163+
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
164+
if (newDoc) {
165+
collection.replace(newDoc)
166+
} else {
167+
collection.remove(rundownId)
168+
}
169+
}
170+
}
171+
}
172+
173+
meteorCustomPublish(
174+
PeripheralDevicePubSub.ingestDeviceRundownStatus,
175+
PeripheralDevicePubSubCollectionsNames.ingestRundownStatus,
176+
async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) {
177+
check(deviceId, String)
178+
179+
await checkAccessAndGetPeripheralDevice(deviceId, token, this)
180+
181+
await setUpCollectionOptimizedObserver<
182+
IngestRundownStatus,
183+
IngestRundownStatusArgs,
184+
IngestRundownStatusState,
185+
IngestRundownStatusUpdateProps
186+
>(
187+
`pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`,
188+
{ deviceId },
189+
setupIngestRundownStatusPublicationObservers,
190+
manipulateIngestRundownStatusPublicationData,
191+
pub,
192+
100
193+
)
194+
}
195+
)

0 commit comments

Comments
 (0)