Skip to content

Commit 45f6e2c

Browse files
committed
wip
1 parent 5312872 commit 45f6e2c

File tree

5 files changed

+182
-23
lines changed

5 files changed

+182
-23
lines changed

meteor/server/publications/packageManager/expectedPackages/contentCache.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,31 @@ import { ReactiveCacheCollection } from '../../lib/ReactiveCacheCollection'
22
import { literal } from '@sofie-automation/corelib/dist/lib'
33
import { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo'
44
import { ExpectedPackageDB } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
5+
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
6+
import { PieceInstance } from '@sofie-automation/corelib/dist/dataModel/PieceInstance'
7+
8+
export type RundownPlaylistCompact = Pick<
9+
DBRundownPlaylist,
10+
'_id' | 'activationId' | 'currentPartInfo' | 'nextPartInfo'
11+
>
12+
export const rundownPlaylistFieldSpecifier = literal<MongoFieldSpecifierOnesStrict<RundownPlaylistCompact>>({
13+
_id: 1,
14+
activationId: 1,
15+
currentPartInfo: 1, // So that it invalidates when the current changes
16+
nextPartInfo: 1, // So that it invalidates when the next changes
17+
})
18+
19+
export type PieceInstanceCompact = Pick<
20+
PieceInstance,
21+
'_id' | 'rundownId' | 'partInstanceId' | 'neededExpectedPackageIds'
22+
>
23+
24+
export const pieceInstanceFieldsSpecifier = literal<MongoFieldSpecifierOnesStrict<PieceInstanceCompact>>({
25+
_id: 1,
26+
rundownId: 1,
27+
partInstanceId: 1,
28+
neededExpectedPackageIds: 1,
29+
})
530

631
export type ExpectedPackageDBCompact = Pick<ExpectedPackageDB, '_id' | 'package'>
732

@@ -12,11 +37,15 @@ export const expectedPackageDBFieldsSpecifier = literal<MongoFieldSpecifierOnesS
1237

1338
export interface ExpectedPackagesContentCache {
1439
ExpectedPackages: ReactiveCacheCollection<ExpectedPackageDBCompact>
40+
RundownPlaylists: ReactiveCacheCollection<RundownPlaylistCompact>
41+
PieceInstances: ReactiveCacheCollection<PieceInstanceCompact>
1542
}
1643

1744
export function createReactiveContentCache(): ExpectedPackagesContentCache {
1845
const cache: ExpectedPackagesContentCache = {
1946
ExpectedPackages: new ReactiveCacheCollection<ExpectedPackageDBCompact>('expectedPackages'),
47+
RundownPlaylists: new ReactiveCacheCollection<RundownPlaylistCompact>('rundownPlaylists'),
48+
PieceInstances: new ReactiveCacheCollection<PieceInstanceCompact>('pieceInstances'),
2049
}
2150

2251
return cache

meteor/server/publications/packageManager/expectedPackages/contentObserver.ts

Lines changed: 82 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,29 @@
11
import { Meteor } from 'meteor/meteor'
2-
import { StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2+
import { PartInstanceId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
33
import { logger } from '../../../logging'
4-
import { ExpectedPackagesContentCache, expectedPackageDBFieldsSpecifier } from './contentCache'
5-
import { ExpectedPackages } from '../../../collections'
4+
import {
5+
ExpectedPackagesContentCache,
6+
expectedPackageDBFieldsSpecifier,
7+
pieceInstanceFieldsSpecifier,
8+
rundownPlaylistFieldSpecifier,
9+
} from './contentCache'
10+
import { ExpectedPackages, PieceInstances, RundownPlaylists } from '../../../collections'
611
import { waitForAllObserversReady } from '../../lib/lib'
12+
import { ReactiveMongoObserverGroup, ReactiveMongoObserverGroupHandle } from '../../lib/observerGroup'
13+
import { equivalentArrays } from '@sofie-automation/shared-lib/dist/lib/lib'
14+
import _ from 'underscore'
15+
16+
const REACTIVITY_DEBOUNCE = 20
717

818
export class ExpectedPackagesContentObserver implements Meteor.LiveQueryHandle {
919
#observers: Meteor.LiveQueryHandle[] = []
1020
#cache: ExpectedPackagesContentCache
1121

22+
#partInstanceIds: PartInstanceId[] = []
23+
#partInstanceIdObserver!: ReactiveMongoObserverGroupHandle
24+
25+
#disposed = false
26+
1227
private constructor(cache: ExpectedPackagesContentCache) {
1328
this.#cache = cache
1429
}
@@ -21,7 +36,27 @@ export class ExpectedPackagesContentObserver implements Meteor.LiveQueryHandle {
2136

2237
const observer = new ExpectedPackagesContentObserver(cache)
2338

39+
// Run the ShowStyleBase query in a ReactiveMongoObserverGroup, so that it can be restarted whenever
40+
observer.#partInstanceIdObserver = await ReactiveMongoObserverGroup(async () => {
41+
// Clear already cached data
42+
cache.PieceInstances.remove({})
43+
44+
return [
45+
PieceInstances.observeChanges(
46+
{
47+
// We can use the `this.#partInstanceIds` here, as this is restarted every time that property changes
48+
partInstanceId: { $in: observer.#partInstanceIds },
49+
},
50+
cache.PieceInstances.link(),
51+
{
52+
projection: pieceInstanceFieldsSpecifier,
53+
}
54+
),
55+
]
56+
})
57+
2458
// Subscribe to the database, and pipe any updates into the ReactiveCacheCollections
59+
// This takes ownership of the #partInstanceIdObserver, and will stop it if this throws
2560
observer.#observers = await waitForAllObserversReady([
2661
ExpectedPackages.observeChanges(
2762
{
@@ -32,16 +67,60 @@ export class ExpectedPackagesContentObserver implements Meteor.LiveQueryHandle {
3267
projection: expectedPackageDBFieldsSpecifier,
3368
}
3469
),
70+
71+
RundownPlaylists.observeChanges(
72+
{
73+
studioId: studioId,
74+
},
75+
cache.RundownPlaylists.link(() => {
76+
observer.updatePartInstanceIds()
77+
}),
78+
{
79+
fields: rundownPlaylistFieldSpecifier,
80+
}
81+
),
82+
83+
observer.#partInstanceIdObserver,
3584
])
3685

3786
return observer
3887
}
3988

89+
private updatePartInstanceIds = _.debounce(
90+
Meteor.bindEnvironment(() => {
91+
if (this.#disposed) return
92+
93+
const newPartInstanceIdsSet = new Set<PartInstanceId>()
94+
95+
this.#cache.RundownPlaylists.find({}).forEach((playlist) => {
96+
if (playlist.activationId) {
97+
if (playlist.nextPartInfo) {
98+
newPartInstanceIdsSet.add(playlist.nextPartInfo.partInstanceId)
99+
}
100+
if (playlist.currentPartInfo) {
101+
newPartInstanceIdsSet.add(playlist.currentPartInfo.partInstanceId)
102+
}
103+
}
104+
})
105+
106+
const newPartInstanceIds = Array.from(newPartInstanceIdsSet)
107+
108+
if (!equivalentArrays(newPartInstanceIds, this.#partInstanceIds)) {
109+
this.#partInstanceIds = newPartInstanceIds
110+
// trigger the rundown group to restart
111+
this.#partInstanceIdObserver.restart()
112+
}
113+
}),
114+
REACTIVITY_DEBOUNCE
115+
)
116+
40117
public get cache(): ExpectedPackagesContentCache {
41118
return this.#cache
42119
}
43120

44121
public stop = (): void => {
122+
this.#disposed = true
123+
45124
this.#observers.forEach((observer) => observer.stop())
46125
}
47126
}

meteor/server/publications/packageManager/expectedPackages/generate.ts

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { DBStudio, StudioLight, StudioPackageContainer } from '@sofie-automation
1414
import { clone, omit } from '../../../lib/tempLib'
1515
import { CustomPublishCollection } from '../../../lib/customPublication'
1616
import { logger } from '../../../logging'
17-
import { ExpectedPackagesContentCache } from './contentCache'
17+
import { ExpectedPackageDBCompact, ExpectedPackagesContentCache } from './contentCache'
1818
import type { StudioFields } from './publication'
1919

2020
/**
@@ -60,14 +60,9 @@ export async function updateCollectionForExpectedPackageIds(
6060

6161
const routedPackage = generateExpectedPackageForDevice(
6262
studio,
63-
{
64-
...packageDoc.package,
65-
_id: unprotectString(packageDoc._id),
66-
},
63+
packageDoc,
6764
deviceId,
6865
null,
69-
// nocommit - how to handle this priority in the new flow?
70-
Priorities.OTHER, // low priority
7166
packageContainers
7267
)
7368

@@ -88,7 +83,7 @@ export async function updateCollectionForExpectedPackageIds(
8883
})
8984
}
9085

91-
enum Priorities {
86+
export enum ExpectedPackagePriorities {
9287
// Lower priorities are done first
9388

9489
/** Highest priority */
@@ -103,16 +98,15 @@ function generateExpectedPackageForDevice(
10398
StudioLight,
10499
'_id' | 'packageContainersWithOverrides' | 'previewContainerIds' | 'thumbnailContainerIds'
105100
>,
106-
expectedPackage: PackageManagerExpectedPackageBase,
101+
expectedPackage: ExpectedPackageDBCompact,
107102
deviceId: PeripheralDeviceId,
108103
pieceInstanceId: PieceInstanceId | null,
109-
priority: Priorities,
110104
packageContainers: Record<string, StudioPackageContainer>
111105
): PackageManagerExpectedPackage {
112106
// Lookup Package sources:
113107
const combinedSources: PackageContainerOnPackage[] = []
114108

115-
for (const packageSource of expectedPackage.sources) {
109+
for (const packageSource of expectedPackage.package.sources) {
116110
const lookedUpSource = packageContainers[packageSource.containerId]
117111
if (lookedUpSource) {
118112
combinedSources.push(calculateCombinedSource(packageSource, lookedUpSource))
@@ -130,25 +124,26 @@ function generateExpectedPackageForDevice(
130124
}
131125

132126
// Lookup Package targets:
133-
const combinedTargets = calculateCombinedTargets(expectedPackage, deviceId, packageContainers)
127+
const combinedTargets = calculateCombinedTargets(expectedPackage.package, deviceId, packageContainers)
134128

135-
if (!combinedSources.length && expectedPackage.sources.length !== 0) {
129+
if (!combinedSources.length && expectedPackage.package.sources.length !== 0) {
136130
logger.warn(`Pub.expectedPackagesForDevice: No sources found for "${expectedPackage._id}"`)
137131
}
138132
if (!combinedTargets.length) {
139133
logger.warn(`Pub.expectedPackagesForDevice: No targets found for "${expectedPackage._id}"`)
140134
}
141-
const packageSideEffect = getSideEffect(expectedPackage, studio)
135+
const packageSideEffect = getSideEffect(expectedPackage.package, studio)
142136

143137
return {
144138
_id: protectString(`${expectedPackage._id}_${deviceId}_${pieceInstanceId}`),
139+
expectedPackageId: expectedPackage._id,
145140
expectedPackage: {
146-
...expectedPackage,
141+
...expectedPackage.package,
147142
sideEffect: packageSideEffect,
148143
},
149144
sources: combinedSources,
150145
targets: combinedTargets,
151-
priority: priority,
146+
priority: ExpectedPackagePriorities.OTHER, // This gets overriden later if needed
152147
playoutDeviceId: deviceId,
153148
pieceInstanceId,
154149
}

meteor/server/publications/packageManager/expectedPackages/publication.ts

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,19 @@ import { logger } from '../../../logging'
1111
import { ReadonlyDeep } from 'type-fest'
1212
import { applyAndValidateOverrides } from '@sofie-automation/corelib/dist/settings/objectWithOverrides'
1313
import { MongoFieldSpecifierOnesStrict } from '@sofie-automation/corelib/dist/mongo'
14-
import { ExpectedPackageId, PeripheralDeviceId, StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids'
14+
import {
15+
ExpectedPackageId,
16+
PeripheralDeviceId,
17+
PieceInstanceId,
18+
StudioId,
19+
} from '@sofie-automation/corelib/dist/dataModel/Ids'
1520
import { Studios } from '../../../collections'
1621
import { check, Match } from 'meteor/check'
1722
import { PackageManagerExpectedPackage } from '@sofie-automation/shared-lib/dist/package-manager/publications'
1823
import { ExpectedPackagesContentObserver } from './contentObserver'
1924
import { createReactiveContentCache, ExpectedPackagesContentCache } from './contentCache'
2025
import { buildMappingsToDeviceIdMap } from './util'
21-
import { updateCollectionForExpectedPackageIds } from './generate'
26+
import { ExpectedPackagePriorities, updateCollectionForExpectedPackageIds } from './generate'
2227
import {
2328
PeripheralDevicePubSub,
2429
PeripheralDevicePubSubCollectionsNames,
@@ -37,6 +42,7 @@ interface ExpectedPackagesPublicationUpdateProps {
3742
newCache: ExpectedPackagesContentCache
3843

3944
invalidateExpectedPackageIds?: ExpectedPackageId[]
45+
invalidatePieceInstanceIds?: PieceInstanceId[]
4046
}
4147

4248
interface ExpectedPackagesPublicationState {
@@ -81,6 +87,11 @@ async function setupExpectedPackagesPublicationObservers(
8187
changed: (id) => triggerUpdate({ invalidateExpectedPackageIds: [protectString<ExpectedPackageId>(id)] }),
8288
removed: (id) => triggerUpdate({ invalidateExpectedPackageIds: [protectString<ExpectedPackageId>(id)] }),
8389
}),
90+
contentCache.PieceInstances.find({}).observeChanges({
91+
added: (id) => triggerUpdate({ invalidatePieceInstanceIds: [protectString<PieceInstanceId>(id)] }),
92+
changed: (id) => triggerUpdate({ invalidatePieceInstanceIds: [protectString<PieceInstanceId>(id)] }),
93+
removed: (id) => triggerUpdate({ invalidatePieceInstanceIds: [protectString<PieceInstanceId>(id)] }),
94+
}),
8495

8596
Studios.observeChanges(
8697
args.studioId,
@@ -151,7 +162,7 @@ async function manipulateExpectedPackagesPublicationData(
151162

152163
let regenerateExpectedPackageIds: Set<ExpectedPackageId>
153164
if (invalidateAllItems) {
154-
// force every piece to be regenerated
165+
// force every package to be regenerated
155166
collection.remove(null)
156167
regenerateExpectedPackageIds = new Set(state.contentCache.ExpectedPackages.find({}).map((p) => p._id))
157168
} else {
@@ -168,6 +179,49 @@ async function manipulateExpectedPackagesPublicationData(
168179
args.filterPlayoutDeviceIds,
169180
regenerateExpectedPackageIds
170181
)
182+
183+
// Ensure the priorities are correct for the packages
184+
// We can do this as a post-step, as it means we can generate the packages solely based on the content
185+
// If one gets regenerated, its priority will be reset to OTHER. But as it has already changed, this fixup is 'free'
186+
// For those not regenerated, we can set the priority to the correct value if it has changed, without any deeper checks
187+
updatePackagePriorities(state.contentCache, collection)
188+
}
189+
190+
function updatePackagePriorities(
191+
contentCache: ReadonlyDeep<ExpectedPackagesContentCache>,
192+
collection: CustomPublishCollection<PackageManagerExpectedPackage>
193+
) {
194+
const highPriorityPackages = new Map<ExpectedPackageId, ExpectedPackagePriorities>()
195+
196+
// Compile the map of the expected priority of each package
197+
const knownPieceInstances = contentCache.PieceInstances.find({})
198+
const playlist = contentCache.RundownPlaylists.findOne({})
199+
const currentPartInstanceId = playlist?.currentPartInfo?.partInstanceId
200+
for (const pieceInstance of knownPieceInstances) {
201+
const packageIds = pieceInstance.neededExpectedPackageIds
202+
if (!packageIds) continue
203+
204+
const packagePriority =
205+
pieceInstance.partInstanceId === currentPartInstanceId
206+
? ExpectedPackagePriorities.PLAYOUT_CURRENT
207+
: ExpectedPackagePriorities.PLAYOUT_NEXT
208+
209+
for (const packageId of packageIds) {
210+
const existingPriority = highPriorityPackages.get(packageId) ?? ExpectedPackagePriorities.OTHER
211+
highPriorityPackages.set(packageId, Math.min(existingPriority, packagePriority))
212+
}
213+
}
214+
215+
// Iterate through and update each package
216+
collection.updateAll((pkg) => {
217+
const expectedPriority = highPriorityPackages.get(pkg.expectedPackageId) ?? ExpectedPackagePriorities.OTHER
218+
if (pkg.priority === expectedPriority) return false
219+
220+
return {
221+
...pkg,
222+
priority: expectedPriority,
223+
}
224+
})
171225
}
172226

173227
meteorCustomPublish(

packages/shared-lib/src/package-manager/publications.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ExpectedPackage, PackageContainer, PackageContainerOnPackage } from './package'
2-
import { PeripheralDeviceId, PieceInstanceId, RundownId, RundownPlaylistId } from '../core/model/Ids'
2+
import { ExpectedPackageId, PeripheralDeviceId, PieceInstanceId, RundownId, RundownPlaylistId } from '../core/model/Ids'
33
import { ProtectedString } from '../lib/protectedString'
44
import { ReadonlyDeep } from 'type-fest'
55

@@ -34,6 +34,8 @@ export interface PackageManagerExpectedPackage {
3434
/** Unique id of the expectedPackage */
3535
_id: PackageManagerExpectedPackageId
3636

37+
expectedPackageId: ExpectedPackageId
38+
3739
expectedPackage: PackageManagerExpectedPackageBase
3840
/** Lower should be done first */
3941
priority: number

0 commit comments

Comments
 (0)