Skip to content

Commit c894dfe

Browse files
committed
Include queue in diag ui
1 parent 3f0e192 commit c894dfe

File tree

6 files changed

+115
-14
lines changed

6 files changed

+115
-14
lines changed

backend/mysql/diagnostics.go

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at, i.queue
26+
`SELECT i.instance_id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.created_at, i.completed_at, i.queue
2727
FROM instances i
2828
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE instance_id = ? AND execution_id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
@@ -36,7 +36,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
3636
} else {
3737
rows, err = tx.QueryContext(
3838
ctx,
39-
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at, i.queue
39+
`SELECT i.instance_id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.created_at, i.completed_at, i.queue
4040
FROM instances i
4141
ORDER BY i.created_at DESC, i.instance_id DESC
4242
LIMIT ?`,
@@ -53,9 +53,11 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
5353

5454
for rows.Next() {
5555
var id, executionID, queue string
56+
var parentID, parentExecutionID *string
57+
var parentScheduleEventID *int64
5658
var createdAt time.Time
5759
var completedAt *time.Time
58-
err = rows.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
60+
err = rows.Scan(&id, &executionID, &parentID, &parentExecutionID, &parentScheduleEventID, &createdAt, &completedAt, &queue)
5961
if err != nil {
6062
return nil, err
6163
}
@@ -65,8 +67,16 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
6567
state = core.WorkflowInstanceStateFinished
6668
}
6769

70+
var instance *core.WorkflowInstance
71+
if parentID != nil {
72+
parentInstance := core.NewWorkflowInstance(*parentID, *parentExecutionID)
73+
instance = core.NewSubWorkflowInstance(id, executionID, parentInstance, *parentScheduleEventID)
74+
} else {
75+
instance = core.NewWorkflowInstance(id, executionID)
76+
}
77+
6878
instances = append(instances, &diag.WorkflowInstanceRef{
69-
Instance: core.NewWorkflowInstance(id, executionID),
79+
Instance: instance,
7080
CreatedAt: createdAt,
7181
CompletedAt: completedAt,
7282
State: state,
@@ -86,15 +96,17 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instance *core.
8696

8797
res := tx.QueryRowContext(
8898
ctx,
89-
`SELECT instance_id, execution_id, created_at, completed_at, queue
99+
`SELECT instance_id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, created_at, completed_at, queue
90100
FROM instances
91101
WHERE instance_id = ? AND execution_id = ?`, instance.InstanceID, instance.ExecutionID)
92102

93103
var id, executionID, queue string
104+
var parentID, parentExecutionID *string
105+
var parentScheduleEventID *int64
94106
var createdAt time.Time
95107
var completedAt *time.Time
96108

97-
err = res.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
109+
err = res.Scan(&id, &executionID, &parentID, &parentExecutionID, &parentScheduleEventID, &createdAt, &completedAt, &queue)
98110
if err != nil {
99111
if err == sql.ErrNoRows {
100112
return nil, nil
@@ -108,6 +120,13 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instance *core.
108120
state = core.WorkflowInstanceStateFinished
109121
}
110122

123+
if parentID != nil {
124+
parentInstance := core.NewWorkflowInstance(*parentID, *parentExecutionID)
125+
instance = core.NewSubWorkflowInstance(id, executionID, parentInstance, *parentScheduleEventID)
126+
} else {
127+
instance = core.NewWorkflowInstance(id, executionID)
128+
}
129+
111130
return &diag.WorkflowInstanceRef{
112131
Instance: core.NewWorkflowInstance(id, executionID),
113132
CreatedAt: createdAt,

backend/sqlite/diagnostics.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
2525
if afterInstanceID != "" {
2626
rows, err = tx.QueryContext(
2727
ctx,
28-
`SELECT i.id, i.execution_id, i.created_at, i.completed_at, i.queue
28+
`SELECT i.id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.created_at, i.completed_at, i.queue
2929
FROM instances i
3030
INNER JOIN (SELECT id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
3131
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.id < ii.id)
@@ -38,7 +38,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
3838
} else {
3939
rows, err = tx.QueryContext(
4040
ctx,
41-
`SELECT i.id, i.execution_id, i.created_at, i.completed_at, i.queue
41+
`SELECT i.id, i.execution_id, i.parent_instance_id, i.parent_execution_id, i.parent_schedule_event_id, i.created_at, i.completed_at, i.queue
4242
FROM instances i
4343
ORDER BY i.created_at DESC, i.id DESC
4444
LIMIT ?`,
@@ -55,9 +55,11 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
5555

5656
for rows.Next() {
5757
var id, executionID, queue string
58+
var parentID, parentExecutionID *string
59+
var parentScheduleEventID *int64
5860
var createdAt time.Time
5961
var completedAt *time.Time
60-
err = rows.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
62+
err = rows.Scan(&id, &executionID, &parentID, &parentExecutionID, &parentScheduleEventID, &createdAt, &completedAt, &queue)
6163
if err != nil {
6264
return nil, err
6365
}
@@ -67,8 +69,16 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
6769
state = core.WorkflowInstanceStateFinished
6870
}
6971

72+
var instance *core.WorkflowInstance
73+
if parentID != nil {
74+
parentInstance := core.NewWorkflowInstance(*parentID, *parentExecutionID)
75+
instance = core.NewSubWorkflowInstance(id, executionID, parentInstance, *parentScheduleEventID)
76+
} else {
77+
instance = core.NewWorkflowInstance(id, executionID)
78+
}
79+
7080
instances = append(instances, &diag.WorkflowInstanceRef{
71-
Instance: core.NewWorkflowInstance(id, executionID),
81+
Instance: instance,
7282
CreatedAt: createdAt,
7383
CompletedAt: completedAt,
7484
State: state,
@@ -92,15 +102,17 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instance *core
92102

93103
res := tx.QueryRowContext(
94104
ctx,
95-
`SELECT id, execution_id, created_at, completed_at, queue
105+
`SELECT id, execution_id, parent_instance_id, parent_execution_id, parent_schedule_event_id, created_at, completed_at, queue
96106
FROM instances WHERE id = ? AND execution_id = ?`,
97107
instance.InstanceID, instance.ExecutionID)
98108

99109
var id, executionID, queue string
110+
var parentID, parentExecutionID *string
111+
var parentScheduleEventID *int64
100112
var createdAt time.Time
101113
var completedAt *time.Time
102114

103-
err = res.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
115+
err = res.Scan(&id, &executionID, &parentID, &parentExecutionID, &parentScheduleEventID, &createdAt, &completedAt, &queue)
104116
if err != nil {
105117
if err == sql.ErrNoRows {
106118
return nil, nil
@@ -114,8 +126,15 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instance *core
114126
state = core.WorkflowInstanceStateFinished
115127
}
116128

129+
if parentID != nil {
130+
parentInstance := core.NewWorkflowInstance(*parentID, *parentExecutionID)
131+
instance = core.NewSubWorkflowInstance(id, executionID, parentInstance, *parentScheduleEventID)
132+
} else {
133+
instance = core.NewWorkflowInstance(id, executionID)
134+
}
135+
117136
return &diag.WorkflowInstanceRef{
118-
Instance: core.NewWorkflowInstance(id, executionID),
137+
Instance: instance,
119138
CreatedAt: createdAt,
120139
CompletedAt: completedAt,
121140
State: state,

diag/app/src/Instance.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ function Instance() {
112112
<dd className="col-sm-8">
113113
{!instance.completed_at ? <i>pending</i> : instance.completed_at}
114114
</dd>
115+
116+
<dt className="col-sm-4">Queue</dt>
117+
<dd className="col-sm-8">{instance.queue}</dd>
115118
</dl>
116119

117120
<Card>

diag/app/src/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export interface WorkflowInstanceRef {
1111
completed_at?: string;
1212

1313
state: number;
14+
queue: string;
1415
}
1516

1617
export type WorkflowInstanceInfo = WorkflowInstanceRef & {

samples/queues/queues.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,28 +4,50 @@ import (
44
"context"
55
"log"
66
"log/slog"
7+
"net/http"
8+
"os"
9+
"os/signal"
710
"time"
811

912
"github.com/cschleiden/go-workflows/backend"
1013
"github.com/cschleiden/go-workflows/client"
14+
"github.com/cschleiden/go-workflows/diag"
1115
"github.com/cschleiden/go-workflows/samples"
1216
"github.com/cschleiden/go-workflows/worker"
1317
"github.com/cschleiden/go-workflows/workflow"
1418

1519
"github.com/google/uuid"
1620
)
1721

18-
var CustomActivityQueue = workflow.Queue("custom-activity-queue")
22+
var (
23+
CustomActivityQueue = workflow.Queue("custom-activity-queue")
24+
CustomWorkflowQueue = workflow.Queue("custom-workflow-queue")
25+
)
1926

2027
func main() {
2128
ctx, cancel := context.WithCancel(context.Background())
2229

2330
b := samples.GetBackend("queues", backend.WithLogger(slog.Default()))
2431

32+
db, ok := b.(diag.Backend)
33+
if !ok {
34+
panic("backend does not implement diag.Backend")
35+
}
36+
37+
// Start diagnostic server under /diag
38+
m := http.NewServeMux()
39+
m.Handle("/diag/", http.StripPrefix("/diag", diag.NewServeMux(db)))
40+
go func() {
41+
if err := http.ListenAndServe(":3000", m); err != nil {
42+
panic(err)
43+
}
44+
}()
45+
2546
// Run worker
2647
w := RunDefaultWorker(ctx, b)
2748

2849
w.RegisterWorkflow(Workflow1)
50+
w.RegisterWorkflow(SubWorkflow)
2951
w.RegisterActivity(Activity1)
3052

3153
// This worker won't actually execute Activity2, but it still needs to be aware of its signature
@@ -42,6 +64,17 @@ func main() {
4264

4365
activityWorker.Start(ctx)
4466

67+
workflowWorker := worker.NewWorkflowWorker(b, &worker.WorkflowWorkerOptions{
68+
WorkflowPollers: 1,
69+
MaxParallelWorkflowTasks: 1,
70+
WorkflowQueues: []workflow.Queue{CustomWorkflowQueue},
71+
})
72+
73+
workflowWorker.RegisterWorkflow(SubWorkflow)
74+
workflowWorker.RegisterActivity(Activity2)
75+
76+
workflowWorker.Start(ctx)
77+
4578
if err := w.Start(ctx); err != nil {
4679
panic("could not start worker")
4780
}
@@ -51,6 +84,10 @@ func main() {
5184

5285
runWorkflow(ctx, c)
5386

87+
sigint := make(chan os.Signal, 1)
88+
signal.Notify(sigint, os.Interrupt)
89+
<-sigint
90+
5491
cancel()
5592

5693
if err := w.WaitForCompletion(); err != nil {

samples/queues/workflow.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,31 @@ func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int,
3232
}
3333
logger.Info("R2 result", "r2", r2)
3434

35+
// Queue sub workflow to separate queue
36+
workflow.CreateSubWorkflowInstance[any](ctx, workflow.SubWorkflowOptions{
37+
Queue: CustomWorkflowQueue,
38+
}, SubWorkflow).Get(ctx)
39+
3540
return r1 + r2, nil
3641
}
3742

43+
func SubWorkflow(ctx workflow.Context) error {
44+
logger := workflow.Logger(ctx)
45+
logger.Info("Entering SubWorkflow")
46+
defer logger.Info("Leaving SubWorkflow")
47+
48+
// Queue activity to separate queue
49+
r2, err := workflow.ExecuteActivity[int](ctx, workflow.ActivityOptions{
50+
Queue: CustomActivityQueue,
51+
}, Activity2).Get(ctx)
52+
if err != nil {
53+
panic("error getting activity 2 result")
54+
}
55+
logger.Info("R2 result", "r2", r2)
56+
57+
return nil
58+
}
59+
3860
func Activity1(ctx context.Context, a, b int) (int, error) {
3961
logger := activity.Logger(ctx)
4062
logger.Info("Entering Activity1")

0 commit comments

Comments
 (0)