Skip to content

Commit 191353f

Browse files
committed
chore: split job-worker context.ts into multiple files
1 parent 132ebc0 commit 191353f

File tree

8 files changed

+226
-219
lines changed

8 files changed

+226
-219
lines changed

packages/job-worker/src/workers/caches.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import { clone, deepFreeze } from '@sofie-automation/corelib/dist/lib'
1616
import { logger } from '../logging'
1717
import deepmerge = require('deepmerge')
1818
import { ProcessedShowStyleBase, ProcessedShowStyleVariant, StudioCacheContext } from '../jobs'
19-
import { StudioCacheContextImpl } from './context'
19+
import { StudioCacheContextImpl } from './context/StudioCacheContextImpl'
2020

2121
/**
2222
* A Wrapper to maintain a cache and provide a context using the cache when appropriate
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import { IDirectCollections } from '../../db'
2+
import { JobContext } from '../../jobs'
3+
import { WorkerDataCache } from '../caches'
4+
import { RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
5+
import { getIngestQueueName, IngestJobFunc } from '@sofie-automation/corelib/dist/worker/ingest'
6+
import { ApmSpan, ApmTransaction } from '../../profiler'
7+
import { getRandomString } from '@sofie-automation/corelib/dist/lib'
8+
import { stringifyError } from '@sofie-automation/shared-lib/dist/lib/stringifyError'
9+
import { getStudioQueueName, StudioJobFunc } from '@sofie-automation/corelib/dist/worker/studio'
10+
import { LockBase, PlaylistLock, RundownLock } from '../../jobs/lock'
11+
import { logger } from '../../logging'
12+
import { BaseModel } from '../../modelBase'
13+
import { LocksManager } from '../locks'
14+
import { unprotectString } from '@sofie-automation/corelib/dist/protectedString'
15+
import { EventsJobFunc, getEventsQueueName } from '@sofie-automation/corelib/dist/worker/events'
16+
import { FastTrackTimelineFunc } from '../../main'
17+
import { TimelineComplete } from '@sofie-automation/corelib/dist/dataModel/Timeline'
18+
import type { QueueJobFunc } from './util'
19+
import { StudioCacheContextImpl } from './StudioCacheContextImpl'
20+
import { PlaylistLockImpl, RundownLockImpl } from './Locks'
21+
22+
export class JobContextImpl extends StudioCacheContextImpl implements JobContext {
23+
private readonly locks: Array<LockBase> = []
24+
private readonly caches: Array<BaseModel> = []
25+
26+
constructor(
27+
directCollections: Readonly<IDirectCollections>,
28+
cacheData: WorkerDataCache,
29+
private readonly locksManager: LocksManager,
30+
private readonly transaction: ApmTransaction | undefined,
31+
private readonly queueJob: QueueJobFunc,
32+
private readonly fastTrackTimeline: FastTrackTimelineFunc | null
33+
) {
34+
super(directCollections, cacheData)
35+
}
36+
37+
trackCache(cache: BaseModel): void {
38+
this.caches.push(cache)
39+
}
40+
41+
async lockPlaylist(playlistId: RundownPlaylistId): Promise<PlaylistLock> {
42+
const span = this.startSpan('lockPlaylist')
43+
if (span) span.setLabel('playlistId', unprotectString(playlistId))
44+
45+
const lockId = getRandomString()
46+
logger.silly(`PlaylistLock: Locking "${playlistId}"`)
47+
48+
const resourceId = `playlist:${playlistId}`
49+
await this.locksManager.aquire(lockId, resourceId)
50+
51+
const doRelease = async () => {
52+
const span = this.startSpan('unlockPlaylist')
53+
if (span) span.setLabel('playlistId', unprotectString(playlistId))
54+
55+
await this.locksManager.release(lockId, resourceId)
56+
57+
if (span) span.end()
58+
}
59+
const lock = new PlaylistLockImpl(playlistId, doRelease)
60+
this.locks.push(lock)
61+
62+
logger.silly(`PlaylistLock: Locked "${playlistId}"`)
63+
64+
if (span) span.end()
65+
66+
return lock
67+
}
68+
69+
async lockRundown(rundownId: RundownId): Promise<RundownLock> {
70+
const span = this.startSpan('lockRundown')
71+
if (span) span.setLabel('rundownId', unprotectString(rundownId))
72+
73+
const lockId = getRandomString()
74+
logger.silly(`RundownLock: Locking "${rundownId}"`)
75+
76+
const resourceId = `rundown:${rundownId}`
77+
await this.locksManager.aquire(lockId, resourceId)
78+
79+
const doRelease = async () => {
80+
const span = this.startSpan('unlockRundown')
81+
if (span) span.setLabel('rundownId', unprotectString(rundownId))
82+
83+
await this.locksManager.release(lockId, resourceId)
84+
85+
if (span) span.end()
86+
}
87+
const lock = new RundownLockImpl(rundownId, doRelease)
88+
this.locks.push(lock)
89+
90+
logger.silly(`RundownLock: Locked "${rundownId}"`)
91+
92+
if (span) span.end()
93+
94+
return lock
95+
}
96+
97+
/** Ensure resources are cleaned up after the job completes */
98+
async cleanupResources(): Promise<void> {
99+
// Ensure all locks are freed
100+
for (const lock of this.locks) {
101+
if (lock.isLocked) {
102+
logger.warn(`Lock never freed: ${lock}`)
103+
await lock.release().catch((e) => {
104+
logger.error(`Lock free failed: ${stringifyError(e)}`)
105+
})
106+
}
107+
}
108+
109+
// Ensure all caches were saved/aborted
110+
for (const cache of this.caches) {
111+
try {
112+
cache.assertNoChanges()
113+
} catch (e) {
114+
logger.warn(`${cache.displayName} has unsaved changes: ${stringifyError(e)}`)
115+
}
116+
}
117+
}
118+
119+
startSpan(spanName: string): ApmSpan | null {
120+
if (this.transaction) return this.transaction.startSpan(spanName)
121+
return null
122+
}
123+
124+
async queueIngestJob<T extends keyof IngestJobFunc>(name: T, data: Parameters<IngestJobFunc[T]>[0]): Promise<void> {
125+
await this.queueJob(getIngestQueueName(this.studioId), name, data)
126+
}
127+
async queueStudioJob<T extends keyof StudioJobFunc>(name: T, data: Parameters<StudioJobFunc[T]>[0]): Promise<void> {
128+
await this.queueJob(getStudioQueueName(this.studioId), name, data)
129+
}
130+
async queueEventJob<T extends keyof EventsJobFunc>(name: T, data: Parameters<EventsJobFunc[T]>[0]): Promise<void> {
131+
await this.queueJob(getEventsQueueName(this.studioId), name, data)
132+
}
133+
134+
hackPublishTimelineToFastTrack(newTimeline: TimelineComplete): void {
135+
if (this.fastTrackTimeline) {
136+
this.fastTrackTimeline(newTimeline).catch((e) => {
137+
logger.error(`Failed to publish timeline to fast track: ${stringifyError(e)}`)
138+
})
139+
}
140+
}
141+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import type { RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids'
2+
import { PlaylistLock, RundownLock } from '../../jobs/lock'
3+
import { logger } from '../../logging'
4+
5+
export class PlaylistLockImpl extends PlaylistLock {
6+
#isLocked = true
7+
8+
public constructor(playlistId: RundownPlaylistId, private readonly doRelease: () => Promise<void>) {
9+
super(playlistId)
10+
}
11+
12+
get isLocked(): boolean {
13+
return this.#isLocked
14+
}
15+
16+
async release(): Promise<void> {
17+
if (!this.#isLocked) {
18+
logger.warn(`PlaylistLock: Already released "${this.playlistId}"`)
19+
} else {
20+
logger.silly(`PlaylistLock: Releasing "${this.playlistId}"`)
21+
22+
this.#isLocked = false
23+
24+
await this.doRelease()
25+
26+
logger.silly(`PlaylistLock: Released "${this.playlistId}"`)
27+
28+
if (this.deferedFunctions.length > 0) {
29+
for (const fcn of this.deferedFunctions) {
30+
await fcn()
31+
}
32+
}
33+
}
34+
}
35+
}
36+
37+
export class RundownLockImpl extends RundownLock {
38+
#isLocked = true
39+
40+
public constructor(rundownId: RundownId, private readonly doRelease: () => Promise<void>) {
41+
super(rundownId)
42+
}
43+
44+
get isLocked(): boolean {
45+
return this.#isLocked
46+
}
47+
48+
async release(): Promise<void> {
49+
if (!this.#isLocked) {
50+
logger.warn(`RundownLock: Already released "${this.rundownId}"`)
51+
} else {
52+
logger.silly(`RundownLock: Releasing "${this.rundownId}"`)
53+
54+
this.#isLocked = false
55+
56+
await this.doRelease()
57+
58+
logger.silly(`RundownLock: Released "${this.rundownId}"`)
59+
60+
if (this.deferedFunctions.length > 0) {
61+
for (const fcn of this.deferedFunctions) {
62+
await fcn()
63+
}
64+
}
65+
}
66+
}
67+
}

0 commit comments

Comments
 (0)