Skip to content

Commit 53486ca

Browse files
committed
wip
1 parent a3df58a commit 53486ca

File tree

9 files changed

+80
-20
lines changed

9 files changed

+80
-20
lines changed

packages/job-worker/src/ingest/commit.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import {
1919
removeRundownFromDb,
2020
} from '../rundownPlaylists'
2121
import { ReadonlyDeep } from 'type-fest'
22-
import { IngestModel, IngestModelReadonly } from './model/IngestModel'
22+
import { IngestDatabasePersistedModel, IngestModel, IngestModelReadonly } from './model/IngestModel'
2323
import { JobContext } from '../jobs'
2424
import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist'
2525
import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance'
@@ -40,7 +40,6 @@ import { PlayoutRundownModelImpl } from '../playout/model/implementation/Playout
4040
import { PlayoutSegmentModelImpl } from '../playout/model/implementation/PlayoutSegmentModelImpl'
4141
import { createPlayoutModelFromIngestModel } from '../playout/model/implementation/LoadPlayoutModel'
4242
import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part'
43-
import { DatabasePersistedModel } from '../modelBase'
4443
import { updateSegmentIdsForAdlibbedPartInstances } from './commit/updateSegmentIdsForAdlibbedPartInstances'
4544
import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError'
4645
import { AnyBulkWriteOperation } from 'mongodb'
@@ -64,7 +63,7 @@ interface PlaylistIdPair {
6463
*/
6564
export async function CommitIngestOperation(
6665
context: JobContext,
67-
ingestModel: IngestModel & DatabasePersistedModel,
66+
ingestModel: IngestModel & IngestDatabasePersistedModel,
6867
beforeRundown: ReadonlyDeep<DBRundown> | undefined,
6968
beforePartMap: BeforeIngestOperationPartMap,
7069
data: ReadonlyDeep<CommitIngestData>
@@ -223,7 +222,7 @@ export async function CommitIngestOperation(
223222
)
224223

225224
// Start the save
226-
const pSaveIngest = ingestModel.saveAllToDatabase()
225+
const pSaveIngest = ingestModel.saveAllToDatabase(playlistLock)
227226
pSaveIngest.catch(() => null) // Ensure promise isn't reported as unhandled
228227

229228
await validateAdlibTestingSegment(context, playoutModel)

packages/job-worker/src/ingest/model/IngestExpectedPackage.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import type {
88
import type { ExpectedPackageId } from '@sofie-automation/corelib/dist/dataModel/Ids'
99
import type { ReadonlyDeep } from 'type-fest'
1010

11+
/**
12+
* A simpler form of ExpectedPackageDB that is scoped to the properties relevant to ingest.
13+
*/
1114
export interface IngestExpectedPackage<
1215
TPackageSource extends { fromPieceType: ExpectedPackageDBType } =
1316
| ExpectedPackageIngestSourcePart

packages/job-worker/src/ingest/model/IngestModel.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { CoreUserEditingDefinition } from '@sofie-automation/corelib/dist/dataMo
1414
import { RundownBaselineAdLibAction } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibAction'
1515
import { RundownBaselineAdLibItem } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibPiece'
1616
import { LazyInitialiseReadonly } from '../../lib/lazy'
17-
import { RundownLock } from '../../jobs/lock'
17+
import type { PlaylistLock, RundownLock } from '../../jobs/lock'
1818
import { IngestSegmentModel, IngestSegmentModelReadonly } from './IngestSegmentModel'
1919
import { IngestPartModel, IngestPartModelReadonly } from './IngestPartModel'
2020
import { ReadonlyDeep } from 'type-fest'
@@ -265,3 +265,10 @@ export interface IngestModel extends IngestModelReadonly, BaseModel, INotificati
265265
}
266266

267267
export type IngestReplaceSegmentType = Omit<DBSegment, '_id' | 'rundownId'>
268+
269+
export interface IngestDatabasePersistedModel {
270+
/**
271+
* Issue a save of the contents of this model to the database
272+
*/
273+
saveAllToDatabase(lock: PlaylistLock): Promise<void>
274+
}

packages/job-worker/src/ingest/model/implementation/DocumentChangeTracker.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,35 @@ export class DocumentChangeTracker<TDoc extends { _id: ProtectedString<any> }> {
125125

126126
return ops
127127
}
128+
129+
/**
130+
* Generate the mongodb BulkWrite operations for the documents known to this tracker
131+
* @returns mongodb BulkWrite operations
132+
*/
133+
generateWriteOpsWithTransform<TDoc2 extends { _id: ProtectedString<any> } = TDoc>(
134+
transformDocs: (doc: TDoc[]) => TDoc2[]
135+
): AnyBulkWriteOperation<TDoc2>[] {
136+
const ops: AnyBulkWriteOperation<TDoc2>[] = []
137+
138+
const transformedDocs = transformDocs(Array.from(this.#documentsToSave.values()))
139+
for (const doc of transformedDocs) {
140+
ops.push({
141+
replaceOne: {
142+
filter: { _id: doc._id },
143+
replacement: doc,
144+
upsert: true,
145+
},
146+
})
147+
}
148+
149+
if (this.#deletedIds.size > 0) {
150+
ops.push({
151+
deleteMany: {
152+
filter: { _id: { $in: Array.from(this.#deletedIds) as any } },
153+
},
154+
})
155+
}
156+
157+
return ops
158+
}
128159
}

packages/job-worker/src/ingest/model/implementation/IngestModelImpl.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment'
3030
import { JobContext, ProcessedShowStyleBase, ProcessedShowStyleVariant } from '../../../jobs'
3131
import { LazyInitialise, LazyInitialiseReadonly } from '../../../lib/lazy'
3232
import { getRundownId, getSegmentId } from '../../lib'
33-
import { RundownLock } from '../../../jobs/lock'
33+
import { PlaylistLock, RundownLock } from '../../../jobs/lock'
3434
import { IngestSegmentModel } from '../IngestSegmentModel'
3535
import { IngestSegmentModelImpl } from './IngestSegmentModelImpl'
3636
import { IngestPartModel } from '../IngestPartModel'
@@ -44,10 +44,9 @@ import {
4444
literal,
4545
} from '@sofie-automation/corelib/dist/lib'
4646
import { IngestPartModelImpl } from './IngestPartModelImpl'
47-
import { DatabasePersistedModel } from '../../../modelBase'
4847
import { ExpectedPackagesStore } from './ExpectedPackagesStore'
4948
import { ReadonlyDeep } from 'type-fest'
50-
import { IngestModel, IngestReplaceSegmentType } from '../IngestModel'
49+
import { IngestDatabasePersistedModel, IngestModel, IngestReplaceSegmentType } from '../IngestModel'
5150
import { RundownNote } from '@sofie-automation/corelib/dist/dataModel/Notes'
5251
import { diffAndReturnLatestObjects } from './utils'
5352
import _ = require('underscore')
@@ -85,7 +84,7 @@ interface SegmentWrapper {
8584
/**
8685
* Cache of relevant documents for an Ingest Operation
8786
*/
88-
export class IngestModelImpl implements IngestModel, DatabasePersistedModel {
87+
export class IngestModelImpl implements IngestModel, IngestDatabasePersistedModel {
8988
public readonly isIngest = true
9089

9190
public readonly rundownLock: RundownLock
@@ -638,7 +637,7 @@ export class IngestModelImpl implements IngestModel, DatabasePersistedModel {
638637
this.#disposed = true
639638
}
640639

641-
async saveAllToDatabase(): Promise<void> {
640+
async saveAllToDatabase(playlistLock: PlaylistLock): Promise<void> {
642641
if (this.#disposed) {
643642
throw new Error('Cannot save disposed IngestModel')
644643
}
@@ -647,6 +646,10 @@ export class IngestModelImpl implements IngestModel, DatabasePersistedModel {
647646
throw new Error('Cannot save changes with released RundownLock')
648647
}
649648

649+
if (this.#rundownImpl && playlistLock.playlistId !== this.#rundownImpl.playlistId) {
650+
throw new Error('Cannot save changes with incorrect PlaylistLock')
651+
}
652+
650653
const span = this.context.startSpan('IngestModelImpl.saveAllToDatabase')
651654

652655
// Ensure there are no duplicate part ids

packages/job-worker/src/ingest/model/implementation/LoadIngestModel.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ import { unprotectString } from '@sofie-automation/corelib/dist/protectedString'
33
import { JobContext } from '../../../jobs'
44
import { ReadonlyDeep } from 'type-fest'
55
import { RundownLock } from '../../../jobs/lock'
6-
import { IngestModel } from '../IngestModel'
7-
import { DatabasePersistedModel } from '../../../modelBase'
6+
import { IngestDatabasePersistedModel, IngestModel } from '../IngestModel'
87
import { getRundownId } from '../../lib'
98
import { ExpectedMediaItemRundown } from '@sofie-automation/corelib/dist/dataModel/ExpectedMediaItem'
109
import { ExpectedPlayoutItemRundown } from '@sofie-automation/corelib/dist/dataModel/ExpectedPlayoutItem'
@@ -24,7 +23,7 @@ export async function loadIngestModelFromRundown(
2423
context: JobContext,
2524
rundownLock: RundownLock,
2625
rundown: ReadonlyDeep<DBRundown>
27-
): Promise<IngestModel & DatabasePersistedModel> {
26+
): Promise<IngestModel & IngestDatabasePersistedModel> {
2827
const span = context.startSpan('IngestModel.loadFromRundown')
2928
if (span) span.setLabel('rundownId', unprotectString(rundown._id))
3029

@@ -58,7 +57,7 @@ export async function loadIngestModelFromRundownExternalId(
5857
context: JobContext,
5958
rundownLock: RundownLock,
6059
rundownExternalId: string
61-
): Promise<IngestModel & DatabasePersistedModel> {
60+
): Promise<IngestModel & IngestDatabasePersistedModel> {
6261
const span = context.startSpan('IngestModel.loadFromExternalId')
6362
if (span) span.setLabel('externalId', rundownExternalId)
6463

packages/job-worker/src/ingest/model/implementation/SaveIngestModel.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@ import { IngestSegmentModelImpl } from './IngestSegmentModelImpl'
1212
import { DocumentChangeTracker } from './DocumentChangeTracker'
1313
import { logger } from '../../../logging'
1414
import { ProtectedString } from '@sofie-automation/corelib/dist/protectedString'
15+
import { IngestExpectedPackage } from '../IngestExpectedPackage'
1516

1617
export class SaveIngestModelHelper {
17-
#expectedPackages = new DocumentChangeTracker<ExpectedPackageDBNew>()
18+
#expectedPackages = new DocumentChangeTracker<IngestExpectedPackage<any>>()
1819
#expectedPlayoutItems = new DocumentChangeTracker<ExpectedPlayoutItem>()
1920
#expectedMediaItems = new DocumentChangeTracker<ExpectedMediaItem>()
2021

@@ -74,7 +75,11 @@ export class SaveIngestModelHelper {
7475
}
7576

7677
return [
77-
context.directCollections.ExpectedPackages.bulkWrite(this.#expectedPackages.generateWriteOps()),
78+
context.directCollections.ExpectedPackages.bulkWrite(
79+
this.#expectedPackages.generateWriteOpsWithTransform((docs) =>
80+
transformIngestExpectedPackageToDb(context, docs)
81+
)
82+
),
7883
context.directCollections.ExpectedPlayoutItems.bulkWrite(this.#expectedPlayoutItems.generateWriteOps()),
7984
context.directCollections.ExpectedMediaItems.bulkWrite(this.#expectedMediaItems.generateWriteOps()),
8085

@@ -86,3 +91,15 @@ export class SaveIngestModelHelper {
8691
]
8792
}
8893
}
94+
95+
function transformIngestExpectedPackageToDb(
96+
context: JobContext,
97+
docs: IngestExpectedPackage<any>[]
98+
): ExpectedPackageDBNew[] {
99+
return docs.map((doc) => ({
100+
...doc,
101+
studioId: context.studioId,
102+
rundownId: null,
103+
bucketId: null,
104+
}))
105+
}

packages/job-worker/src/ingest/packageInfo.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ export async function handleExpectedPackagesRegenerate(
2626
return runWithRundownLock(context, data.rundownId, async (rundown, rundownLock) => {
2727
if (!rundown) throw new Error(`Rundown "${data.rundownId}" not found`)
2828

29+
// TODO - this needs to be run inside a playlistLock
30+
2931
const ingestModel = await loadIngestModelFromRundown(context, rundownLock, rundown)
3032

3133
// nocommit reimplement this for packages
@@ -36,7 +38,7 @@ export async function handleExpectedPackagesRegenerate(
3638

3739
await updateExpectedMediaAndPlayoutItemsForRundownBaseline(context, ingestModel, undefined)
3840

39-
await ingestModel.saveAllToDatabase()
41+
await ingestModel.saveAllToDatabase(rundownLock as any) // nocommit
4042
})
4143
}
4244

packages/job-worker/src/ingest/runOperation.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { IngestModel, IngestModelReadonly } from './model/IngestModel'
1+
import { IngestDatabasePersistedModel, IngestModel, IngestModelReadonly } from './model/IngestModel'
22
import { BeforeIngestOperationPartMap, CommitIngestOperation } from './commit'
33
import { SofieIngestRundownDataCache, SofieIngestRundownDataCacheGenerator } from './sofieIngestCache'
44
import { canRundownBeUpdated, getRundownId, getSegmentId } from './lib'
@@ -8,7 +8,6 @@ import { UserError, UserErrorMessage } from '@sofie-automation/corelib/dist/erro
88
import { loadIngestModelFromRundownExternalId } from './model/implementation/LoadIngestModel'
99
import { Complete, clone } from '@sofie-automation/corelib/dist/lib'
1010
import { CommitIngestData, runWithRundownLockWithoutFetchingRundown } from './lock'
11-
import { DatabasePersistedModel } from '../modelBase'
1211
import {
1312
NrcsIngestChangeDetails,
1413
IngestRundown,
@@ -347,7 +346,7 @@ function sortIngestRundown(rundown: IngestRundown): void {
347346

348347
async function updateSofieRundownModel(
349348
context: JobContext,
350-
pIngestModel: Promise<IngestModel & DatabasePersistedModel>,
349+
pIngestModel: Promise<IngestModel & IngestDatabasePersistedModel>,
351350
computedIngestChanges: ComputedIngestChanges | null
352351
) {
353352
const ingestModel = await pIngestModel

0 commit comments

Comments
 (0)