Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions .github/workflows/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -490,16 +490,19 @@ jobs:
- node-version: 22.x
package-name: job-worker
send-coverage: true
# No tests for the gateways yet
# No tests for some gateways yet
# - node-version: 22.x
# package-name: playout-gateway
# - node-version: 22.x
# package-name: mos-gateway
# send-coverage: true
- node-version: 22.x
package-name: mos-gateway
send-coverage: true
- node-version: 22.x
package-name: live-status-gateway
send-coverage: true
- node-version: 22.x
package-name: webui
send-coverage: true
# manual meteor-lib as it only needs a couple of versions
- node-version: 22.x
package-name: meteor-lib
Expand Down
3 changes: 2 additions & 1 deletion meteor/server/collections/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ export interface AsyncOnlyReadOnlyMongoCollection<DBInterface extends { _id: Pro
observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle>

/**
Expand Down
7 changes: 4 additions & 3 deletions meteor/server/collections/implementations/asyncCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
async observeChanges(
selector: MongoQuery<DBInterface> | DBInterface['_id'],
callbacks: PromisifyCallbacks<ObserveChangesCallbacks<DBInterface>>,
options?: FindOptions<DBInterface>
findOptions?: FindOptions<DBInterface>,
callbackOptions?: { nonMutatingCallbacks?: boolean | undefined }
): Promise<Meteor.LiveQueryHandle> {
const span = profiler.startSpan(`MongoCollection.${this.name}.observeChanges`)
if (span) {
Expand All @@ -152,8 +153,8 @@ export class WrappedAsyncMongoCollection<DBInterface extends { _id: ProtectedStr
}
try {
const res = await this._collection
.find((selector ?? {}) as any, options as any)
.observeChangesAsync(callbacks)
.find((selector ?? {}) as any, findOptions as any)
.observeChangesAsync(callbacks, callbackOptions)
if (span) span.end()
return res
} catch (e) {
Expand Down
1 change: 1 addition & 0 deletions meteor/server/publications/_publications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import './lib/lib'

import './buckets'
import './blueprintUpgradeStatus/publication'
import './ingestStatus/publication'
import './packageManager/expectedPackages/publication'
import './packageManager/packageContainers'
import './packageManager/playoutContext'
Expand Down
134 changes: 134 additions & 0 deletions meteor/server/publications/ingestStatus/createIngestRundownStatus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import type { RundownId, PartId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
import type { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
import {
IngestRundownStatus,
IngestPartPlaybackStatus,
IngestRundownActiveStatus,
IngestPartStatus,
} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import type { ReadonlyDeep } from 'type-fest'
import _ from 'underscore'
import type { ContentCache, PartFields, PartInstanceFields, PlaylistFields } from './reactiveContentCache'
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection'
import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances'
import { IngestPart } from '@sofie-automation/blueprints-integration'
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'

export function createIngestRundownStatus(
cache: ReadonlyDeep<ContentCache>,
rundownId: RundownId
): IngestRundownStatus | null {
const rundown = cache.Rundowns.findOne(rundownId)
if (!rundown) return null

const newDoc: IngestRundownStatus = {
_id: rundownId,
externalId: rundown.externalId,

active: IngestRundownActiveStatus.INACTIVE,

segments: [],
}

const playlist = cache.Playlists.findOne({
_id: rundown.playlistId,
activationId: { $exists: true },
})

if (playlist) {
newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE
}

// Find the most important part instance for each part
const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances)

const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
for (const nrcsSegment of nrcsSegments) {
const nrcsParts = cache.NrcsIngestData.find({
rundownId,
segmentId: nrcsSegment.segmentId,
type: NrcsIngestCacheType.PART,
}).fetch()

newDoc.segments.push({
externalId: nrcsSegment.data.externalId,
parts: _.compact(
nrcsParts.map((nrcsPart) => {
if (!nrcsPart.partId || !nrcsPart.segmentId) return null

const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId })
const partInstance = partInstanceMap.get(nrcsPart.partId)

return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart)
})
),
})
}

return newDoc
}

function findPartInstanceForEachPart(
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
rundownId: RundownId,
partInstancesCache: ReadonlyDeep<ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>>
) {
const partInstanceMap = new Map<PartId, Pick<DBPartInstance, PartInstanceFields>>()
if (!playlist) return partInstanceMap

for (const partInstance of partInstancesCache.find({}).fetch()) {
if (partInstance.rundownId !== rundownId) continue
// Ignore the next partinstance
if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue

// The current part instance is the most important
if (partInstance._id === playlist.currentPartInfo?.partInstanceId) {
partInstanceMap.set(partInstance.part._id, partInstance)
continue
}

// Take the part with the highest takeCount
const existingEntry = partInstanceMap.get(partInstance.part._id)
if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) {
partInstanceMap.set(partInstance.part._id, partInstance)
}
}

return partInstanceMap
}

function createIngestPartStatus(
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
partInstance: Pick<PartInstance, PartInstanceFields> | undefined,
part: Pick<DBPart, PartFields> | undefined,
ingestPart: IngestPart
): IngestPartStatus {
// Determine the playback status from the PartInstance
let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) {
const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id

if (isCurrentPartInstance) {
// If the current, it is playing
playbackStatus = IngestPartPlaybackStatus.PLAY
} else {
// If not the current, but has been played, it is stopped
playbackStatus = IngestPartPlaybackStatus.STOP
}
}

// Determine the ready status from the PartInstance or Part
const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady
const itemsReady = partInstance ? partInstance.part.ingestNotifyItemsReady : part?.ingestNotifyItemsReady

return {
externalId: ingestPart.externalId,

isReady: isReady ?? null,
itemsReady: itemsReady ?? {},

playbackStatus,
}
}
195 changes: 195 additions & 0 deletions meteor/server/publications/ingestStatus/publication.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
import { ReadonlyDeep } from 'type-fest'
import {
CustomPublishCollection,
meteorCustomPublish,
setUpCollectionOptimizedObserver,
SetupObserversResult,
TriggerUpdate,
} from '../../lib/customPublication'
import { logger } from '../../logging'
import { ContentCache, createReactiveContentCache } from './reactiveContentCache'
import { RundownsObserver } from '../lib/rundownsObserver'
import { RundownContentObserver } from './rundownContentObserver'
import {
PeripheralDevicePubSub,
PeripheralDevicePubSubCollectionsNames,
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
import { checkAccessAndGetPeripheralDevice } from '../../security/check'
import { check } from '../../lib/check'
import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
import { createIngestRundownStatus } from './createIngestRundownStatus'

interface IngestRundownStatusArgs {
readonly deviceId: PeripheralDeviceId
}

export interface IngestRundownStatusState {
contentCache: ReadonlyDeep<ContentCache>
}

interface IngestRundownStatusUpdateProps {
newCache: ContentCache

invalidateRundownIds: RundownId[]
invalidatePlaylistIds: RundownPlaylistId[]
}

async function setupIngestRundownStatusPublicationObservers(
args: ReadonlyDeep<IngestRundownStatusArgs>,
triggerUpdate: TriggerUpdate<IngestRundownStatusUpdateProps>
): Promise<SetupObserversResult> {
const rundownsObserver = await RundownsObserver.createForPeripheralDevice(args.deviceId, async (rundownIds) => {
logger.silly(`Creating new RundownContentObserver`, rundownIds)

// TODO - can this be done cheaper?
const cache = createReactiveContentCache(rundownIds)

// Push update
triggerUpdate({ newCache: cache })

const contentObserver = await RundownContentObserver.create(rundownIds, cache)

const innerQueries = [
cache.Playlists.find({}).observeChanges(
{
added: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
changed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
removed: (docId) => triggerUpdate({ invalidatePlaylistIds: [protectString(docId)] }),
},
{ nonMutatingCallbacks: true }
),
cache.Rundowns.find({}).observeChanges(
{
added: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
changed: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
removed: (docId) => {
triggerUpdate({ invalidateRundownIds: [protectString(docId)] })
contentObserver.checkPlaylistIds()
},
},
{ nonMutatingCallbacks: true }
),
cache.Parts.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
cache.PartInstances.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
cache.NrcsIngestData.find({}).observe({
added: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
changed: (doc, oldDoc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId, oldDoc.rundownId] }),
removed: (doc) => triggerUpdate({ invalidateRundownIds: [doc.rundownId] }),
}),
]

return () => {
contentObserver.dispose()

for (const query of innerQueries) {
query.stop()
}
}
})

// Set up observers:
return [rundownsObserver]
}

async function manipulateIngestRundownStatusPublicationData(
_args: IngestRundownStatusArgs,
state: Partial<IngestRundownStatusState>,
collection: CustomPublishCollection<IngestRundownStatus>,
updateProps: Partial<ReadonlyDeep<IngestRundownStatusUpdateProps>> | undefined
): Promise<void> {
// Prepare data for publication:

if (updateProps?.newCache !== undefined) {
state.contentCache = updateProps.newCache ?? undefined
}

if (!state.contentCache) {
// Remove all the notes
collection.remove(null)

return
}

const updateAll = !updateProps || !!updateProps?.newCache
if (updateAll) {
// Remove all the notes
collection.remove(null)

const knownRundownIds = new Set(state.contentCache.RundownIds)

for (const rundownId of knownRundownIds) {
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
if (newDoc) collection.replace(newDoc)
}
} else {
const regenerateForRundownIds = new Set(updateProps.invalidateRundownIds)

// Include anything where the playlist has changed
if (updateProps.invalidatePlaylistIds && updateProps.invalidatePlaylistIds.length > 0) {
const rundownsToUpdate = state.contentCache.Rundowns.find(
{
playlistId: { $in: updateProps.invalidatePlaylistIds },
},
{
projection: {
_id: 1,
},
}
).fetch() as Pick<DBRundown, '_id'>[]

for (const rundown of rundownsToUpdate) {
regenerateForRundownIds.add(rundown._id)
}
}

for (const rundownId of regenerateForRundownIds) {
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
if (newDoc) {
collection.replace(newDoc)
} else {
collection.remove(rundownId)
}
}
}
}

meteorCustomPublish(
PeripheralDevicePubSub.ingestDeviceRundownStatus,
PeripheralDevicePubSubCollectionsNames.ingestRundownStatus,
async function (pub, deviceId: PeripheralDeviceId, token: string | undefined) {
check(deviceId, String)

await checkAccessAndGetPeripheralDevice(deviceId, token, this)

await setUpCollectionOptimizedObserver<
IngestRundownStatus,
IngestRundownStatusArgs,
IngestRundownStatusState,
IngestRundownStatusUpdateProps
>(
`pub_${PeripheralDevicePubSub.ingestDeviceRundownStatus}_${deviceId}`,
{ deviceId },
setupIngestRundownStatusPublicationObservers,
manipulateIngestRundownStatusPublicationData,
pub,
100
)
}
)
Loading
Loading