Skip to content

Commit 692737e

Browse files
committed
wip: refactor
1 parent 957f1f7 commit 692737e

File tree

4 files changed

+202
-104
lines changed

4 files changed

+202
-104
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import type { RundownId, PartId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2+
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
3+
import type { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
4+
import {
5+
IngestRundownStatus,
6+
IngestPartPlaybackStatus,
7+
IngestRundownActiveStatus,
8+
IngestPartStatus,
9+
} from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
10+
import type { ReadonlyDeep } from 'type-fest'
11+
import _ from 'underscore'
12+
import type { ContentCache, PartFields, PartInstanceFields, PlaylistFields } from './reactiveContentCache'
13+
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
14+
import { ReactiveCacheCollection } from '../lib/ReactiveCacheCollection'
15+
import { PartInstance } from '@sofie-automation/meteor-lib/dist/collections/PartInstances'
16+
import { IngestPart } from '@sofie-automation/blueprints-integration'
17+
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
18+
19+
export function createIngestRundownStatus(
20+
cache: ReadonlyDeep<ContentCache>,
21+
rundownId: RundownId
22+
): IngestRundownStatus | null {
23+
const rundown = cache.Rundowns.findOne(rundownId)
24+
if (!rundown) return null
25+
26+
const newDoc: IngestRundownStatus = {
27+
_id: rundownId,
28+
externalId: rundown.externalId,
29+
30+
active: IngestRundownActiveStatus.INACTIVE,
31+
32+
segments: [],
33+
}
34+
35+
const playlist = cache.Playlists.findOne({
36+
_id: rundown.playlistId,
37+
activationId: { $exists: true },
38+
})
39+
40+
if (playlist) {
41+
newDoc.active = playlist.rehearsal ? IngestRundownActiveStatus.REHEARSAL : IngestRundownActiveStatus.ACTIVE
42+
}
43+
44+
// Find the most important part instance for each part
45+
const partInstanceMap = findPartInstanceForEachPart(playlist, rundownId, cache.PartInstances)
46+
47+
const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
48+
for (const nrcsSegment of nrcsSegments) {
49+
const nrcsParts = cache.NrcsIngestData.find({
50+
rundownId,
51+
segmentId: nrcsSegment.segmentId,
52+
type: NrcsIngestCacheType.PART,
53+
}).fetch()
54+
55+
newDoc.segments.push({
56+
externalId: nrcsSegment.data.externalId,
57+
parts: _.compact(
58+
nrcsParts.map((nrcsPart) => {
59+
if (!nrcsPart.partId || !nrcsPart.segmentId) return null
60+
61+
const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId })
62+
const partInstance = partInstanceMap.get(nrcsPart.partId)
63+
64+
return createIngestPartStatus(playlist, partInstance, part, nrcsPart.data as IngestPart)
65+
})
66+
),
67+
})
68+
}
69+
70+
return newDoc
71+
}
72+
73+
function findPartInstanceForEachPart(
74+
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
75+
rundownId: RundownId,
76+
partInstancesCache: ReadonlyDeep<ReactiveCacheCollection<Pick<PartInstance, PartInstanceFields>>>
77+
) {
78+
const partInstanceMap = new Map<PartId, Pick<DBPartInstance, PartInstanceFields>>()
79+
if (!playlist) return partInstanceMap
80+
81+
for (const partInstance of partInstancesCache.find({}).fetch()) {
82+
if (partInstance.rundownId !== rundownId) continue
83+
// Ignore the next partinstance
84+
if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue
85+
86+
// The current part instance is the most important
87+
if (partInstance._id === playlist.currentPartInfo?.partInstanceId) {
88+
partInstanceMap.set(partInstance.part._id, partInstance)
89+
continue
90+
}
91+
92+
// Take the part with the highest takeCount
93+
const existingEntry = partInstanceMap.get(partInstance.part._id)
94+
if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) {
95+
partInstanceMap.set(partInstance.part._id, partInstance)
96+
}
97+
}
98+
99+
return partInstanceMap
100+
}
101+
102+
function createIngestPartStatus(
103+
playlist: Pick<DBRundownPlaylist, PlaylistFields> | undefined,
104+
partInstance: Pick<PartInstance, PartInstanceFields> | undefined,
105+
part: Pick<DBPart, PartFields> | undefined,
106+
ingestPart: IngestPart
107+
): IngestPartStatus {
108+
// Determine the playback status from the PartInstance
109+
let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
110+
if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) {
111+
const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id
112+
113+
if (isCurrentPartInstance) {
114+
// If the current, it is playing
115+
playbackStatus = IngestPartPlaybackStatus.PLAY
116+
} else {
117+
// If not the current, but has been played, it is stopped
118+
playbackStatus = IngestPartPlaybackStatus.STOP
119+
}
120+
}
121+
122+
// Determine the ready status from the PartInstance or Part
123+
const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady
124+
125+
return {
126+
externalId: ingestPart.externalId,
127+
128+
isReady: isReady ?? null,
129+
130+
playbackStatus,
131+
}
132+
}

meteor/server/publications/ingestStatus/publication.ts

Lines changed: 6 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { PartId, PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
1+
import { PeripheralDeviceId, RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
22
import { ReadonlyDeep } from 'type-fest'
33
import {
44
CustomPublishCollection,
@@ -8,7 +8,7 @@ import {
88
TriggerUpdate,
99
} from '../../lib/customPublication'
1010
import { logger } from '../../logging'
11-
import { ContentCache, createReactiveContentCache, PartInstanceFields } from './reactiveContentCache'
11+
import { ContentCache, createReactiveContentCache } from './reactiveContentCache'
1212
import { RundownsObserver } from '../lib/rundownsObserver'
1313
import { RundownContentObserver } from './rundownContentObserver'
1414
import {
@@ -17,12 +17,10 @@ import {
1717
} from '@sofie-automation/shared-lib/dist/pubsub/peripheralDevice'
1818
import { checkAccessAndGetPeripheralDevice } from '../../security/check'
1919
import { check } from '../../lib/check'
20-
import { IngestPartPlaybackStatus, IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
20+
import { IngestRundownStatus } from '@sofie-automation/shared-lib/dist/ingest/rundownStatus'
2121
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
2222
import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
23-
import { NrcsIngestCacheType } from '@sofie-automation/corelib/dist/dataModel/NrcsIngestDataCache'
24-
import _ from 'underscore'
25-
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
23+
import { createIngestRundownStatus } from './createIngestRundownStatus'
2624

2725
interface IngestRundownStatusArgs {
2826
readonly deviceId: PeripheralDeviceId
@@ -137,7 +135,7 @@ async function manipulateIngestRundownStatusPublicationData(
137135
const knownRundownIds = new Set(state.contentCache.RundownIds)
138136

139137
for (const rundownId of knownRundownIds) {
140-
const newDoc = regenerateForRundown(state.contentCache, rundownId)
138+
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
141139
if (newDoc) collection.replace(newDoc)
142140
}
143141
} else {
@@ -162,7 +160,7 @@ async function manipulateIngestRundownStatusPublicationData(
162160
}
163161

164162
for (const rundownId of regenerateForRundownIds) {
165-
const newDoc = regenerateForRundown(state.contentCache, rundownId)
163+
const newDoc = createIngestRundownStatus(state.contentCache, rundownId)
166164
if (newDoc) {
167165
collection.replace(newDoc)
168166
} else {
@@ -172,99 +170,6 @@ async function manipulateIngestRundownStatusPublicationData(
172170
}
173171
}
174172

175-
function regenerateForRundown(cache: ReadonlyDeep<ContentCache>, rundownId: RundownId): IngestRundownStatus | null {
176-
const rundown = cache.Rundowns.findOne(rundownId)
177-
if (!rundown) return null
178-
179-
const newDoc: IngestRundownStatus = {
180-
_id: rundownId,
181-
externalId: rundown.externalId,
182-
183-
active: 'inactive',
184-
185-
segments: [],
186-
}
187-
188-
const playlist = cache.Playlists.findOne({
189-
_id: rundown.playlistId,
190-
activationId: { $exists: true },
191-
})
192-
193-
if (playlist) {
194-
newDoc.active = playlist.rehearsal ? 'rehearsal' : 'active'
195-
}
196-
197-
// Find the most important part instance for each part
198-
const partInstanceMap = new Map<PartId, Pick<DBPartInstance, PartInstanceFields>>()
199-
if (playlist) {
200-
for (const partInstance of cache.PartInstances.find({}).fetch()) {
201-
if (partInstance.rundownId !== rundownId) continue
202-
// Ignore the next partinstance
203-
if (partInstance._id === playlist.nextPartInfo?.partInstanceId) continue
204-
205-
// The current part instance is the most important
206-
if (partInstance._id === playlist.currentPartInfo?.partInstanceId) {
207-
partInstanceMap.set(partInstance.part._id, partInstance)
208-
continue
209-
}
210-
211-
// Take the part with the highest takeCount
212-
const existingEntry = partInstanceMap.get(partInstance.part._id)
213-
if (!existingEntry || existingEntry.takeCount < partInstance.takeCount) {
214-
partInstanceMap.set(partInstance.part._id, partInstance)
215-
}
216-
}
217-
}
218-
219-
const nrcsSegments = cache.NrcsIngestData.find({ rundownId, type: NrcsIngestCacheType.SEGMENT }).fetch()
220-
for (const nrcsSegment of nrcsSegments) {
221-
const nrcsParts = cache.NrcsIngestData.find({
222-
rundownId,
223-
segmentId: nrcsSegment.segmentId,
224-
type: NrcsIngestCacheType.PART,
225-
}).fetch()
226-
227-
newDoc.segments.push({
228-
externalId: nrcsSegment.data.externalId,
229-
parts: _.compact(
230-
nrcsParts.map((nrcsPart) => {
231-
if (!nrcsPart.partId || !nrcsPart.segmentId) return null
232-
233-
const part = cache.Parts.findOne({ _id: nrcsPart.partId, rundownId })
234-
const partInstance = partInstanceMap.get(nrcsPart.partId)
235-
236-
// Determine the playback status from the PartInstance
237-
let playbackStatus = IngestPartPlaybackStatus.UNKNOWN
238-
if (playlist && partInstance && partInstance.part.shouldNotifyCurrentPlayingPart) {
239-
const isCurrentPartInstance = playlist.currentPartInfo?.partInstanceId === partInstance._id
240-
241-
if (isCurrentPartInstance) {
242-
// If the current, it is playing
243-
playbackStatus = IngestPartPlaybackStatus.PLAY
244-
} else {
245-
// If not the current, but has been played, it is stopped
246-
playbackStatus = IngestPartPlaybackStatus.STOP
247-
}
248-
}
249-
250-
// Determine the ready status from the PartInstance or Part
251-
const isReady = partInstance ? partInstance.part.ingestNotifyPartReady : part?.ingestNotifyPartReady
252-
253-
return {
254-
externalId: nrcsPart.data.externalId,
255-
256-
isReady: isReady ?? null,
257-
258-
playbackStatus,
259-
}
260-
})
261-
),
262-
})
263-
}
264-
265-
return newDoc
266-
}
267-
268173
meteorCustomPublish(
269174
PeripheralDevicePubSub.ingestDeviceRundownStatus,
270175
PeripheralDevicePubSubCollectionsNames.ingestRundownStatus,

meteor/server/publications/lib/__tests__/rundownsObserver.test.ts

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { RundownId, RundownPlaylistId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
1+
import {
2+
PeripheralDeviceId,
3+
RundownId,
4+
RundownPlaylistId,
5+
StudioId,
6+
} from '@sofie-automation/corelib/dist/dataModel/Ids'
27
import { protectString } from '@sofie-automation/corelib/dist/protectedString'
38
import { Rundown } from '@sofie-automation/corelib/dist/dataModel/Rundown'
49
import { Rundowns } from '../../../collections'
@@ -263,4 +268,56 @@ describe('RundownsObserver', () => {
263268
observer.stop()
264269
}
265270
})
271+
272+
test('create and destroy observer - for peripheraldevice', async () => {
273+
const deviceId = protectString<PeripheralDeviceId>('device0')
274+
275+
const onChangedCleanup = jest.fn()
276+
const onChanged = jest.fn(async () => onChangedCleanup)
277+
278+
// should not be any observers yet
279+
expect(RundownsMock.observers).toHaveLength(0)
280+
281+
const observer = await RundownsObserver.createForPeripheralDevice(deviceId, onChanged)
282+
try {
283+
// should now be an observer
284+
expect(RundownsMock.observers).toHaveLength(1)
285+
286+
// Before debounce
287+
expect(onChanged).toHaveBeenCalledTimes(0)
288+
289+
// After debounce
290+
await waitUntil(async () => {
291+
// Run timers, so that promises in the observer has a chance to resolve:
292+
await runAllTimers()
293+
expect(onChanged).toHaveBeenCalledTimes(1)
294+
expect(onChangedCleanup).toHaveBeenCalledTimes(0)
295+
}, MAX_WAIT_TIME)
296+
297+
// still got an observer
298+
expect(RundownsMock.observers).toHaveLength(1)
299+
300+
// get the mock observer, and ensure to looks sane
301+
expect(RundownsMock.observers).toHaveLength(1)
302+
const mockObserver = RundownsMock.observers[0]
303+
expect(mockObserver).toBeTruthy()
304+
expect(mockObserver.callbacksChanges).toBeFalsy()
305+
expect(mockObserver.callbacksObserve).toBeTruthy()
306+
expect(mockObserver.callbacksObserve?.added).toBeTruthy()
307+
expect(mockObserver.callbacksObserve?.changed).toBeTruthy()
308+
expect(mockObserver.callbacksObserve?.removed).toBeTruthy()
309+
expect(mockObserver.query).toEqual({
310+
'source.peripheralDeviceId': 'device0',
311+
'source.type': 'nrcs',
312+
})
313+
} finally {
314+
// Make sure to cleanup
315+
observer.stop()
316+
317+
// Check it stopped
318+
expect(onChanged).toHaveBeenCalledTimes(1)
319+
expect(onChangedCleanup).toHaveBeenCalledTimes(1)
320+
expect(RundownsMock.observers).toHaveLength(0)
321+
}
322+
})
266323
})

packages/shared-lib/src/ingest/rundownStatus.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@ export interface IngestRundownStatus {
66
/** Rundown external id */
77
externalId: string
88

9-
active: 'active' | 'rehearsal' | 'inactive'
9+
active: IngestRundownActiveStatus
1010

1111
segments: IngestSegmentStatus[]
12+
}
1213

13-
// Future: the rundown could have a status that we report?
14+
export enum IngestRundownActiveStatus {
15+
ACTIVE = 'active',
16+
REHEARSAL = 'rehearsal',
17+
INACTIVE = 'inactive',
1418
}
1519

1620
export interface IngestSegmentStatus {

0 commit comments

Comments
 (0)