Merged
43 changes: 43 additions & 0 deletions docs/workflow-definition.md
@@ -0,0 +1,43 @@
# Workflow Definition File

A workflow definition file is a YAML file describing the steps to perform when a file create event is received. The example below defines that an `ABR_TRANSCODE` and a `TRANSCRIBE` step will be executed in parallel; as they have no dependencies, they are executed immediately. The `VOD_PACKAGE` step depends on both `ABR_TRANSCODE` and `TRANSCRIBE` having completed. The last step is `CLEANUP`, which depends on `VOD_PACKAGE` having completed.

```yml
steps:
- type: ABR_TRANSCODE
dependsOn: []
- type: VOD_PACKAGE
dependsOn:
- ABR_TRANSCODE
- TRANSCRIBE
- type: TRANSCRIBE
dependsOn: []
- type: CLEANUP
dependsOn:
- VOD_PACKAGE
```

If you want to skip transcription, use the following definition instead:

```yml
steps:
- type: ABR_TRANSCODE
dependsOn: []
- type: VOD_PACKAGE
dependsOn:
- ABR_TRANSCODE
- type: CLEANUP
dependsOn:
- VOD_PACKAGE
```

## Steps

Supported steps are shown in the table below.

| Type | Description |
| --------------- | ----------------------------------------------------------------------------------- |
| `ABR_TRANSCODE` | Transcode the source file to a set of variants with different bitrates and resolutions. |
| `TRANSCRIBE`    | Transcribe the source file and generate subtitles.                                       |
| `VOD_PACKAGE`   | Create a VOD package for streaming.                                                      |
| `CLEANUP`       | Remove the source file from the input bucket and files created on temporary storage.     |
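
The `dependsOn` lists form a small dependency graph. As an illustrative sketch (not part of this PR, and not the orchestrator's actual scheduling code), a valid execution order for such steps could be computed with a simple Kahn-style topological sort:

```typescript
type StepType = 'ABR_TRANSCODE' | 'TRANSCRIBE' | 'VOD_PACKAGE' | 'CLEANUP';

interface Step {
  type: StepType;
  dependsOn: StepType[];
}

// Return step types in an order where every step comes after all of its
// dependencies. Throws on references to unknown steps and on cycles.
function executionOrder(steps: Step[]): StepType[] {
  const known = new Set(steps.map((s) => s.type));
  for (const s of steps) {
    for (const dep of s.dependsOn) {
      if (!known.has(dep)) throw new Error(`Unknown dependency: ${dep}`);
    }
  }
  const order: StepType[] = [];
  const done = new Set<StepType>();
  let remaining = steps.slice();
  while (remaining.length > 0) {
    // Steps whose dependencies have all completed can run (in parallel).
    const ready = remaining.filter((s) => s.dependsOn.every((d) => done.has(d)));
    if (ready.length === 0) throw new Error('Cycle in workflow definition');
    for (const s of ready) {
      order.push(s.type);
      done.add(s.type);
    }
    remaining = remaining.filter((s) => !done.has(s.type));
  }
  return order;
}
```

For the full example above, this yields `ABR_TRANSCODE` and `TRANSCRIBE` first (they run in parallel), then `VOD_PACKAGE`, then `CLEANUP`.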
3 changes: 2 additions & 1 deletion package-lock.json


3 changes: 2 additions & 1 deletion package.json
@@ -28,7 +28,8 @@
"minio": "^8.0.5",
"nodemon": "^2.0.20",
"redis": "^5.5.6",
"ts-node": "^10.9.1"
"ts-node": "^10.9.1",
"yaml": "^2.8.0"
},
"devDependencies": {
"@commitlint/cli": "^17.4.2",
8 changes: 8 additions & 0 deletions readme.md
@@ -170,10 +170,18 @@ To ensure that the orchestrator maintains state during a restart you can provide
redis://<orchestrator-valkey-ip>:<orchestrator-valkey-port>
```

Then add the following to the configuration:

```
REDIS_URL=redis://<orchestrator-valkey-ip>:<orchestrator-valkey-port>
```

If you want to trigger a custom workflow you can provide a URL to a [workflow definition file](docs/workflow-definition.md). Add the following to the configuration:

```
WORKFLOW_DEFINITION_URL=https://raw.githubusercontent.com/Eyevinn/media-supply-orchestrator/refs/heads/main/workflows/vod-transcribe.yml
```
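
As an example, a minimal custom workflow definition could look like the following (a hypothetical sketch following the format in [docs/workflow-definition.md](docs/workflow-definition.md); whether the orchestrator supports omitting steps such as `TRANSCRIBE` in every deployment is an assumption):

```yml
steps:
  - type: ABR_TRANSCODE
    dependsOn: []
  - type: VOD_PACKAGE
    dependsOn:
      - ABR_TRANSCODE
  - type: CLEANUP
    dependsOn:
      - VOD_PACKAGE
```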

To deploy this repository you first need to create a GitHub personal access token. Follow the instructions in this [guide](https://docs.osaas.io/osaas.wiki/Service%3A-Web-Runner.html) to create it.

Navigate to the [Web Runner service](https://app.osaas.io/dashboard/service/eyevinn-web-runner) in Open Source Cloud web console and create a service secret `ghtoken` to store the GitHub personal access token and another service secret `osctoken` for your OSC personal access token.
89 changes: 58 additions & 31 deletions src/orchestrator.ts
@@ -5,10 +5,7 @@
import { FastifyInstance } from 'fastify';
import { encoreCallbackApi } from './orchestrator/callbacks/encore';
import { EncoreJob } from './orchestrator/encore';
import {
VOD_PACKAGING_TASKS,
WorkOrderManager
} from './orchestrator/workorder';
import { WorkOrderManager } from './orchestrator/workorder';
import { startAbrTranscodeTask } from './orchestrator/tasks/abr_transcode';
import {
startVodPackageTask,
@@ -17,6 +14,7 @@
import { startTranscribeTask } from './orchestrator/tasks/transcribe';
import { transcribeCallbackApi } from './orchestrator/callbacks/transcribe';
import { startCleanupTask } from './orchestrator/tasks/cleanup';
import { Workflow } from './orchestrator/workflow';

export interface OrchestratorOptions {
publicBaseUrl: string;
@@ -30,15 +28,17 @@
s3SecretAccessKey: string;
api: FastifyInstance;
redisUrl?: string;
workflowDefinitionUrl: URL;
}

export default (opts: OrchestratorOptions) => {
const workOrderManager = new WorkOrderManager({
redisUrl: opts.redisUrl ? new URL(opts.redisUrl) : undefined
});
const ctx = new Context();
const workflow = new Workflow(opts.workflowDefinitionUrl);

const timer = setInterval(async () => {
const openWorkOrders = await workOrderManager.getOpenWorkOrders();
for (const workOrder of openWorkOrders) {
if (workOrder.tasks.every((task) => task.status === 'COMPLETED')) {
@@ -112,46 +112,73 @@
}
}, 5000);

const handleCreateNotification = async (record: any): Promise<void> => {
console.log(
`File created: s3://${record.s3.bucket.name}/${record.s3.object.key}`
);
const filename =
record.s3.object.key.split('/').pop() || record.s3.object.key;
const externalId = createUniqueSlug(filename);
await workOrderManager.createWorkOrder(
externalId,
new URL(`s3://${record.s3.bucket.name}/${record.s3.object.key}`),
VOD_PACKAGING_TASKS
);
try {
const filename =
record.s3.object.key.split('/').pop() || record.s3.object.key;
const externalId = createUniqueSlug(filename);
const tasks = await workflow.getTasks();
await workOrderManager.createWorkOrder(
externalId,
new URL(`s3://${record.s3.bucket.name}/${record.s3.object.key}`),
tasks
);
} catch (error) {
console.error(
`Error creating work order for file s3://${record.s3.bucket.name}/${record.s3.object.key}: `,
error
);
}
};

const handleEncoreSuccess = async (jobProgress: any): Promise<void> => {
console.debug(`Encore job successful: ${JSON.stringify(jobProgress)}`);
const ctx = new Context();
const job = (await getTranscodeJob(ctx, 'mediasupply', jobProgress.jobId, {
endpointUrl: new URL(opts.encoreUrl),
bearerToken: await ctx.getServiceAccessToken('encore')
})) as EncoreJob;
if (!job.externalId) {
throw new Error(`Encore job ${jobProgress.jobId} has no externalId`);
try {
const ctx = new Context();
const job = (await getTranscodeJob(
ctx,
'mediasupply',
jobProgress.jobId,
{
endpointUrl: new URL(opts.encoreUrl),
bearerToken: await ctx.getServiceAccessToken('encore')
}
)) as EncoreJob;
if (!job.externalId) {
throw new Error(`Encore job ${jobProgress.jobId} has no externalId`);
}
await workOrderManager.updateWorkOrderTask(
job.externalId,
'ABR_TRANSCODE',
'COMPLETED',
job
);
} catch (error) {
console.error(
`Error processing Encore job success for job ${jobProgress.jobId}: `,
error
);
}
await workOrderManager.updateWorkOrderTask(
job.externalId,
'ABR_TRANSCODE',
'COMPLETED',
job
);
};

const handleTranscribeSuccess = async (jobProgress: any): Promise<void> => {
console.debug(`Transcribe job successful: ${JSON.stringify(jobProgress)}`);
await workOrderManager.updateWorkOrderTask(
jobProgress.externalId,
'TRANSCRIBE',
'COMPLETED',
jobProgress
);
try {
await workOrderManager.updateWorkOrderTask(
jobProgress.externalId,
'TRANSCRIBE',
'COMPLETED',
jobProgress
);
} catch (error) {
console.error(
`Error processing Transcribe job success for job ${jobProgress.externalId}: `,
error
);
}
};

setupListener(
53 changes: 53 additions & 0 deletions src/orchestrator/workflow.ts
@@ -0,0 +1,53 @@
import { parse } from 'yaml';
import { WorkOrderTask } from './workorder';

export class Workflow {
private tasks?: WorkOrderTask[];
private workflowDefinitionUrl: URL;

constructor(workflowDefinitionUrl: URL) {
this.tasks = undefined;
this.workflowDefinitionUrl = workflowDefinitionUrl;
}

private async loadWorkflowDefinition(url: URL): Promise<WorkOrderTask[]> {
let data;
if (url.protocol === 'file:') {
const fs = await import('fs/promises');
const filePath = url.pathname;
console.debug(`Loading workflow definition from file: ${filePath}`);
data = await fs.readFile(filePath, 'utf-8');
} else if (url.protocol === 'http:' || url.protocol === 'https:') {
const response = await fetch(url.toString());
if (!response.ok) {
throw new Error(
`Failed to fetch workflow definition: ${response.statusText}`
);
}
data = await response.text();
} else {
throw new Error(
`Unsupported protocol to fetch workflow definition: ${url.protocol}`
);
}
const parsed = parse(data);
const steps = parsed.steps;
steps.forEach((step: WorkOrderTask) => {
step.status = 'PENDING'; // Ensure all tasks start with PENDING status
});
return steps;
}

public async getTasks(): Promise<WorkOrderTask[]> {
if (this.tasks === undefined) {
this.tasks = await this.loadWorkflowDefinition(
this.workflowDefinitionUrl
);
console.debug(
`Loaded workflow definition from ${this.workflowDefinitionUrl}: `,
this.tasks
);
}
return this.tasks;
}
}
27 changes: 2 additions & 25 deletions src/orchestrator/workorder.ts
@@ -36,29 +36,6 @@ export interface WorkOrderManagerOpts {
redisUrl?: URL;
}

export const VOD_PACKAGING_TASKS: WorkOrderTask[] = [
{
type: 'ABR_TRANSCODE',
dependsOn: [],
status: 'PENDING'
},
{
type: 'VOD_PACKAGE',
dependsOn: ['ABR_TRANSCODE', 'TRANSCRIBE'],
status: 'PENDING'
},
{
type: 'TRANSCRIBE',
dependsOn: [],
status: 'PENDING'
},
{
type: 'CLEANUP',
dependsOn: ['VOD_PACKAGE'],
status: 'PENDING'
}
];

export function serializeWorkOrder(workOrder: WorkOrder): string {
return JSON.stringify({
...workOrder,
@@ -104,14 +81,14 @@ export class WorkOrderManager {
async createWorkOrder(
id: string,
source: URL,
tasks?: WorkOrderTask[]
tasks: WorkOrderTask[]
): Promise<WorkOrder> {
await this.connect();
const workOrder: WorkOrder = {
id,
source,
status: 'OPEN',
tasks: tasks || VOD_PACKAGING_TASKS,
tasks,
createdAt: new Date(),
updatedAt: new Date()
};
3 changes: 3 additions & 0 deletions src/server.ts
@@ -54,6 +54,9 @@ orchestrator({
s3AccessKeyId: process.env.S3_ACCESS_KEY_ID,
s3SecretAccessKey: process.env.S3_SECRET_ACCESS_KEY,
redisUrl: process.env.REDIS_URL,
workflowDefinitionUrl: process.env.WORKFLOW_DEFINITION_URL
? new URL(process.env.WORKFLOW_DEFINITION_URL)
: new URL(`file://${process.cwd()}/workflows/vod-transcribe.yml`),
api: server
});

12 changes: 12 additions & 0 deletions workflows/vod-transcribe.yml
@@ -0,0 +1,12 @@
steps:
- type: ABR_TRANSCODE
dependsOn: []
- type: VOD_PACKAGE
dependsOn:
- ABR_TRANSCODE
- TRANSCRIBE
- type: TRANSCRIBE
dependsOn: []
- type: CLEANUP
dependsOn:
- VOD_PACKAGE