Skip to content

Commit 3f0e192

Browse files
committed
Return queue in diag endpoints
1 parent b4854e2 commit 3f0e192

File tree

5 files changed

+61
-14
lines changed

5 files changed

+61
-14
lines changed

backend/mysql/diagnostics.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2323
if afterInstanceID != "" {
2424
rows, err = tx.QueryContext(
2525
ctx,
26-
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
26+
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at, i.queue
2727
FROM instances i
2828
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE instance_id = ? AND execution_id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
@@ -36,7 +36,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
3636
} else {
3737
rows, err = tx.QueryContext(
3838
ctx,
39-
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
39+
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at, i.queue
4040
FROM instances i
4141
ORDER BY i.created_at DESC, i.instance_id DESC
4242
LIMIT ?`,
@@ -52,10 +52,10 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
5252
var instances []*diag.WorkflowInstanceRef
5353

5454
for rows.Next() {
55-
var id, executionID string
55+
var id, executionID, queue string
5656
var createdAt time.Time
5757
var completedAt *time.Time
58-
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
58+
err = rows.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
5959
if err != nil {
6060
return nil, err
6161
}
@@ -70,6 +70,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
7070
CreatedAt: createdAt,
7171
CompletedAt: completedAt,
7272
State: state,
73+
Queue: queue,
7374
})
7475
}
7576

@@ -85,13 +86,15 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instance *core.
8586

8687
res := tx.QueryRowContext(
8788
ctx,
88-
"SELECT instance_id, execution_id, created_at, completed_at FROM instances WHERE instance_id = ? AND execution_id = ?", instance.InstanceID, instance.ExecutionID)
89+
`SELECT instance_id, execution_id, created_at, completed_at, queue
90+
FROM instances
91+
WHERE instance_id = ? AND execution_id = ?`, instance.InstanceID, instance.ExecutionID)
8992

90-
var id, executionID string
93+
var id, executionID, queue string
9194
var createdAt time.Time
9295
var completedAt *time.Time
9396

94-
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
97+
err = res.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
9598
if err != nil {
9699
if err == sql.ErrNoRows {
97100
return nil, nil
@@ -110,6 +113,7 @@ func (mb *mysqlBackend) GetWorkflowInstance(ctx context.Context, instance *core.
110113
CreatedAt: createdAt,
111114
CompletedAt: completedAt,
112115
State: state,
116+
Queue: queue,
113117
}, nil
114118
}
115119

backend/redis/diagnostics.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
7878
CreatedAt: state.CreatedAt,
7979
CompletedAt: state.CompletedAt,
8080
State: state.State,
81+
Queue: state.Queue,
8182
})
8283
}
8384

@@ -104,5 +105,6 @@ func mapWorkflowInstance(instance *instanceState) *diag.WorkflowInstanceRef {
104105
CreatedAt: instance.CreatedAt,
105106
CompletedAt: instance.CompletedAt,
106107
State: instance.State,
108+
Queue: instance.Queue,
107109
}
108110
}

backend/sqlite/diagnostics.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
2525
if afterInstanceID != "" {
2626
rows, err = tx.QueryContext(
2727
ctx,
28-
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
28+
`SELECT i.id, i.execution_id, i.created_at, i.completed_at, i.queue
2929
FROM instances i
3030
INNER JOIN (SELECT id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
3131
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.id < ii.id)
@@ -38,7 +38,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
3838
} else {
3939
rows, err = tx.QueryContext(
4040
ctx,
41-
`SELECT i.id, i.execution_id, i.created_at, i.completed_at
41+
`SELECT i.id, i.execution_id, i.created_at, i.completed_at, i.queue
4242
FROM instances i
4343
ORDER BY i.created_at DESC, i.id DESC
4444
LIMIT ?`,
@@ -54,10 +54,10 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
5454
var instances []*diag.WorkflowInstanceRef
5555

5656
for rows.Next() {
57-
var id, executionID string
57+
var id, executionID, queue string
5858
var createdAt time.Time
5959
var completedAt *time.Time
60-
err = rows.Scan(&id, &executionID, &createdAt, &completedAt)
60+
err = rows.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
6161
if err != nil {
6262
return nil, err
6363
}
@@ -72,6 +72,7 @@ func (sb *sqliteBackend) GetWorkflowInstances(ctx context.Context, afterInstance
7272
CreatedAt: createdAt,
7373
CompletedAt: completedAt,
7474
State: state,
75+
Queue: queue,
7576
})
7677
}
7778

@@ -89,13 +90,17 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instance *core
8990
}
9091
defer tx.Rollback()
9192

92-
res := tx.QueryRowContext(ctx, "SELECT id, execution_id, created_at, completed_at FROM instances WHERE id = ? AND execution_id = ?", instance.InstanceID, instance.ExecutionID)
93+
res := tx.QueryRowContext(
94+
ctx,
95+
`SELECT id, execution_id, created_at, completed_at, queue
96+
FROM instances WHERE id = ? AND execution_id = ?`,
97+
instance.InstanceID, instance.ExecutionID)
9398

94-
var id, executionID string
99+
var id, executionID, queue string
95100
var createdAt time.Time
96101
var completedAt *time.Time
97102

98-
err = res.Scan(&id, &executionID, &createdAt, &completedAt)
103+
err = res.Scan(&id, &executionID, &createdAt, &completedAt, &queue)
99104
if err != nil {
100105
if err == sql.ErrNoRows {
101106
return nil, nil
@@ -114,6 +119,7 @@ func (sb *sqliteBackend) GetWorkflowInstance(ctx context.Context, instance *core
114119
CreatedAt: createdAt,
115120
CompletedAt: completedAt,
116121
State: state,
122+
Queue: queue,
117123
}, nil
118124
}
119125

backend/test/e2e_diag.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/cschleiden/go-workflows/diag"
99
"github.com/cschleiden/go-workflows/worker"
1010
"github.com/cschleiden/go-workflows/workflow"
11+
"github.com/google/uuid"
1112
"github.com/stretchr/testify/require"
1213
)
1314

@@ -50,4 +51,37 @@ var e2eDiagTests = []backendTest{
5051
require.Len(t, refs, 0)
5152
},
5253
},
54+
{
55+
name: "Diag_GetWorkflowInstance_ReturnsQueue",
56+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
57+
diagBackend, ok := b.(diag.Backend)
58+
if !ok {
59+
t.Skip("Backend does not implement diag.Backend")
60+
}
61+
62+
wf := func(ctx workflow.Context) (bool, error) {
63+
return true, nil
64+
}
65+
66+
register(t, ctx, w, []interface{}{wf}, nil)
67+
68+
wfi := runWorkflow(t, ctx, c, wf)
69+
70+
wfi2, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
71+
InstanceID: uuid.NewString(),
72+
Queue: "custom-queue",
73+
}, wf)
74+
require.NoError(t, err)
75+
76+
wfState, err := diagBackend.GetWorkflowInstance(ctx, wfi)
77+
require.NoError(t, err)
78+
require.NotNil(t, wfState)
79+
require.Equal(t, string(workflow.QueueDefault), wfState.Queue)
80+
81+
wfState2, err := diagBackend.GetWorkflowInstance(ctx, wfi2)
82+
require.NoError(t, err)
83+
require.NotNil(t, wfState2)
84+
require.Equal(t, "custom-queue", wfState2.Queue)
85+
},
86+
},
5387
}

diag/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type WorkflowInstanceRef struct {
1515
CreatedAt time.Time `json:"created_at,omitempty"`
1616
CompletedAt *time.Time `json:"completed_at,omitempty"`
1717
State core.WorkflowInstanceState `json:"state"`
18+
Queue string `json:"queue"`
1819
}
1920

2021
type Event struct {

0 commit comments

Comments
 (0)