Skip to content

Commit 1eaf789

Browse files
authored
Merge pull request #192 from ShipSecAI/betterclever/subworkflow
feat: Sub-workflow (core.workflow.call) with parent/child navigation
2 parents 1315481 + 43dc952 commit 1eaf789

File tree

28 files changed

+1532
-7
lines changed

28 files changed

+1532
-7
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
ALTER TABLE workflow_runs
2+
ADD COLUMN parent_run_id text REFERENCES workflow_runs(run_id) ON DELETE SET NULL;
3+
4+
ALTER TABLE workflow_runs
5+
ADD COLUMN parent_node_ref text;
6+
7+
CREATE INDEX IF NOT EXISTS workflow_runs_parent_run_idx ON workflow_runs(parent_run_id);
8+

backend/src/database/schema/workflow-runs.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ export const workflowRunsTable = pgTable('workflow_runs', {
88
workflowVersionId: uuid('workflow_version_id'),
99
workflowVersion: integer('workflow_version'),
1010
temporalRunId: text('temporal_run_id'),
11+
parentRunId: text('parent_run_id'),
12+
parentNodeRef: text('parent_node_ref'),
1113
totalActions: integer('total_actions').notNull().default(0),
1214
inputs: jsonb('inputs').$type<Record<string, unknown>>().notNull().default({}),
1315
triggerType: text('trigger_type').notNull().default('manual'),

backend/src/workflows/dto/workflow-graph.dto.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ export const PrepareRunRequestSchema = BaseRunWorkflowRequestSchema.extend({
106106
trigger: ExecutionTriggerMetadataSchema.optional(),
107107
runId: z.string().optional(),
108108
idempotencyKey: z.string().trim().min(1).max(128).optional(),
109+
parentRunId: z.string().optional(),
110+
parentNodeRef: z.string().optional(),
109111
}).refine(validateVersionSelection, 'Provide either version or versionId, not both');
110112

111113
export class PrepareRunRequestDto extends createZodDto(PrepareRunRequestSchema) {}

backend/src/workflows/internal-runs.controller.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ export class InternalRunsController {
4242
nodeOverrides: body.nodeOverrides ?? undefined,
4343
runId: body.runId,
4444
idempotencyKey: body.idempotencyKey,
45+
parentRunId: body.parentRunId,
46+
parentNodeRef: body.parentNodeRef,
4547
},
4648
);
4749

backend/src/workflows/repository/workflow-run.repository.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Inject, Injectable } from '@nestjs/common';
2-
import { and, desc, eq, sql } from 'drizzle-orm';
2+
import { and, desc, eq, sql, type SQL } from 'drizzle-orm';
33
import { NodePgDatabase } from 'drizzle-orm/node-postgres';
44

55
import { DRIZZLE_TOKEN } from '../../database/database.module';
@@ -17,6 +17,8 @@ interface CreateWorkflowRunInput {
1717
workflowVersionId: string;
1818
workflowVersion: number;
1919
temporalRunId?: string | null;
20+
parentRunId?: string | null;
21+
parentNodeRef?: string | null;
2022
totalActions: number;
2123
inputs: Record<string, unknown>;
2224
organizationId?: string | null;
@@ -48,6 +50,12 @@ export class WorkflowRunRepository {
4850
updatedAt: new Date(),
4951
organizationId: input.organizationId ?? null,
5052
};
53+
if (input.parentRunId !== undefined) {
54+
values.parentRunId = input.parentRunId ?? null;
55+
}
56+
if (input.parentNodeRef !== undefined) {
57+
values.parentNodeRef = input.parentNodeRef ?? null;
58+
}
5159

5260
if (input.temporalRunId !== undefined) {
5361
values.temporalRunId = input.temporalRunId;
@@ -66,6 +74,12 @@ export class WorkflowRunRepository {
6674
updatedAt: new Date(),
6775
organizationId: input.organizationId ?? null,
6876
};
77+
if (input.parentRunId !== undefined) {
78+
updateValues.parentRunId = input.parentRunId ?? null;
79+
}
80+
if (input.parentNodeRef !== undefined) {
81+
updateValues.parentNodeRef = input.parentNodeRef ?? null;
82+
}
6983

7084
if (input.temporalRunId !== undefined) {
7185
updateValues.temporalRunId = input.temporalRunId;
@@ -123,6 +137,23 @@ export class WorkflowRunRepository {
123137
.limit(options.limit ?? 50);
124138
}
125139

140+
async listChildren(
141+
parentRunId: string,
142+
options: { organizationId?: string | null; limit?: number } = {},
143+
): Promise<WorkflowRunRecord[]> {
144+
const conditions: SQL[] = [eq(workflowRunsTable.parentRunId, parentRunId)];
145+
if (options.organizationId) {
146+
conditions.push(eq(workflowRunsTable.organizationId, options.organizationId));
147+
}
148+
149+
return this.db
150+
.select()
151+
.from(workflowRunsTable)
152+
.where(and(...conditions))
153+
.orderBy(desc(workflowRunsTable.createdAt))
154+
.limit(options.limit ?? 200);
155+
}
156+
126157
async hasPendingInputs(runId: string): Promise<boolean> {
127158
const [result] = await this.db
128159
.select({ count: sql<number>`count(*)` })

backend/src/workflows/workflows.controller.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,37 @@ export class WorkflowsController {
407407
return this.workflowsService.getRun(runId, auth);
408408
}
409409

410+
@Get('/runs/:runId/children')
411+
@ApiOkResponse({
412+
description: 'List direct child workflow runs spawned by a parent run',
413+
schema: {
414+
type: 'object',
415+
properties: {
416+
runs: {
417+
type: 'array',
418+
items: {
419+
type: 'object',
420+
properties: {
421+
runId: { type: 'string' },
422+
workflowId: { type: 'string' },
423+
workflowName: { type: 'string' },
424+
parentNodeRef: { type: 'string', nullable: true },
425+
status: { type: 'string' },
426+
startedAt: { type: 'string', format: 'date-time' },
427+
completedAt: { type: 'string', format: 'date-time', nullable: true },
428+
},
429+
},
430+
},
431+
},
432+
},
433+
})
434+
async listChildRuns(
435+
@CurrentAuth() auth: AuthContext | null,
436+
@Param('runId') runId: string,
437+
) {
438+
return this.workflowsService.listChildRuns(runId, auth);
439+
}
440+
410441
@Get(':id')
411442
@ApiOkResponse({ type: WorkflowResponseDto })
412443
async findOne(

backend/src/workflows/workflows.service.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ export interface WorkflowRunSummary {
7373
triggerSource?: string | null;
7474
triggerLabel?: string | null;
7575
inputPreview: ExecutionInputPreview;
76+
parentRunId?: string | null;
77+
parentNodeRef?: string | null;
7678
}
7779

7880
const SHIPSEC_WORKFLOW_TYPE = 'shipsecWorkflowRun';
@@ -494,6 +496,8 @@ export class WorkflowsService {
494496
triggerSource,
495497
triggerLabel,
496498
inputPreview,
499+
parentRunId: run.parentRunId ?? null,
500+
parentNodeRef: run.parentNodeRef ?? null,
497501
};
498502
}
499503

@@ -519,6 +523,44 @@ export class WorkflowsService {
519523
return { runs: summaries };
520524
}
521525

526+
async listChildRuns(
527+
parentRunId: string,
528+
auth?: AuthContext | null,
529+
options: { limit?: number } = {},
530+
): Promise<{
531+
runs: Array<{
532+
runId: string;
533+
workflowId: string;
534+
workflowName: string;
535+
parentNodeRef: string | null;
536+
status: ExecutionStatus;
537+
startedAt: string;
538+
completedAt?: string;
539+
}>;
540+
}> {
541+
const { organizationId } = await this.requireRunAccess(parentRunId, auth);
542+
const children = await this.runRepository.listChildren(parentRunId, {
543+
organizationId,
544+
limit: options.limit,
545+
});
546+
547+
const summaries = await Promise.all(
548+
children.map((run) => this.buildRunSummary(run, organizationId)),
549+
);
550+
551+
const runs = summaries.map((summary, index) => ({
552+
runId: summary.id,
553+
workflowId: summary.workflowId,
554+
workflowName: summary.workflowName,
555+
parentNodeRef: children[index]?.parentNodeRef ?? null,
556+
status: summary.status,
557+
startedAt: new Date(summary.startTime).toISOString(),
558+
completedAt: summary.endTime ? new Date(summary.endTime).toISOString() : undefined,
559+
}));
560+
561+
return { runs };
562+
}
563+
522564
async getRun(runId: string, auth?: AuthContext | null): Promise<WorkflowRunSummary> {
523565
const organizationId = this.requireOrganizationId(auth);
524566
const run = await this.runRepository.findByRunId(runId, { organizationId });
@@ -713,6 +755,8 @@ export class WorkflowsService {
713755
nodeOverrides?: Record<string, Record<string, unknown>>;
714756
runId?: string;
715757
idempotencyKey?: string;
758+
parentRunId?: string;
759+
parentNodeRef?: string;
716760
} = {},
717761
): Promise<PreparedRunPayload> {
718762
const organizationId = this.requireOrganizationId(auth);
@@ -763,6 +807,8 @@ export class WorkflowsService {
763807
triggerSource: triggerMetadata.sourceId,
764808
triggerLabel: triggerMetadata.label,
765809
inputPreview,
810+
parentRunId: options.parentRunId,
811+
parentNodeRef: options.parentNodeRef,
766812
});
767813

768814
this.analyticsService.trackWorkflowStarted({

0 commit comments

Comments
 (0)