Commit b05d92b

wip: save under new ids for bucket items

1 parent: 6181a30

File tree: 2 files changed, +140 −87 lines


packages/corelib/src/dataModel/ExpectedPackages.ts (22 additions, 18 deletions)
@@ -1,6 +1,6 @@
 import { ExpectedPackage, Time } from '@sofie-automation/blueprints-integration'
 import { protectString, unprotectString } from '../protectedString'
-import { assertNever, getHash } from '../lib'
+import { assertNever, getHash, hashObj } from '../lib'
 import {
 	AdLibActionId,
 	BucketAdLibActionId,
@@ -53,8 +53,10 @@ export interface ExpectedPackageDB {

 	package: ReadonlyDeep<ExpectedPackage.Any>

-	// HACK: This should be ExpectedPackageIngestSource[], but for the first iteration this is limited to a single source
-	ingestSources: [ExpectedPackageIngestSource]
+	/**
+	 * The ingest sources that generated this package.
+	 */
+	ingestSources: ExpectedPackageIngestSource[]

 	// playoutSources: {
 	// 	/** Any playout PieceInstance. This is limited to the current and next partInstances */
@@ -183,18 +185,20 @@ export function getExpectedPackageIdFromIngestSource(
 	return protectString(`${parentId}_${ownerId}_${getHash(localExpectedPackageId)}`)
 }

-// Future implementation of id generation, once shared ownership is implemented
-// export function getExpectedPackageIdNew(
-// 	/** _id of the rundown*/
-// 	rundownId: RundownId,
-// 	/** The locally unique id of the expectedPackage */
-// 	expectedPackage: ReadonlyDeep<ExpectedPackage.Any>
-// ): ExpectedPackageId {
-// 	// This may be too agressive, but we don't know how to merge some of the properties
-// 	const objHash = hashObj({
-// 		...expectedPackage,
-// 		listenToPackageInfoUpdates: false, // Not relevant for the hash
-// 	} satisfies ReadonlyDeep<ExpectedPackage.Any>)
-
-// 	return protectString(`${rundownId}_${getHash(objHash)}`)
-// }
+/**
+ * Generate the expectedPackageId for the given expectedPackage.
+ */
+export function getExpectedPackageIdNew(
+	/** Preferably a RundownId or BucketId, but StudioId is allowed when not owned by a rundown or bucket */
+	parentId: RundownId | StudioId | BucketId,
+	/** The locally unique id of the expectedPackage */
+	expectedPackage: ReadonlyDeep<ExpectedPackage.Any>
+): ExpectedPackageId {
+	// This may be too aggressive, but we don't know how to merge some of the properties
+	const objHash = hashObj({
+		...expectedPackage,
+		listenToPackageInfoUpdates: false, // Not relevant for the hash
+	} satisfies ReadonlyDeep<ExpectedPackage.Any>)
+
+	return protectString(`${parentId}_${getHash(objHash)}`)
+}
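
Note on the id scheme: the id is now derived from a hash of the package contents plus the parent id, rather than from the piece that referenced it, so byte-identical packages registered by different sources collapse into one document. A rough standalone sketch of the idea, using Node's crypto as a stand-in for the project's hashObj/getHash helpers, with DemoPackage and demoExpectedPackageId as made-up names for illustration:

import { createHash } from 'crypto'

// Stand-in for the real hashObj/getHash helpers (assumption: they are
// deterministic hashes over a stable serialization of the object)
function hashObj(obj: unknown): string {
	return createHash('sha1').update(JSON.stringify(obj)).digest('hex')
}

// Made-up minimal package shape, for illustration only
interface DemoPackage {
	_id: string
	type: string
	listenToPackageInfoUpdates?: boolean
}

function demoExpectedPackageId(parentId: string, pkg: DemoPackage): string {
	// Normalize listenToPackageInfoUpdates out of the hash, mirroring the diff
	const objHash = hashObj({ ...pkg, listenToPackageInfoUpdates: false })
	return `${parentId}_${objHash}`
}

const a = demoExpectedPackageId('bucket0', { _id: 'clip1', type: 'media_file' })
const b = demoExpectedPackageId('bucket0', { _id: 'clip1', type: 'media_file', listenToPackageInfoUpdates: true })
console.log(a === b) // true: both variants map to the same document id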

packages/job-worker/src/ingest/expectedPackages.ts (118 additions, 69 deletions)
@@ -3,11 +3,10 @@ import { BucketAdLib } from '@sofie-automation/corelib/dist/dataModel/BucketAdLi
 import {
 	ExpectedPackageDBType,
 	ExpectedPackageDB,
-	getExpectedPackageIdFromIngestSource,
 	ExpectedPackageIngestSource,
+	getExpectedPackageIdNew,
 } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
 import { AdLibActionId, PieceId, BucketId } from '@sofie-automation/corelib/dist/dataModel/Ids'
-import { saveIntoDb } from '../db/changes'
 import { PlayoutModel } from '../playout/model/PlayoutModel'
 import { StudioPlayoutModel } from '../studio/model/StudioPlayoutModel'
 import { ReadonlyDeep } from 'type-fest'
@@ -21,7 +20,8 @@ import {
 import { JobContext, JobStudio } from '../jobs'
 import { IngestModel } from './model/IngestModel'
 import { IngestPartModel } from './model/IngestPartModel'
-import { clone, hashObj } from '@sofie-automation/corelib/dist/lib'
+import { hashObj } from '@sofie-automation/corelib/dist/lib'
+import { AnyBulkWriteOperation } from 'mongodb'

 export function updateExpectedMediaAndPlayoutItemsForPartModel(context: JobContext, part: IngestPartModel): void {
 	updateExpectedMediaItemsForPartModel(context, part)
@@ -87,89 +87,134 @@ function generateBucketExpectedPackages(

 	for (let i = 0; i < expectedPackages.length; i++) {
 		const expectedPackage = expectedPackages[i]
-		const id = expectedPackage._id || '__unnamed' + i
+
+		const fullPackage: ReadonlyDeep<ExpectedPackage.Any> = {
+			...expectedPackage,
+			_id: expectedPackage._id || '__unnamed' + i,
+		}

 		bases.push({
-			_id: getExpectedPackageIdFromIngestSource(bucketId, source, id),
-			package: {
-				...clone<ExpectedPackage.Any>(expectedPackage),
-				_id: id,
-			},
+			_id: getExpectedPackageIdNew(bucketId, fullPackage),
+			package: fullPackage,
 			studioId: studio._id,
 			rundownId: null,
 			bucketId: bucketId,
-			created: Date.now(), // This will be preserved during the `saveIntoDb`
+			created: Date.now(), // This will be preserved during the save if needed
 			ingestSources: [source],
 		})
 	}

 	return bases
 }

-export async function updateExpectedPackagesForBucketAdLibPiece(
+async function writeUpdatedExpectedPackages(
 	context: JobContext,
-	adlib: BucketAdLib
+	bucketId: BucketId,
+	documentsToSave: ExpectedPackageDB[],
+	matchSource: Partial<ExpectedPackageIngestSource>
 ): Promise<void> {
-	const packages = generateExpectedPackagesForBucketAdlib(context.studio, adlib)
-
-	await saveIntoDb(
-		context,
-		context.directCollections.ExpectedPackages,
-		{
-			studioId: context.studioId,
-			bucketId: adlib.bucketId,
-			// Note: This assumes that there is only one ingest source for each piece
-			ingestSources: {
-				$elemMatch: {
-					fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB,
-					pieceId: adlib._id,
+	const writeOps: AnyBulkWriteOperation<ExpectedPackageDB>[] = []
+
+	const documentIdsToSave = documentsToSave.map((doc) => doc._id)
+
+	// Find which documents already exist in the database
+	// It would be nice to avoid this, but that would make the update operation incredibly complex
+	// There is no risk of race conditions, as bucket packages are only modified in the ingest job worker
+	const existingDocIds = new Set(
+		(
+			await context.directCollections.ExpectedPackages.findFetch(
+				{
+					_id: { $in: documentIdsToSave },
+					studioId: context.studioId,
+					bucketId: bucketId,
 				},
-			},
-		},
-		packages,
-		{
-			beforeDiff: (obj, oldObj) => {
-				return {
-					...obj,
-					// Preserve old created timestamp
-					created: oldObj.created,
+				{
+					projection: {
+						_id: 1,
+					},
 				}
-			},
-		}
+			)
+		).map((doc) => doc._id)
 	)
+
+	for (const doc of documentsToSave) {
+		if (existingDocIds.has(doc._id)) {
+			// Document already exists, perform an update to merge the source into the existing document
+			writeOps.push({
+				updateOne: {
+					filter: {
+						_id: doc._id,
+						ingestSources: {
+							// This is pretty messy, but we need to make sure that we don't add the same source twice
+							// nocommit - does this work?
+							$not: {
+								$elemMatch: matchSource,
+							},
+						},
+					},
+					update: {
+						$push: {
+							ingestSources: doc.ingestSources[0],
+						},
+					},
+				},
+			})
+		} else {
+			// Perform a simple insert
+			writeOps.push({
+				insertOne: {
+					document: doc,
+				},
+			})
+		}
+	}

+	// Remove any old references from this source
+	writeOps.push({
+		updateMany: {
+			filter: {
+				studioId: context.studioId,
+				bucketId: bucketId,
+				_id: { $nin: documentIdsToSave },
+			},
+			update: {
+				$pull: {
+					ingestSources: matchSource,
+				},
+			},
+		},
+	})
+
+	await context.directCollections.ExpectedPackages.bulkWrite(writeOps)
+
+	// Check for any packages that no longer have any sources
+	await cleanUpUnusedPackagesInBucket(context, bucketId)
+}
+
+export async function updateExpectedPackagesForBucketAdLibPiece(
+	context: JobContext,
+	adlib: BucketAdLib
+): Promise<void> {
+	const documentsToSave = generateExpectedPackagesForBucketAdlib(context.studio, adlib)
+
+	await writeUpdatedExpectedPackages(context, adlib.bucketId, documentsToSave, {
+		fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB,
+		pieceId: adlib._id,
+		// nocommit - does this work?
+	})
 }

 export async function updateExpectedPackagesForBucketAdLibAction(
 	context: JobContext,
 	action: BucketAdLibAction
 ): Promise<void> {
-	const packages = generateExpectedPackagesForBucketAdlibAction(context.studio, action)
-
-	await saveIntoDb(
-		context,
-		context.directCollections.ExpectedPackages,
-		{
-			studioId: context.studioId,
-			bucketId: action.bucketId,
-			// Note: This assumes that there is only one ingest source for each piece
-			ingestSources: {
-				$elemMatch: {
-					fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB_ACTION,
-					pieceId: action._id,
-				},
-			},
-		},
-		packages,
-		{
-			beforeDiff: (obj, oldObj) => {
-				return {
-					...obj,
-					// Preserve old created timestamp
-					created: oldObj.created,
-				}
-			},
-		}
-	)
+	const documentsToSave = generateExpectedPackagesForBucketAdlibAction(context.studio, action)
+
+	await writeUpdatedExpectedPackages(context, action.bucketId, documentsToSave, {
+		fromPieceType: ExpectedPackageDBType.BUCKET_ADLIB_ACTION,
+		pieceId: action._id,
+		// nocommit - does this work?
+	})
 }

 export async function cleanUpExpectedPackagesForBucketAdLibs(
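
The new save path replaces saveIntoDb with a manual bulk write so that several ingest sources can share one package document: documents that already exist get the new source pushed in (guarded by $not/$elemMatch so the same source is never added twice), missing documents are inserted, and every other document in the bucket has this source pulled out. A standalone sketch of that pattern against the official mongodb driver, with a simplified document shape (the PackageDoc type and function names here are assumptions for illustration, not the project's real ones):

import { Collection, AnyBulkWriteOperation } from 'mongodb'

// Simplified stand-in for ExpectedPackageDB
interface IngestSource {
	fromPieceType: string
	pieceId: string
}
interface PackageDoc {
	_id: string
	bucketId: string
	ingestSources: IngestSource[]
}

async function upsertPackagesForSource(
	coll: Collection<PackageDoc>,
	docs: PackageDoc[],
	matchSource: IngestSource
): Promise<void> {
	const ids = docs.map((d) => d._id)

	// One round-trip to learn which ids already exist
	const existing = new Set(
		(await coll.find({ _id: { $in: ids } }, { projection: { _id: 1 } }).toArray()).map((d) => d._id)
	)

	const ops: AnyBulkWriteOperation<PackageDoc>[] = docs.map((doc) =>
		existing.has(doc._id)
			? {
					updateOne: {
						// Only push when this source is not already in the array
						filter: { _id: doc._id, ingestSources: { $not: { $elemMatch: matchSource } } },
						update: { $push: { ingestSources: doc.ingestSources[0] } },
					},
			  }
			: { insertOne: { document: doc } }
	)

	// Detach this source from any document it no longer generates
	ops.push({
		updateMany: {
			filter: { _id: { $nin: ids } },
			update: { $pull: { ingestSources: matchSource } },
		},
	})

	await coll.bulkWrite(ops)
}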
@@ -205,15 +250,19 @@ export async function cleanUpExpectedPackagesForBucketAdLibs(
 	)

 		// Remove any expected packages that now have no owners
-		await context.directCollections.ExpectedPackages.remove({
-			studioId: context.studioId,
-			bucketId: bucketId,
-			ingestSources: { $size: 0 },
-			// Future: these currently can't be referenced by playoutSources, but they could be in the future
-		})
+		await cleanUpUnusedPackagesInBucket(context, bucketId)
 	}
 }

+async function cleanUpUnusedPackagesInBucket(context: JobContext, bucketId: BucketId) {
+	await context.directCollections.ExpectedPackages.remove({
+		studioId: context.studioId,
+		bucketId: bucketId,
+		ingestSources: { $size: 0 },
+		// Future: these currently can't be referenced by playoutSources, but they could be in the future
+	})
+}
+
 export function updateBaselineExpectedPackagesOnStudio(
 	context: JobContext,
 	playoutModel: StudioPlayoutModel | PlayoutModel,
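
Once the $pull operations leave a document with an empty ingestSources array, nothing owns the package any more, and the $size: 0 query removes it. A minimal sketch of that cleanup against the same hypothetical collection shape as above:

import { Collection } from 'mongodb'

// Delete any package in the bucket whose ingestSources array has been
// emptied by earlier $pull operations, i.e. no owners remain
async function removeOrphanedPackages(
	coll: Collection<{ _id: string; bucketId: string; ingestSources: unknown[] }>,
	bucketId: string
): Promise<number> {
	const result = await coll.deleteMany({ bucketId, ingestSources: { $size: 0 } })
	return result.deletedCount
}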
