Skip to content

Commit 3b0cbdc

Browse files
committed
Merge pull request #1325 from nrkno/fix/segment-not-found-issues
2 parents d908ac6 + 93f4b9d commit 3b0cbdc

File tree

9 files changed

+203
-60
lines changed

9 files changed

+203
-60
lines changed

packages/corelib/src/dataModel/Segment.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { SegmentNote } from './Notes'
55
export enum SegmentOrphanedReason {
66
/** Segment is deleted from the NRCS but we still need it */
77
DELETED = 'deleted',
8-
/** Segment should be hidden, but it is still playing */
8+
/** Blueprints want the Segment to be hidden, but it is still playing so it must not be hidden right now. */
99
HIDDEN = 'hidden',
1010
/** Segment is owned by playout, and is for AdlibTesting in its rundown */
1111
ADLIB_TESTING = 'adlib-testing',

packages/job-worker/src/ingest/commit.ts

Lines changed: 102 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
4343
import { DatabasePersistedModel } from '../modelBase'
4444
import { updateSegmentIdsForAdlibbedPartInstances } from './commit/updateSegmentIdsForAdlibbedPartInstances'
4545
import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError'
46+
import { AnyBulkWriteOperation } from 'mongodb'
4647

4748
export type BeforePartMapItem = { id: PartId; rank: number }
4849
export type BeforeIngestOperationPartMap = ReadonlyMap<SegmentId, Array<BeforePartMapItem>>
@@ -177,6 +178,9 @@ export async function CommitIngestOperation(
177178
// Ensure any adlibbed parts are updated to follow the segmentId of the previous part
178179
await updateSegmentIdsForAdlibbedPartInstances(context, ingestModel, beforePartMap)
179180

181+
if (data.renamedSegments && data.renamedSegments.size > 0) {
182+
logger.debug(`Renamed segments: ${JSON.stringify(Array.from(data.renamedSegments.entries()))}`)
183+
}
180184
// ensure instances have matching segmentIds with the parts
181185
await updatePartInstancesSegmentIds(context, ingestModel, data.renamedSegments, beforePartMap)
182186

@@ -259,10 +263,16 @@ export async function CommitIngestOperation(
259263
}
260264

261265
function canRemoveSegment(
266+
prevPartInstance: ReadonlyDeep<DBPartInstance> | undefined,
262267
currentPartInstance: ReadonlyDeep<DBPartInstance> | undefined,
263268
nextPartInstance: ReadonlyDeep<DBPartInstance> | undefined,
264269
segmentId: SegmentId
265270
): boolean {
271+
if (prevPartInstance?.segmentId === segmentId) {
272+
// Don't allow removing an active segment
273+
logger.warn(`Not allowing removal of previous playing segment "${segmentId}", making segment unsynced instead`)
274+
return false
275+
}
266276
if (
267277
currentPartInstance?.segmentId === segmentId ||
268278
(nextPartInstance?.segmentId === segmentId && isTooCloseToAutonext(currentPartInstance, false))
@@ -295,26 +305,32 @@ async function updatePartInstancesSegmentIds(
295305
renamedSegments: ReadonlyMap<SegmentId, SegmentId> | null,
296306
beforePartMap: BeforeIngestOperationPartMap
297307
) {
298-
// A set of rules which can be translated to mongo queries for PartInstances to update
308+
/**
309+
* Maps new SegmentId ->
310+
* A set of rules which can be translated to mongo queries for PartInstances to update
311+
*/
299312
const renameRules = new Map<
300313
SegmentId,
301314
{
315+
/** Parts that have been moved to the new SegmentId */
302316
partIds: PartId[]
303-
fromSegmentId: SegmentId | null
317+
/** Segments that have been renamed to the new SegmentId */
318+
fromSegmentIds: SegmentId[]
304319
}
305320
>()
306321

307322
// Add whole segment renames to the set of rules
308323
if (renamedSegments) {
309324
for (const [fromSegmentId, toSegmentId] of renamedSegments) {
310-
const rule = renameRules.get(toSegmentId) ?? { partIds: [], fromSegmentId: null }
325+
const rule = renameRules.get(toSegmentId) ?? { partIds: [], fromSegmentIds: [] }
311326
renameRules.set(toSegmentId, rule)
312327

313-
rule.fromSegmentId = fromSegmentId
328+
rule.fromSegmentIds.push(fromSegmentId)
314329
}
315330
}
316331

317-
// Reverse the structure
332+
// Reverse the Map structure
333+
/** Maps Part -> SegmentId-of-the-part-before-ingest-changes */
318334
const beforePartSegmentIdMap = new Map<PartId, SegmentId>()
319335
for (const [segmentId, partItems] of beforePartMap.entries()) {
320336
for (const partItem of partItems) {
@@ -325,8 +341,11 @@ async function updatePartInstancesSegmentIds(
325341
// Some parts may have gotten a different segmentId to the base rule, so track those separately in the rules
326342
for (const partModel of ingestModel.getAllOrderedParts()) {
327343
const oldSegmentId = beforePartSegmentIdMap.get(partModel.part._id)
344+
328345
if (oldSegmentId && oldSegmentId !== partModel.part.segmentId) {
329-
const rule = renameRules.get(partModel.part.segmentId) ?? { partIds: [], fromSegmentId: null }
346+
// The part has moved to another segment, add a rule to update its corresponding PartInstances:
347+
348+
const rule = renameRules.get(partModel.part.segmentId) ?? { partIds: [], fromSegmentIds: [] }
330349
renameRules.set(partModel.part.segmentId, rule)
331350

332351
rule.partIds.push(partModel.part._id)
@@ -335,30 +354,80 @@ async function updatePartInstancesSegmentIds(
335354

336355
// Perform a mongo update to modify the PartInstances
337356
if (renameRules.size > 0) {
338-
await context.directCollections.PartInstances.bulkWrite(
339-
Array.from(renameRules.entries()).map(([newSegmentId, rule]) => ({
340-
updateMany: {
341-
filter: {
342-
$or: _.compact([
343-
rule.fromSegmentId
344-
? {
345-
segmentId: rule.fromSegmentId,
346-
}
347-
: undefined,
348-
{
349-
'part._id': { $in: rule.partIds },
357+
const rulesInOrder = Array.from(renameRules.entries()).sort((a, b) => {
358+
// Ensure that the ones with partIds are processed last,
359+
// as that should take precedence.
360+
361+
if (a[1].partIds.length && !b[1].partIds.length) return 1
362+
if (!a[1].partIds.length && b[1].partIds.length) return -1
363+
return 0
364+
})
365+
366+
const writeOps: AnyBulkWriteOperation<DBPartInstance>[] = []
367+
368+
for (const [newSegmentId, rule] of rulesInOrder) {
369+
if (rule.fromSegmentIds.length) {
370+
writeOps.push({
371+
updateMany: {
372+
filter: {
373+
rundownId: ingestModel.rundownId,
374+
segmentId: { $in: rule.fromSegmentIds },
375+
},
376+
update: {
377+
$set: {
378+
segmentId: newSegmentId,
379+
'part.segmentId': newSegmentId,
350380
},
351-
]),
381+
},
352382
},
353-
update: {
354-
$set: {
355-
segmentId: newSegmentId,
356-
'part.segmentId': newSegmentId,
383+
})
384+
}
385+
if (rule.partIds.length) {
386+
writeOps.push({
387+
updateMany: {
388+
filter: {
389+
rundownId: ingestModel.rundownId,
390+
'part._id': { $in: rule.partIds },
391+
},
392+
update: {
393+
$set: {
394+
segmentId: newSegmentId,
395+
'part.segmentId': newSegmentId,
396+
},
357397
},
358398
},
359-
},
360-
}))
361-
)
399+
})
400+
}
401+
}
402+
if (writeOps.length) await context.directCollections.PartInstances.bulkWrite(writeOps)
403+
404+
// Double check that there are no parts using the old segment ids:
405+
const oldSegmentIds = Array.from(renameRules.keys())
406+
const [badPartInstances, badParts] = await Promise.all([
407+
await context.directCollections.PartInstances.findFetch({
408+
rundownId: ingestModel.rundownId,
409+
segmentId: { $in: oldSegmentIds },
410+
}),
411+
await context.directCollections.Parts.findFetch({
412+
rundownId: ingestModel.rundownId,
413+
segmentId: { $in: oldSegmentIds },
414+
}),
415+
])
416+
if (badPartInstances.length > 0) {
417+
logger.error(
418+
`updatePartInstancesSegmentIds: Failed to update all PartInstances using old SegmentIds "${JSON.stringify(
419+
oldSegmentIds
420+
)}": ${JSON.stringify(badPartInstances)}, writeOps: ${JSON.stringify(writeOps)}`
421+
)
422+
}
423+
424+
if (badParts.length > 0) {
425+
logger.error(
426+
`updatePartInstancesSegmentIds: Failed to update all Parts using old SegmentIds "${JSON.stringify(
427+
oldSegmentIds
428+
)}": ${JSON.stringify(badParts)}, writeOps: ${JSON.stringify(writeOps)}`
429+
)
430+
}
362431
}
363432
}
364433

@@ -662,7 +731,7 @@ async function removeSegments(
662731
_changedSegmentIds: ReadonlyDeep<SegmentId[]>,
663732
removedSegmentIds: ReadonlyDeep<SegmentId[]>
664733
) {
665-
const { currentPartInstance, nextPartInstance } = await getSelectedPartInstances(
734+
const { previousPartInstance, currentPartInstance, nextPartInstance } = await getSelectedPartInstances(
666735
context,
667736
newPlaylist,
668737
rundownsInPlaylist.map((r) => r._id)
@@ -672,7 +741,7 @@ async function removeSegments(
672741
const orphanDeletedSegmentIds = new Set<SegmentId>()
673742
const orphanHiddenSegmentIds = new Set<SegmentId>()
674743
for (const segmentId of removedSegmentIds) {
675-
if (canRemoveSegment(currentPartInstance, nextPartInstance, segmentId)) {
744+
if (canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
676745
purgeSegmentIds.add(segmentId)
677746
} else {
678747
logger.warn(
@@ -685,8 +754,10 @@ async function removeSegments(
685754
for (const segment of ingestModel.getAllSegments()) {
686755
const segmentId = segment.segment._id
687756
if (segment.segment.isHidden) {
688-
if (!canRemoveSegment(currentPartInstance, nextPartInstance, segmentId)) {
689-
// Protect live segment from being hidden
757+
// Blueprints want to hide the Segment
758+
759+
if (!canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
760+
// The Segment is live, so we need to protect it from being hidden
690761
logger.warn(`Cannot hide live segment ${segmentId}, it will be orphaned`)
691762
switch (segment.segment.orphaned) {
692763
case SegmentOrphanedReason.DELETED:
@@ -706,7 +777,7 @@ async function removeSegments(
706777
} else if (!orphanDeletedSegmentIds.has(segmentId) && segment.parts.length === 0) {
707778
// No parts in segment
708779

709-
if (!canRemoveSegment(currentPartInstance, nextPartInstance, segmentId)) {
780+
if (!canRemoveSegment(previousPartInstance, currentPartInstance, nextPartInstance, segmentId)) {
710781
// Protect live segment from being hidden
711782
logger.warn(`Cannot hide live segment ${segmentId}, it will be orphaned`)
712783
orphanHiddenSegmentIds.add(segmentId)

packages/job-worker/src/ingest/model/IngestSegmentModel.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export interface IngestSegmentModel extends IngestSegmentModelReadonly {
6767
setOrphaned(orphaned: SegmentOrphanedReason | undefined): void
6868

6969
/**
70-
* Mark this Part as being hidden
70+
* Mark this Segment as being hidden
7171
* @param hidden New hidden state
7272
*/
7373
setHidden(hidden: boolean): void

packages/job-worker/src/ingest/model/implementation/DocumentChangeTracker.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@ export class DocumentChangeTracker<TDoc extends { _id: ProtectedString<any> }> {
9494
}
9595
}
9696

97+
getDeletedIds(): TDoc['_id'][] {
98+
return Array.from(this.#deletedIds.values())
99+
}
100+
97101
/**
98102
* Generate the mongodb BulkWrite operations for the documents known to this tracker
99103
* @returns mongodb BulkWrite operations

packages/job-worker/src/ingest/model/implementation/SaveIngestModel.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { JobContext } from '../../../jobs'
1111
import { ExpectedPackagesStore } from './ExpectedPackagesStore'
1212
import { IngestSegmentModelImpl } from './IngestSegmentModelImpl'
1313
import { DocumentChangeTracker } from './DocumentChangeTracker'
14+
import { logger } from '../../../logging'
15+
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
1416

1517
export class SaveIngestModelHelper {
1618
#expectedPackages = new DocumentChangeTracker<ExpectedPackageDB>()
@@ -55,6 +57,23 @@ export class SaveIngestModelHelper {
5557
}
5658

5759
commit(context: JobContext): Array<Promise<unknown>> {
60+
// Log deleted ids:
61+
const deletedIds: { [key: string]: ProtectedString<any>[] } = {
62+
expectedPackages: this.#expectedPackages.getDeletedIds(),
63+
expectedPlayoutItems: this.#expectedPlayoutItems.getDeletedIds(),
64+
expectedMediaItems: this.#expectedMediaItems.getDeletedIds(),
65+
segments: this.#segments.getDeletedIds(),
66+
parts: this.#parts.getDeletedIds(),
67+
pieces: this.#pieces.getDeletedIds(),
68+
adLibPieces: this.#adLibPieces.getDeletedIds(),
69+
adLibActions: this.#adLibActions.getDeletedIds(),
70+
}
71+
for (const [key, ids] of Object.entries<ProtectedString<any>[]>(deletedIds)) {
72+
if (ids.length > 0) {
73+
logger.debug(`Deleted ${key}: ${JSON.stringify(ids)} `)
74+
}
75+
}
76+
5877
return [
5978
context.directCollections.ExpectedPackages.bulkWrite(this.#expectedPackages.generateWriteOps()),
6079
context.directCollections.ExpectedPlayoutItems.bulkWrite(this.#expectedPlayoutItems.generateWriteOps()),

packages/job-worker/src/playout/infinites.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { SegmentOrphanedReason } from '@sofie-automation/corelib/dist/dataModel/
2323
import { sortRundownIDsInPlaylist } from '@sofie-automation/corelib/dist/playout/playlist'
2424
import { mongoWhere } from '@sofie-automation/corelib/dist/mongo'
2525
import { PlayoutRundownModel } from './model/PlayoutRundownModel'
26+
import { logger } from '../logging'
2627

2728
/** When we crop a piece, set the piece as "it has definitely ended" this far into the future. */
2829
export const DEFINITELY_ENDED_FUTURE_DURATION = 1 * 1000
@@ -330,7 +331,26 @@ export function getPieceInstancesForPart(
330331
if (!playingRundown) throw new Error(`Rundown "${playingPartInstance.partInstance.rundownId}" not found!`)
331332

332333
playingSegment = playingRundown.getSegment(playingPartInstance.partInstance.segmentId)
333-
if (!playingSegment) throw new Error(`Segment "${playingPartInstance.partInstance.segmentId}" not found!`)
334+
if (!playingSegment) {
335+
const rundownId = playingRundown.rundown._id
336+
context.directCollections.Segments.findFetch({
337+
rundownId: rundownId,
338+
})
339+
.then((segment) => {
340+
logger.error(
341+
`TROUBLESHOOT: Segment not found, rundown "${rundownId}", segments in db: ${JSON.stringify(
342+
segment.map((s) => s._id)
343+
)}`
344+
)
345+
})
346+
.catch((e) => logger.error(e))
347+
348+
throw new Error(
349+
`Segment "${playingPartInstance.partInstance.segmentId}" in Rundown "${
350+
playingRundown.rundown._id
351+
}" not found! (other segments: ${JSON.stringify(playingRundown.segments.map((s) => s.segment._id))})`
352+
)
353+
}
334354
}
335355

336356
const segment = rundown.getSegment(part.segmentId)

packages/job-worker/src/playout/model/implementation/LoadPlayoutModel.ts

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { PlayoutModel, PlayoutModelPreInit } from '../PlayoutModel'
2323
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
2424
import { RundownBaselineObj } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineObj'
2525
import { sortRundownsWithinPlaylist } from '@sofie-automation/corelib/dist/playout/playlist'
26+
import { logger } from '../../../logging'
2627

2728
/**
2829
* Load a PlayoutModelPreInit for the given RundownPlaylist
@@ -188,7 +189,7 @@ async function loadRundowns(
188189
context.directCollections.Segments.findFetch({
189190
$or: [
190191
{
191-
// In a different rundown
192+
// Either in rundown when ingestModel === null or not available in ingestModel
192193
rundownId: { $in: loadRundownIds },
193194
},
194195
{
@@ -233,14 +234,25 @@ async function loadRundowns(
233234
}
234235
}
235236

236-
return rundowns.map(
237-
(rundown) =>
238-
new PlayoutRundownModelImpl(
239-
rundown,
240-
groupedSegmentsWithParts.get(rundown._id) ?? [],
241-
groupedBaselineObjects.get(rundown._id) ?? []
237+
return rundowns.map((rundown) => {
238+
const groupedSegmentsWithPartsForRundown = groupedSegmentsWithParts.get(rundown._id)
239+
if (!groupedSegmentsWithPartsForRundown) {
240+
logger.debug(
241+
`groupedSegmentsWithPartsForRundown for Rundown "${rundown._id}" is undefined (has the rundown no segments?)`
242242
)
243-
)
243+
}
244+
const groupedBaselineObjectsForRundown = groupedBaselineObjects.get(rundown._id)
245+
if (!groupedBaselineObjectsForRundown)
246+
logger.debug(
247+
`groupedBaselineObjectsForRundown for Rundown "${rundown._id}" is undefined (has the rundown no baseline objects?)`
248+
)
249+
250+
return new PlayoutRundownModelImpl(
251+
rundown,
252+
groupedSegmentsWithPartsForRundown ?? [],
253+
groupedBaselineObjectsForRundown ?? []
254+
)
255+
})
244256
}
245257

246258
async function loadPartInstances(

0 commit comments

Comments
 (0)