Skip to content

Commit 9db08fd

Browse files
committed
wip
1 parent 0ffdcd7 commit 9db08fd

File tree

13 files changed

+283
-37
lines changed

13 files changed

+283
-37
lines changed

meteor/server/worker/worker.ts

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { MongoQuery } from '@sofie-automation/corelib/dist/mongo'
2222
import { UserActionsLog } from '../collections'
2323
import { MetricsCounter } from '@sofie-automation/corelib/dist/prometheus'
2424
import { isInTestWrite } from '../security/securityVerify'
25+
import { QueueJobOptions } from '@sofie-automation/job-worker/dist/jobs'
2526

2627
const FREEZE_LIMIT = 1000 // how long to wait for a response to a Ping
2728
const RESTART_TIMEOUT = 30000 // how long to wait for a restart to complete before throwing an error
@@ -51,6 +52,8 @@ const metricsQueueErrorsCounter = new MetricsCounter({
5152

5253
interface JobQueue {
5354
jobs: Array<JobEntry | null>
55+
// lowPriorityJobs: Array<JobEntry>
56+
5457
/** Notify that there is a job waiting (aka worker is long-polling) */
5558
notifyWorker: ManualPromise<void> | null
5659

@@ -177,18 +180,27 @@ async function interruptJobStream(queueName: string): Promise<void> {
177180
queue.jobs.unshift(null)
178181
}
179182
}
180-
async function queueJobWithoutResult(queueName: string, jobName: string, jobData: unknown): Promise<void> {
181-
queueJobInner(queueName, {
182-
spec: {
183-
id: getRandomString(),
184-
name: jobName,
185-
data: jobData,
183+
async function queueJobWithoutResult(
184+
queueName: string,
185+
jobName: string,
186+
jobData: unknown,
187+
options: QueueJobOptions | undefined
188+
): Promise<void> {
189+
queueJobInner(
190+
queueName,
191+
{
192+
spec: {
193+
id: getRandomString(),
194+
name: jobName,
195+
data: jobData,
196+
},
197+
completionHandler: null,
186198
},
187-
completionHandler: null,
188-
})
199+
options ?? {}
200+
)
189201
}
190202

191-
function queueJobInner(queueName: string, jobToQueue: JobEntry): void {
203+
function queueJobInner(queueName: string, jobToQueue: JobEntry, options: QueueJobOptions): void {
192204
// Put the job at the end of the queue:
193205
const queue = getOrCreateQueue(queueName)
194206
queue.jobs.push(jobToQueue)
@@ -212,13 +224,22 @@ function queueJobInner(queueName: string, jobToQueue: JobEntry): void {
212224
}
213225
}
214226

215-
function queueJobAndWrapResult<TRes>(queueName: string, job: JobSpec, now: Time): WorkerJob<TRes> {
227+
function queueJobAndWrapResult<TRes>(
228+
queueName: string,
229+
job: JobSpec,
230+
now: Time,
231+
options: QueueJobOptions | undefined
232+
): WorkerJob<TRes> {
216233
const { result, completionHandler } = generateCompletionHandler<TRes>(job.id, now)
217234

218-
queueJobInner(queueName, {
219-
spec: job,
220-
completionHandler: completionHandler,
221-
})
235+
queueJobInner(
236+
queueName,
237+
{
238+
spec: job,
239+
completionHandler: completionHandler,
240+
},
241+
options ?? {}
242+
)
222243

223244
return result
224245
}
@@ -413,7 +434,8 @@ export async function QueueForceClearAllCaches(studioIds: StudioId[]): Promise<v
413434
name: FORCE_CLEAR_CACHES_JOB,
414435
data: undefined,
415436
},
416-
now
437+
now,
438+
{}
417439
)
418440
)
419441

@@ -426,7 +448,8 @@ export async function QueueForceClearAllCaches(studioIds: StudioId[]): Promise<v
426448
name: FORCE_CLEAR_CACHES_JOB,
427449
data: undefined,
428450
},
429-
now
451+
now,
452+
{}
430453
)
431454
)
432455

@@ -439,7 +462,8 @@ export async function QueueForceClearAllCaches(studioIds: StudioId[]): Promise<v
439462
name: FORCE_CLEAR_CACHES_JOB,
440463
data: undefined,
441464
},
442-
now
465+
now,
466+
{}
443467
)
444468
)
445469
}
@@ -458,7 +482,8 @@ export async function QueueForceClearAllCaches(studioIds: StudioId[]): Promise<v
458482
export async function QueueStudioJob<T extends keyof StudioJobFunc>(
459483
jobName: T,
460484
studioId: StudioId,
461-
jobParameters: Parameters<StudioJobFunc[T]>[0]
485+
jobParameters: Parameters<StudioJobFunc[T]>[0],
486+
options?: QueueJobOptions
462487
): Promise<WorkerJob<ReturnType<StudioJobFunc[T]>>> {
463488
if (isInTestWrite()) throw new Meteor.Error(404, 'Should not be reachable during startup tests')
464489
if (!studioId) throw new Meteor.Error(500, 'Missing studioId')
@@ -471,7 +496,8 @@ export async function QueueStudioJob<T extends keyof StudioJobFunc>(
471496
name: jobName,
472497
data: jobParameters,
473498
},
474-
now
499+
now,
500+
options
475501
)
476502
}
477503

packages/corelib/src/worker/studio.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ export enum StudioJobs {
204204
* for use in ad.lib actions and other triggers
205205
*/
206206
SwitchRouteSet = 'switchRouteSet',
207+
208+
/**
209+
* Cleanup any expected packages playout references that are orphaned
210+
* During playout it is hard to track removal of PieceInstances (particularly when resetting PieceInstances)
211+
*/
212+
CleanupOrphanedExpectedPackageReferences = 'cleanupOrphanedExpectedPackageReferences',
207213
}
208214

209215
export interface RundownPlayoutPropsBase {
@@ -366,6 +372,11 @@ export interface SwitchRouteSetProps {
366372
state: boolean | 'toggle'
367373
}
368374

375+
export interface CleanupOrphanedExpectedPackageReferencesProps {
376+
playlistId: RundownPlaylistId
377+
rundownId: RundownId
378+
}
379+
369380
/**
370381
* Set of valid functions, of form:
371382
* `id: (data) => return`
@@ -422,6 +433,8 @@ export type StudioJobFunc = {
422433
[StudioJobs.ClearQuickLoopMarkers]: (data: ClearQuickLoopMarkersProps) => void
423434

424435
[StudioJobs.SwitchRouteSet]: (data: SwitchRouteSetProps) => void
436+
437+
[StudioJobs.CleanupOrphanedExpectedPackageReferences]: (data: CleanupOrphanedExpectedPackageReferencesProps) => void
425438
}
426439

427440
export function getStudioQueueName(id: StudioId): string {

packages/job-worker/src/__mocks__/context.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import {
4646
ProcessedShowStyleBase,
4747
ProcessedShowStyleCompound,
4848
ProcessedShowStyleVariant,
49+
QueueJobOptions,
4950
} from '../jobs'
5051
import { PlaylistLock, RundownLock } from '../jobs/lock'
5152
import { BaseModel } from '../modelBase'
@@ -154,7 +155,8 @@ export class MockJobContext implements JobContext {
154155
}
155156
async queueStudioJob<T extends keyof StudioJobFunc>(
156157
_name: T,
157-
_data: Parameters<StudioJobFunc[T]>[0]
158+
_data: Parameters<StudioJobFunc[T]>[0],
159+
_options?: QueueJobOptions
158160
): Promise<void> {
159161
throw new Error('Method not implemented.')
160162
}

packages/job-worker/src/ingest/expectedPackages.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async function writeUpdatedExpectedPackages(
165165
},
166166
},
167167
update: {
168-
$push: {
168+
$addToSet: {
169169
ingestSources: doc.ingestSources[0],
170170
},
171171
},

packages/job-worker/src/ipc.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { FastTrackTimelineFunc, JobSpec, JobWorkerBase } from './main'
44
import { JobManager, JobStream } from './manager'
55
import { WorkerId } from '@sofie-automation/corelib/dist/dataModel/Ids'
66
import { getPrometheusMetricsString, setupPrometheusMetrics } from '@sofie-automation/corelib/dist/prometheus'
7+
import type { QueueJobOptions } from './jobs'
78

89
/**
910
* A very simple implementation of JobManager, that is designed to work via threadedClass over IPC
@@ -17,7 +18,12 @@ class IpcJobManager implements JobManager {
1718
error: any,
1819
result: any
1920
) => Promise<void>,
20-
public readonly queueJob: (queueName: string, jobName: string, jobData: unknown) => Promise<void>,
21+
public readonly queueJob: (
22+
queueName: string,
23+
jobName: string,
24+
jobData: unknown,
25+
options: QueueJobOptions | undefined
26+
) => Promise<void>,
2127
private readonly interruptJobStream: (queueName: string) => Promise<void>,
2228
private readonly waitForNextJob: (queueName: string) => Promise<void>,
2329
private readonly getNextJob: (queueName: string) => Promise<JobSpec | null>
@@ -47,7 +53,12 @@ export class IpcJobWorker extends JobWorkerBase {
4753
interruptJobStream: (queueName: string) => Promise<void>,
4854
waitForNextJob: (queueName: string) => Promise<void>,
4955
getNextJob: (queueName: string) => Promise<JobSpec | null>,
50-
queueJob: (queueName: string, jobName: string, jobData: unknown) => Promise<void>,
56+
queueJob: (
57+
queueName: string,
58+
jobName: string,
59+
jobData: unknown,
60+
options: QueueJobOptions | undefined
61+
) => Promise<void>,
5162
logLine: (msg: LogEntry) => Promise<void>,
5263
fastTrackTimeline: FastTrackTimelineFunc,
5364
enableFreezeLimit: boolean

packages/job-worker/src/jobs/index.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ export { ApmSpan }
2424
export { ProcessedShowStyleVariant, ProcessedShowStyleBase, ProcessedShowStyleCompound }
2525
export { JobStudio }
2626

27+
export interface QueueJobOptions {
28+
/** Optional: The job will be run with a low priority, allowing other operations to be run first */
29+
lowPriority?: boolean
30+
31+
/** Optional: Debounce execution, if the job is already queued, it will not be queued again */
32+
debounce?: boolean
33+
}
34+
2735
/**
2836
* Context for any job run in the job-worker
2937
*/
@@ -54,7 +62,11 @@ export interface JobContext extends StudioCacheContext {
5462
* @param data Data for the job
5563
* @returns Promise which resolves once successfully queued
5664
*/
57-
queueStudioJob<T extends keyof StudioJobFunc>(name: T, data: Parameters<StudioJobFunc[T]>[0]): Promise<void>
65+
queueStudioJob<T extends keyof StudioJobFunc>(
66+
name: T,
67+
data: Parameters<StudioJobFunc[T]>[0],
68+
options?: QueueJobOptions
69+
): Promise<void>
5870
/**
5971
* Queue an Event job to be run
6072
* It is not possible to wait for the result. This ensures the threads don't get deadlocked

packages/job-worker/src/manager.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import { WorkerId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2-
import { UserError } from '@sofie-automation/corelib/dist/error'
3-
import { JobSpec } from './main'
1+
import type { WorkerId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2+
import type { UserError } from '@sofie-automation/corelib/dist/error'
3+
import type { JobSpec } from './main'
4+
import type { QueueJobOptions } from './jobs'
45

56
export interface JobManager {
67
jobFinished: (
@@ -11,7 +12,12 @@ export interface JobManager {
1112
result: any
1213
) => Promise<void>
1314
// getNextJob: (queueName: string) => Promise<JobSpec>
14-
queueJob: (queueName: string, jobName: string, jobData: unknown) => Promise<void>
15+
queueJob: (
16+
queueName: string,
17+
jobName: string,
18+
jobData: unknown,
19+
options: QueueJobOptions | undefined
20+
) => Promise<void>
1521
subscribeToQueue: (queueName: string, workerId: WorkerId) => JobStream
1622
}
1723

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import type { CleanupOrphanedExpectedPackageReferencesProps } from '@sofie-automation/corelib/dist/worker/studio'
2+
import type { JobContext } from '../jobs'
3+
4+
export async function handleCleanupOrphanedExpectedPackageReferences(
5+
context: JobContext,
6+
data: CleanupOrphanedExpectedPackageReferencesProps
7+
): Promise<void> {
8+
// TODO
9+
}

packages/job-worker/src/playout/model/implementation/PlayoutModelImpl.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,13 @@ import _ = require('underscore')
3737
import { unprotectString } from '@sofie-automation/corelib/dist/protectedString'
3838
import { PlaylistLock } from '../../../jobs/lock'
3939
import { logger } from '../../../logging'
40-
import { clone, getRandomId, literal, normalizeArrayToMapFunc } from '@sofie-automation/corelib/dist/lib'
40+
import {
41+
clone,
42+
getRandomId,
43+
groupByToMapFunc,
44+
literal,
45+
normalizeArrayToMapFunc,
46+
} from '@sofie-automation/corelib/dist/lib'
4147
import { sleep } from '@sofie-automation/shared-lib/dist/lib/lib'
4248
import { sortRundownIDsInPlaylist } from '@sofie-automation/corelib/dist/playout/playlist'
4349
import { PlayoutRundownModel } from '../PlayoutRundownModel'
@@ -50,7 +56,11 @@ import { protectString } from '@sofie-automation/shared-lib/dist/lib/protectedSt
5056
import { queuePartInstanceTimingEvent } from '../../timings/events'
5157
import { IS_PRODUCTION } from '../../../environment'
5258
import { DeferredAfterSaveFunction, DeferredFunction, PlayoutModel, PlayoutModelReadonly } from '../PlayoutModel'
53-
import { writePartInstancesAndPieceInstances, writeAdlibTestingSegments } from './SavePlayoutModel'
59+
import {
60+
writePartInstancesAndPieceInstances,
61+
writeAdlibTestingSegments,
62+
writeExpectedPackagesForPlayoutSources,
63+
} from './SavePlayoutModel'
5464
import { PlayoutPieceInstanceModel } from '../PlayoutPieceInstanceModel'
5565
import { DatabasePersistedModel } from '../../../modelBase'
5666
import { ExpectedPlayoutItemStudio } from '@sofie-automation/corelib/dist/dataModel/ExpectedPlayoutItem'
@@ -672,12 +682,20 @@ export class PlayoutModelImpl extends PlayoutModelReadonlyImpl implements Playou
672682
}
673683
this.#timelineHasChanged = false
674684

685+
const partInstancesByRundownId = groupByToMapFunc(
686+
Array.from(this.allPartInstances.values()).filter((p) => !!p),
687+
(p) => p.partInstance.rundownId
688+
)
689+
675690
await Promise.all([
676691
this.#playlistHasChanged
677692
? this.context.directCollections.RundownPlaylists.replace(this.playlistImpl)
678693
: undefined,
679694
...writePartInstancesAndPieceInstances(this.context, this.allPartInstances),
680695
writeAdlibTestingSegments(this.context, this.rundownsImpl),
696+
...Array.from(partInstancesByRundownId.entries()).map(async ([rundownId, partInstances]) =>
697+
writeExpectedPackagesForPlayoutSources(this.context, this.playlistId, rundownId, partInstances)
698+
),
681699
this.#baselineHelper.saveAllToDatabase(),
682700
this.#notificationsHelper.saveAllToDatabase(),
683701
this.context.saveRouteSetChanges(),

0 commit comments

Comments
 (0)