Skip to content

Commit bca4b49

Browse files
authored
feat: option to persist state using valkey store (#8)
1 parent 23e811c commit bca4b49

File tree

11 files changed

+351
-23
lines changed

11 files changed

+351
-23
lines changed

package-lock.json

Lines changed: 90 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
"fastify": "5.3.2",
2828
"minio": "^8.0.5",
2929
"nodemon": "^2.0.20",
30+
"redis": "^5.5.6",
3031
"ts-node": "^10.9.1"
3132
},
3233
"devDependencies": {
3334
"@commitlint/cli": "^17.4.2",
3435
"@commitlint/config-conventional": "^17.4.2",
3536
"@types/jest": "^29.4.0",
3637
"@types/node": "^18.11.18",
38+
"@types/redis": "^4.0.10",
3739
"@typescript-eslint/eslint-plugin": "^5.51.0",
3840
"@typescript-eslint/parser": "^5.51.0",
3941
"eslint": "^8.33.0",

readme.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,17 @@ ENCORE_URL=<svtencore-url>
163163
SUBTITLE_GENERATOR_URL=<subtitle-generator-url>
164164
```
165165

166+
To ensure that the orchestrator maintains state during a restart you can provide a URL to a Redis or Valkey store. Create the Valkey for this purpose first.
167+
168+
```bash
169+
% npx -y @osaas/cli db create valkey orchestrator
170+
redis://<orchestrator-valkey-ip>:<orchestrator-valkey-port>
171+
```
172+
173+
```
174+
REDIS_URL=redis://<orchestrator-valkey-ip>:<orchestrator-valkey-port>
175+
```
176+
166177
For deployment of this repository you need to first create a GitHub personal access token. Follow the instructions in this [guide](https://docs.osaas.io/osaas.wiki/Service%3A-Web-Runner.html) to create it.
167178

168179
Navigate to the [Web Runner service](https://app.osaas.io/dashboard/service/eyevinn-web-runner) in Open Source Cloud web console and create a service secret `ghtoken` to store the GitHub personal access token and another service secret `osctoken` for your OSC personal access token.

src/orchestrator.ts

Lines changed: 41 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,13 @@ export interface OrchestratorOptions {
2929
s3AccessKeyId: string;
3030
s3SecretAccessKey: string;
3131
api: FastifyInstance;
32+
redisUrl?: string;
3233
}
3334

3435
export default (opts: OrchestratorOptions) => {
35-
const workOrderManager = new WorkOrderManager();
36+
const workOrderManager = new WorkOrderManager({
37+
redisUrl: opts.redisUrl ? new URL(opts.redisUrl) : undefined
38+
});
3639
const ctx = new Context();
3740

3841
const timer = setInterval(async () => {
@@ -54,23 +57,54 @@ export default (opts: OrchestratorOptions) => {
5457
) {
5558
console.log(`[${workOrder.id}]: Starting task ${task.type}`);
5659
if (task.type === 'ABR_TRANSCODE') {
57-
await startAbrTranscodeTask(ctx, task, workOrder, opts);
60+
await startAbrTranscodeTask(
61+
ctx,
62+
task,
63+
workOrder,
64+
workOrderManager,
65+
opts
66+
);
5867
} else if (task.type === 'VOD_PACKAGE') {
59-
await startVodPackageTask(ctx, task, workOrder, opts);
68+
await startVodPackageTask(
69+
ctx,
70+
task,
71+
workOrder,
72+
workOrderManager,
73+
opts
74+
);
6075
} else if (task.type === 'TRANSCRIBE') {
61-
await startTranscribeTask(ctx, task, workOrder, opts);
76+
await startTranscribeTask(
77+
ctx,
78+
task,
79+
workOrder,
80+
workOrderManager,
81+
opts
82+
);
6283
} else if (task.type === 'CLEANUP') {
63-
await startCleanupTask(ctx, task, workOrder, opts);
84+
await startCleanupTask(
85+
ctx,
86+
task,
87+
workOrder,
88+
workOrderManager,
89+
opts
90+
);
6491
}
6592
} else if (task.status === 'IN_PROGRESS') {
6693
if (task.type === 'VOD_PACKAGE') {
67-
await updateVodPackageTask(ctx, task, workOrder, opts);
94+
await updateVodPackageTask(
95+
ctx,
96+
task,
97+
workOrder,
98+
workOrderManager,
99+
opts
100+
);
68101
}
69102
}
70103
} catch (error) {
71104
console.error(
72105
`[${workOrder.id}]: Error processing task ${task.type} for work order: `,
73-
error
106+
error,
107+
workOrder
74108
);
75109
}
76110
}

src/orchestrator/tasks/abr_transcode.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
import { Context } from '@osaas/client-core';
2-
import { WorkOrder, WorkOrderTask } from '../workorder';
2+
import { WorkOrder, WorkOrderManager, WorkOrderTask } from '../workorder';
33
import { OrchestratorOptions } from '../../orchestrator';
44
import { transcode } from '@osaas/client-transcode';
5+
import { EncoreJob } from '../encore';
56

67
export async function startAbrTranscodeTask(
78
ctx: Context,
89
task: WorkOrderTask,
910
workOrder: WorkOrder,
11+
workOrderManager: WorkOrderManager,
1012
opts: OrchestratorOptions
1113
) {
1214
const encoreServiceAccessToken = await ctx.getServiceAccessToken('encore');
@@ -25,5 +27,10 @@ export async function startAbrTranscodeTask(
2527
bearerToken: encoreServiceAccessToken
2628
}
2729
);
28-
task.status = 'IN_PROGRESS';
30+
await workOrderManager.updateWorkOrderTask(
31+
workOrder.id,
32+
task.type,
33+
'IN_PROGRESS',
34+
job as EncoreJob
35+
);
2936
}

src/orchestrator/tasks/cleanup.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { Context } from '@osaas/client-core';
2-
import { WorkOrder, WorkOrderTask } from '../workorder';
2+
import { WorkOrder, WorkOrderManager, WorkOrderTask } from '../workorder';
33
import { OrchestratorOptions } from '../../orchestrator';
44
import { removeDir, removeFile } from '../../storage/minio';
55

66
export async function startCleanupTask(
77
ctx: Context,
88
task: WorkOrderTask,
99
workOrder: WorkOrder,
10+
workOrderManager: WorkOrderManager,
1011
opts: OrchestratorOptions
1112
) {
1213
// Remove source file
@@ -32,5 +33,9 @@ export async function startCleanupTask(
3233
console.log(
3334
`[${workOrder.id}]: Removed ABR transcoded files and subtitles for work order`
3435
);
35-
task.status = 'COMPLETED';
36+
await workOrderManager.updateWorkOrderTask(
37+
workOrder.id,
38+
task.type,
39+
'COMPLETED'
40+
);
3641
}

src/orchestrator/tasks/transcribe.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import { Context } from '@osaas/client-core';
2-
import { WorkOrder, WorkOrderTask } from '../workorder';
2+
import { WorkOrder, WorkOrderManager, WorkOrderTask } from '../workorder';
33
import { OrchestratorOptions } from '../../orchestrator';
44
import { fileExists, readTextFile } from '../../storage/minio';
55

66
export async function startTranscribeTask(
77
ctx: Context,
88
task: WorkOrderTask,
99
workOrder: WorkOrder,
10+
workOrderManager: WorkOrderManager,
1011
opts: OrchestratorOptions
1112
) {
1213
const sourceWithoutExtension = workOrder.source
@@ -67,4 +68,9 @@ export async function startTranscribeTask(
6768
const data = await response.json();
6869
console.log(`[${workOrder.id}]: Transcription started for work order:`, data);
6970
task.status = 'IN_PROGRESS';
71+
await workOrderManager.updateWorkOrderTask(
72+
workOrder.id,
73+
task.type,
74+
task.status
75+
);
7076
}

src/orchestrator/tasks/vod_package.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ import {
44
EncoreJob,
55
parseInputsFromEncoreJob
66
} from '../encore';
7-
import { WorkOrder, WorkOrderTask } from '../workorder';
7+
import { WorkOrder, WorkOrderManager, WorkOrderTask } from '../workorder';
88
import { OrchestratorOptions } from '../../orchestrator';
99
import { ShakaJob } from '../shaka';
1010

1111
export async function startVodPackageTask(
1212
ctx: Context,
1313
task: WorkOrderTask,
1414
workOrder: WorkOrder,
15+
workOrderManager: WorkOrderManager,
1516
opts: OrchestratorOptions
1617
) {
1718
const workOrderTask = workOrder.tasks.find(
@@ -68,17 +69,33 @@ export async function startVodPackageTask(
6869
s3EndpointUrl: opts.s3EndpointUrl
6970
}
7071
);
71-
task.taskPayload = shakaJob;
72-
task.status = 'IN_PROGRESS';
72+
await workOrderManager.updateWorkOrderTask(
73+
workOrder.id,
74+
task.type,
75+
'IN_PROGRESS',
76+
shakaJob as ShakaJob
77+
);
7378
}
7479

7580
export async function updateVodPackageTask(
7681
ctx: Context,
7782
task: WorkOrderTask,
7883
workOrder: WorkOrder,
84+
workOrderManager: WorkOrderManager,
7985
opts: OrchestratorOptions
8086
) {
8187
const shakaTaskPayload = task.taskPayload as ShakaJob;
88+
if (!shakaTaskPayload) {
89+
console.error(
90+
`[${workOrder.id}]: No Shaka task payload found for task ${task.type}`
91+
);
92+
await workOrderManager.updateWorkOrderTask(
93+
workOrder.id,
94+
task.type,
95+
'FAILED'
96+
);
97+
return;
98+
}
8299
const shakaServiceAccessToken = await ctx.getServiceAccessToken(
83100
'eyevinn-shaka-packager-s3'
84101
);
@@ -99,4 +116,9 @@ export async function updateVodPackageTask(
99116
`[${workOrder.id}]: Shaka job ${shakaTaskPayload.name} failed: ${shakaJob.error}`
100117
);
101118
}
119+
await workOrderManager.updateWorkOrderTask(
120+
workOrder.id,
121+
task.type,
122+
task.status
123+
);
102124
}

0 commit comments

Comments
 (0)