Skip to content

Commit 37abaa1

Browse files
committed
feat: implement job options
1 parent 91ab535 commit 37abaa1

File tree

7 files changed

+48
-23
lines changed

7 files changed

+48
-23
lines changed

meteor/server/worker/worker.ts

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { MetricsCounter } from '@sofie-automation/corelib/dist/prometheus'
2424
import { isInTestWrite } from '../security/securityVerify'
2525
import { UserError } from '@sofie-automation/corelib/dist/error'
2626
import { QueueJobOptions } from '@sofie-automation/job-worker/dist/jobs'
27+
import _ from 'underscore'
2728

2829
const FREEZE_LIMIT = 1000 // how long to wait for a response to a Ping
2930
const RESTART_TIMEOUT = 30000 // how long to wait for a restart to complete before throwing an error
@@ -52,8 +53,9 @@ const metricsQueueErrorsCounter = new MetricsCounter({
5253
})
5354

5455
interface JobQueue {
55-
jobs: Array<JobEntry | null>
56-
// lowPriorityJobs: Array<JobEntry>
56+
// TODO: figure out why there can be null types in the array.
57+
jobsHighPriority: Array<JobEntry | null>
58+
jobsLowPriority: Array<JobEntry | null>
5759

5860
/** Notify that there is a job waiting (aka worker is long-polling) */
5961
notifyWorker: ManualPromise<void> | null
@@ -78,7 +80,8 @@ function getOrCreateQueue(queueName: string): JobQueue {
7880
let queue = queues.get(queueName)
7981
if (!queue) {
8082
queue = {
81-
jobs: [],
83+
jobsHighPriority: [],
84+
jobsLowPriority: [],
8285
notifyWorker: null,
8386
metricsTotal: metricsQueueTotalCounter.labels(queueName),
8487
metricsSuccess: metricsQueueSuccessCounter.labels(queueName),
@@ -120,7 +123,7 @@ async function jobFinished(
120123
async function waitForNextJob(queueName: string): Promise<void> {
121124
// Check if there is a job waiting:
122125
const queue = getOrCreateQueue(queueName)
123-
if (queue.jobs.length > 0) {
126+
if (queue.jobsHighPriority.length + queue.jobsLowPriority.length > 0) {
124127
return
125128
}
126129
// No job ready, do a long-poll
@@ -147,7 +150,14 @@ async function waitForNextJob(queueName: string): Promise<void> {
147150
async function getNextJob(queueName: string): Promise<JobSpec | null> {
148151
// Check if there is a job waiting:
149152
const queue = getOrCreateQueue(queueName)
150-
const job = queue.jobs.shift()
153+
// Prefer high priority jobs
154+
let job: JobEntry | null | undefined
155+
if (queue.jobsHighPriority.length > 0) {
156+
job = queue.jobsLowPriority.shift()
157+
} else {
158+
job = queue.jobsHighPriority.shift()
159+
}
160+
151161
if (job) {
152162
// If there is a completion handler, register it for execution
153163
runningJobs.set(job.spec.id, {
@@ -179,7 +189,7 @@ async function interruptJobStream(queueName: string): Promise<void> {
179189
}
180190
})
181191
} else {
182-
queue.jobs.unshift(null)
192+
queue.jobsLowPriority.unshift(null)
183193
}
184194
}
185195
async function queueJobWithoutResult(
@@ -202,13 +212,28 @@ async function queueJobWithoutResult(
202212
)
203213
}
204214

205-
function queueJobInner(queueName: string, jobToQueue: JobEntry, options: QueueJobOptions): void {
215+
function queueJobInner(queueName: string, jobToQueue: JobEntry, options?: QueueJobOptions): void {
206216
// Put the job at the end of the queue:
207217
const queue = getOrCreateQueue(queueName)
208-
queue.jobs.push(jobToQueue)
209-
queue.metricsTotal.inc()
210218

211-
// nocommit - use options
219+
// Debounce: skip if an identical job is already queued
220+
if (options?.debounce) {
221+
const alreadyQueued = [...queue.jobsHighPriority, ...queue.jobsLowPriority].some(
222+
(job) => job && job.spec.name === jobToQueue.spec.name && _.isEqual(job.spec.data, jobToQueue.spec.data)
223+
)
224+
225+
if (alreadyQueued) {
226+
logger.debug(`Debounced duplicate job "${jobToQueue.spec.name}" in queue "${queueName}"`)
227+
return
228+
}
229+
}
230+
231+
// Queue the job based on priority
232+
if (options?.lowPriority) queue.jobsLowPriority.push(jobToQueue)
233+
else queue.jobsHighPriority.push(jobToQueue)
234+
235+
queue.metricsTotal.inc()
236+
// nocommit - add test for priority and debounce
212237

213238
// If there is a worker waiting to pick up a job
214239
if (queue.notifyWorker) {
@@ -232,7 +257,7 @@ function queueJobAndWrapResult<TRes>(
232257
queueName: string,
233258
job: JobSpec,
234259
now: Time,
235-
options: QueueJobOptions | undefined
260+
options?: QueueJobOptions
236261
): WorkerJob<TRes> {
237262
const { result, completionHandler } = generateCompletionHandler<TRes>(job.id, now)
238263

@@ -242,7 +267,7 @@ function queueJobAndWrapResult<TRes>(
242267
spec: job,
243268
completionHandler: completionHandler,
244269
},
245-
options ?? {}
270+
options
246271
)
247272

248273
return result

packages/job-worker/src/ipc.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { FastTrackTimelineFunc, JobSpec, JobWorkerBase } from './main.js'
44
import { JobManager, JobStream } from './manager.js'
55
import { WorkerId } from '@sofie-automation/corelib/dist/dataModel/Ids'
66
import { getPrometheusMetricsString, setupPrometheusMetrics } from '@sofie-automation/corelib/dist/prometheus'
7-
import type { QueueJobOptions } from './jobs'
7+
import type { QueueJobOptions } from './jobs/index.js'
88

99
/**
1010
* A very simple implementation of JobManager, that is designed to work via threadedClass over IPC

packages/job-worker/src/jobs/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ export { ProcessedShowStyleVariant, ProcessedShowStyleBase, ProcessedShowStyleCo
2525
export { JobStudio }
2626

2727
export interface QueueJobOptions {
28-
/**
28+
/**
2929
* The job should be run with a low priority, allowing other operations to be run first ]
3030
*/
3131
lowPriority?: boolean
3232

33-
/**
33+
/**
3434
* Debounce execution, delaying execution until at least this wait time.
35-
* If the job is already queued, it will not be queued again
35+
* If the job is already queued, it will not be queued again
3636
*/
3737
debounce?: number
3838
}

packages/job-worker/src/playout/expectedPackages.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { CleanupOrphanedExpectedPackageReferencesProps } from '@sofie-automation/corelib/dist/worker/studio'
2-
import type { JobContext } from '../jobs'
3-
import { runWithPlaylistLock } from './lock'
2+
import type { JobContext } from '../jobs/index.js'
3+
import { runWithPlaylistLock } from './lock.js'
44
import {
55
ExpectedPackageDB,
66
isPackageReferencedByPlayout,

packages/job-worker/src/playout/model/implementation/PlayoutPieceInstanceModelImpl.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { clone, getRandomId } from '@sofie-automation/corelib/dist/lib'
55
import { ExpectedPackage, Time } from '@sofie-automation/blueprints-integration'
66
import { PlayoutPieceInstanceModel } from '../PlayoutPieceInstanceModel.js'
77
import _ from 'underscore'
8-
import { getExpectedPackageId } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages.js'
8+
import { getExpectedPackageId } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
99

1010
export class PlayoutPieceInstanceModelImpl implements PlayoutPieceInstanceModel {
1111
/**

packages/job-worker/src/playout/model/implementation/SavePlayoutModel.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import { PlayoutPartInstanceModelImpl } from './PlayoutPartInstanceModelImpl.js'
1515
import { PlayoutRundownModelImpl } from './PlayoutRundownModelImpl.js'
1616
import { ReadonlyDeep } from 'type-fest'
1717
import { ExpectedPackage } from '@sofie-automation/blueprints-integration'
18-
import { normalizeArrayToMap } from '@sofie-automation/corelib/dist/lib.js'
19-
import { ExpectedPackageDB } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages.js'
20-
import { StudioJobs } from '@sofie-automation/corelib/dist/worker/studio.js'
18+
import { normalizeArrayToMap } from '@sofie-automation/corelib/dist/lib'
19+
import { ExpectedPackageDB } from '@sofie-automation/corelib/dist/dataModel/ExpectedPackages'
20+
import { StudioJobs } from '@sofie-automation/corelib/dist/worker/studio'
2121

2222
/**
2323
* Save any changed AdlibTesting Segments

packages/job-worker/src/workers/context/util.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { QueueJobOptions } from '../../jobs'
1+
import type { QueueJobOptions } from '../../jobs/index.js'
22

33
export type QueueJobFunc = (
44
queueName: string,

0 commit comments

Comments
 (0)