Skip to content

Commit c200f4b

Browse files
authored
feat: child workflows (#9)
1 parent bf09340 commit c200f4b

File tree

80 files changed

+8736
-972
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+8736
-972
lines changed

api-contracts/openapi/components/schemas/v2/task.yaml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
V2TaskSummary:
1+
V2TaskSummarySingle:
22
properties:
33
metadata:
44
$ref: ".././metadata.yaml#/APIResourceMeta"
@@ -38,6 +38,12 @@ V2TaskSummary:
3838
workflowId:
3939
type: string
4040
format: uuid
41+
output:
42+
type: object
43+
description: The output of the task run (for the latest run)
44+
errorMessage:
45+
type: string
46+
description: The error message of the task run (for the latest run)
4147
required:
4248
- metadata
4349
- id
@@ -47,6 +53,23 @@ V2TaskSummary:
4753
- tenantId
4854
- displayName
4955
- workflowId
56+
- output
57+
58+
V2TaskSummary:
59+
properties:
60+
metadata:
61+
$ref: ".././metadata.yaml#/APIResourceMeta"
62+
parent:
63+
$ref: "#/V2TaskSummarySingle"
64+
children:
65+
type: array
66+
items:
67+
$ref: "#/V2TaskSummarySingle"
68+
description: The list of child tasks
69+
required:
70+
- metadata
71+
- parent
72+
- children
5073

5174
V2TaskSummaryList:
5275
type: object
@@ -108,6 +131,9 @@ V2Task:
108131
output:
109132
type: string
110133
description: The output of the task run (for the latest run)
134+
errorMessage:
135+
type: string
136+
description: The error message of the task run (for the latest run)
111137
required:
112138
- metadata
113139
- id
@@ -188,6 +214,7 @@ V2TaskEventType:
188214
- ACKNOWLEDGED
189215
- CREATED
190216
- QUEUED
217+
- SKIPPED
191218

192219
V2TaskRunMetrics:
193220
type: array

api-contracts/openapi/paths/v2/tasks/tasks.yaml

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,22 @@ listTasks:
4242
schema:
4343
type: string
4444
format: date-time
45-
- description: The workflow id to find runs for
45+
- description: The earliest date to filter by
46+
in: query
47+
name: until
48+
required: false
49+
schema:
50+
type: string
51+
format: date-time
52+
- description: Additional metadata k-v pairs to filter by
53+
in: query
54+
name: additional_metadata
55+
required: false
56+
schema:
57+
type: array
58+
items:
59+
type: string
60+
- description: The workflow ids to find runs for
4661
in: query
4762
name: workflow_ids
4863
required: false

api/v1/server/handlers/tenants/get_step_run_queue_metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
func (t *TenantService) TenantGetStepRunQueueMetrics(ctx echo.Context, request gen.TenantGetStepRunQueueMetricsRequestObject) (gen.TenantGetStepRunQueueMetricsResponseObject, error) {
1111
tenant := ctx.Get("tenant").(*db.TenantModel)
1212

13-
stepRunQueueCounts, err := t.config.EngineRepository.StepRun().GetQueueCounts(ctx.Request().Context(), tenant.ID)
13+
stepRunQueueCounts, err := t.config.V2.Tasks().GetQueueCounts(ctx.Request().Context(), tenant.ID)
1414

1515
if err != nil {
1616
return nil, err

api/v1/server/handlers/v2/tasks/list.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package tasks
22

33
import (
4+
"strings"
5+
46
"github.com/google/uuid"
57
"github.com/labstack/echo/v4"
68

@@ -50,17 +52,36 @@ func (t *TasksService) V2TaskList(ctx echo.Context, request gen.V2TaskListReques
5052
workerId = request.Params.WorkerId
5153
}
5254

55+
opts := repository.ListTaskRunOpts{
56+
CreatedAfter: since,
57+
Statuses: statuses,
58+
WorkflowIds: workflowIds,
59+
WorkerId: workerId,
60+
Limit: limit,
61+
Offset: offset,
62+
}
63+
64+
additionalMetadataFilters := make(map[string]interface{})
65+
66+
if request.Params.AdditionalMetadata != nil {
67+
for _, v := range *request.Params.AdditionalMetadata {
68+
kv_pairs := strings.Split(v, ":")
69+
if len(kv_pairs) == 2 {
70+
additionalMetadataFilters[kv_pairs[0]] = kv_pairs[1]
71+
}
72+
}
73+
74+
opts.AdditionalMetadata = additionalMetadataFilters
75+
}
76+
77+
if request.Params.Until != nil {
78+
opts.FinishedBefore = request.Params.Until
79+
}
80+
5381
tasks, total, err := t.config.EngineRepository.OLAP().ListTaskRuns(
5482
ctx.Request().Context(),
5583
tenant.ID,
56-
repository.ListTaskRunOpts{
57-
CreatedAfter: since,
58-
Statuses: statuses,
59-
WorkflowIds: workflowIds,
60-
WorkerId: workerId,
61-
Limit: limit,
62-
Offset: offset,
63-
},
84+
opts,
6485
)
6586

6687
if err != nil {

api/v1/server/oas/gen/openapi.gen.go

Lines changed: 260 additions & 220 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v1/server/oas/transformers/v2/tasks.go

Lines changed: 71 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func jsonToMap(jsonBytes []byte) map[string]interface{} {
1919
return result
2020
}
2121

22-
func ToTaskSummary(task *timescalev2.PopulateTaskRunDataRow) gen.V2TaskSummary {
22+
func WorkflowRunRowToTaskSummaryUnit(task *timescalev2.ListWorkflowRunsRow) gen.V2TaskSummarySingle {
2323
additionalMetadata := jsonToMap(task.AdditionalMetadata)
2424

2525
var finishedAt *time.Time
@@ -41,27 +41,92 @@ func ToTaskSummary(task *timescalev2.PopulateTaskRunDataRow) gen.V2TaskSummary {
4141
durationPtr = &duration
4242
}
4343

44-
return gen.V2TaskSummary{
44+
output := jsonToMap(task.Output)
45+
46+
return gen.V2TaskSummarySingle{
47+
Metadata: gen.APIResourceMeta{
48+
Id: sqlchelpers.UUIDToStr(task.ExternalID),
49+
CreatedAt: task.InsertedAt.Time,
50+
UpdatedAt: task.InsertedAt.Time,
51+
},
52+
TaskId: int(task.RunID),
53+
TaskInsertedAt: task.InsertedAt.Time,
54+
DisplayName: task.DisplayName,
55+
Duration: durationPtr,
56+
StartedAt: startedAt,
57+
FinishedAt: finishedAt,
58+
AdditionalMetadata: &additionalMetadata,
59+
Status: gen.V2TaskStatus(task.ReadableStatus),
60+
TenantId: uuid.MustParse(sqlchelpers.UUIDToStr(task.TenantID)),
61+
WorkflowId: uuid.MustParse(sqlchelpers.UUIDToStr(task.WorkflowID)),
62+
Output: output,
63+
ErrorMessage: &task.ErrorMessage.String,
64+
}
65+
}
66+
67+
func WorkflowRunChildToTaskSummaryUnit(task *timescalev2.ListDAGChildrenRow) gen.V2TaskSummarySingle {
68+
additionalMetadata := jsonToMap(task.AdditionalMetadata)
69+
70+
var finishedAt *time.Time
71+
72+
if task.FinishedAt.Valid {
73+
finishedAt = &task.FinishedAt.Time
74+
}
75+
76+
var startedAt *time.Time
77+
78+
if task.StartedAt.Valid {
79+
startedAt = &task.StartedAt.Time
80+
}
81+
82+
var durationPtr *int
83+
84+
if task.FinishedAt.Valid && task.StartedAt.Valid {
85+
duration := int(task.FinishedAt.Time.Sub(task.StartedAt.Time).Milliseconds())
86+
durationPtr = &duration
87+
}
88+
89+
return gen.V2TaskSummarySingle{
4590
Metadata: gen.APIResourceMeta{
4691
Id: sqlchelpers.UUIDToStr(task.ExternalID),
4792
CreatedAt: task.InsertedAt.Time,
4893
UpdatedAt: task.InsertedAt.Time,
4994
},
50-
TaskId: int(task.ID),
95+
TaskId: int(task.RunID),
5196
TaskInsertedAt: task.InsertedAt.Time,
5297
DisplayName: task.DisplayName,
5398
Duration: durationPtr,
5499
StartedAt: startedAt,
55100
FinishedAt: finishedAt,
56101
AdditionalMetadata: &additionalMetadata,
57-
Status: gen.V2TaskStatus(task.Status),
102+
Status: gen.V2TaskStatus(task.ReadableStatus),
58103
TenantId: uuid.MustParse(sqlchelpers.UUIDToStr(task.TenantID)),
59104
WorkflowId: uuid.MustParse(sqlchelpers.UUIDToStr(task.WorkflowID)),
60105
}
61106
}
62107

108+
func ToTaskSummary(task *olap.TaskRunDataRow) gen.V2TaskSummary {
109+
parent := WorkflowRunRowToTaskSummaryUnit(task.Parent)
110+
111+
children := make([]gen.V2TaskSummarySingle, len(task.Children))
112+
113+
for i, child := range task.Children {
114+
children[i] = WorkflowRunChildToTaskSummaryUnit(child)
115+
}
116+
117+
return gen.V2TaskSummary{
118+
Metadata: gen.APIResourceMeta{
119+
Id: sqlchelpers.UUIDToStr(task.Parent.ExternalID),
120+
CreatedAt: task.Parent.InsertedAt.Time,
121+
UpdatedAt: task.Parent.InsertedAt.Time,
122+
},
123+
Parent: parent,
124+
Children: children,
125+
}
126+
}
127+
63128
func ToTaskSummaryMany(
64-
tasks []*timescalev2.PopulateTaskRunDataRow,
129+
tasks []*olap.TaskRunDataRow,
65130
total int, limit, offset int64,
66131
) gen.V2TaskSummaryList {
67132
toReturn := make([]gen.V2TaskSummary, len(tasks))
@@ -197,5 +262,6 @@ func ToTask(taskWithData *timescalev2.PopulateSingleTaskRunDataRow) gen.V2Task {
197262
Input: string(taskWithData.Input),
198263
TenantId: uuid.MustParse(sqlchelpers.UUIDToStr(taskWithData.TenantID)),
199264
WorkflowId: uuid.MustParse(sqlchelpers.UUIDToStr(taskWithData.WorkflowID)),
265+
ErrorMessage: &taskWithData.ErrorMessage.String,
200266
}
201267
}

api/v1/server/oas/transformers/worker.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,20 @@ func ToSlotState(slots []*dbsqlc.ListSemaphoreSlotsWithStateForWorkerRow, remain
1919
slot := slots[i]
2020

2121
var stepRunId uuid.UUID
22-
23-
if slot.StepRunId.Valid {
24-
stepRunId = uuid.MustParse(sqlchelpers.UUIDToStr(slot.StepRunId))
25-
}
26-
2722
var workflowRunId uuid.UUID
2823

29-
if slot.WorkflowRunId.Valid {
30-
workflowRunId = uuid.MustParse(sqlchelpers.UUIDToStr(slot.WorkflowRunId))
24+
if slot.ExternalID.Valid {
25+
stepRunId = uuid.MustParse(sqlchelpers.UUIDToStr(slot.ExternalID))
26+
workflowRunId = uuid.MustParse(sqlchelpers.UUIDToStr(slot.ExternalID))
3127
}
3228

3329
resp[i] = gen.SemaphoreSlots{
3430
StepRunId: stepRunId,
35-
Status: gen.StepRunStatus(slot.Status),
36-
ActionId: slot.ActionId,
31+
Status: gen.StepRunStatusRUNNING,
32+
ActionId: slot.ActionID,
3733
WorkflowRunId: workflowRunId,
3834
TimeoutAt: &slot.TimeoutAt.Time,
39-
StartedAt: &slot.StartedAt.Time,
35+
StartedAt: &slot.InsertedAt.Time,
4036
}
4137
}
4238

api/v1/server/run/run.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ func (t *APIServer) registerSpec(g *echo.Group, spec *openapi3.T) (*populator.Po
340340
defer cancel()
341341

342342
task, err := config.OLAPRepository.ReadTaskRun(ctx, id)
343+
343344
if err != nil {
344345
return nil, "", err
345346
}

frontend/app/src/components/molecules/data-table/data-table-row-actions.tsx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { DotsHorizontalIcon } from '@radix-ui/react-icons';
1+
import { DotsVerticalIcon } from '@radix-ui/react-icons';
22
import { Row } from '@tanstack/react-table';
33

44
import { Button } from '@/components/ui/button';
@@ -17,7 +17,7 @@ import {
1717
TooltipContent,
1818
} from '@/components/ui/tooltip';
1919

20-
interface DataTableRowActionsProps<TData extends IDGetter> {
20+
interface DataTableRowActionsProps<TData extends IDGetter<TData>> {
2121
row: Row<TData>;
2222
actions?: {
2323
label: string;
@@ -26,7 +26,7 @@ interface DataTableRowActionsProps<TData extends IDGetter> {
2626
}[];
2727
}
2828

29-
export function DataTableRowActions<TData extends IDGetter>({
29+
export function DataTableRowActions<TData extends IDGetter<TData>>({
3030
row,
3131
actions,
3232
}: DataTableRowActionsProps<TData>) {
@@ -41,7 +41,7 @@ export function DataTableRowActions<TData extends IDGetter>({
4141
variant="ghost"
4242
className="flex h-8 w-8 p-0 data-[state=open]:bg-muted"
4343
>
44-
<DotsHorizontalIcon className="h-4 w-4" />
44+
<DotsVerticalIcon className="h-4 w-4" />
4545
<span className="sr-only">Open menu</span>
4646
</Button>
4747
</DropdownMenuTrigger>
@@ -53,7 +53,7 @@ export function DataTableRowActions<TData extends IDGetter>({
5353
<DropdownMenuItem
5454
onClick={() => action.onClick(row.original)}
5555
disabled={!!action.disabled}
56-
className="w-full"
56+
className="w-full hover:cursor-pointer"
5757
>
5858
{action.label}
5959
</DropdownMenuItem>

0 commit comments

Comments
 (0)