Skip to content

Commit d404770

Browse files
authored
Merge pull request #344 from cschleiden/fix-diag-paging
Fix mysql pagination
2 parents 9c96c3d + 6984332 commit d404770

File tree

5 files changed

+59
-5
lines changed

5 files changed

+59
-5
lines changed

backend/mysql/diagnostics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ func (mb *mysqlBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
2525
ctx,
2626
`SELECT i.instance_id, i.execution_id, i.created_at, i.completed_at
2727
FROM instances i
28-
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE id = ? AND execution_id = ?) ii
28+
INNER JOIN (SELECT instance_id, created_at FROM instances WHERE instance_id = ? AND execution_id = ?) ii
2929
ON i.created_at < ii.created_at OR (i.created_at = ii.created_at AND i.instance_id < ii.instance_id)
3030
ORDER BY i.created_at DESC, i.instance_id DESC
3131
LIMIT ?`,

backend/redis/diagnostics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ func (rb *redisBackend) GetWorkflowInstances(ctx context.Context, afterInstanceI
1717
max := "+inf"
1818

1919
if afterInstanceID != "" {
20-
afterID := instanceSegment(core.NewWorkflowInstance(afterInstanceID, afterExecutionID))
21-
scores, err := rb.rdb.ZMScore(ctx, rb.keys.instancesByCreation(), afterID).Result()
20+
afterSegmentID := instanceSegment(core.NewWorkflowInstance(afterInstanceID, afterExecutionID))
21+
scores, err := rb.rdb.ZMScore(ctx, rb.keys.instancesByCreation(), afterSegmentID).Result()
2222
if err != nil {
23-
return nil, fmt.Errorf("getting instance score for %v: %w", afterID, err)
23+
return nil, fmt.Errorf("getting instance score for %v: %w", afterSegmentID, err)
2424
}
2525

2626
if len(scores) == 0 {

backend/redis/instance.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, instance *wo
5858
event.ID,
5959
eventData,
6060
payloadData,
61-
time.Now().UTC().Unix(),
61+
time.Now().UTC().UnixNano(),
6262
).Result()
6363

6464
if err != nil {

backend/test/e2e.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -781,6 +781,7 @@ func EndToEndBackendTest(t *testing.T, setup func(options ...backend.BackendOpti
781781
tests = append(tests, e2eActivityTests...)
782782
tests = append(tests, e2eTimerTests...)
783783
tests = append(tests, e2eStatsTests...)
784+
tests = append(tests, e2eDiagTests...)
784785

785786
run := func(suffix string, workerOptions worker.Options) {
786787
for _, tt := range tests {

backend/test/e2e_diag.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/cschleiden/go-workflows/client"
8+
"github.com/cschleiden/go-workflows/diag"
9+
"github.com/cschleiden/go-workflows/worker"
10+
"github.com/cschleiden/go-workflows/workflow"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
var e2eDiagTests = []backendTest{
15+
{
16+
name: "Diag_Paging",
17+
f: func(t *testing.T, ctx context.Context, c *client.Client, w *worker.Worker, b TestBackend) {
18+
diagBackend, ok := b.(diag.Backend)
19+
if !ok {
20+
t.Skip("Backend does not implement diag.Backend")
21+
}
22+
23+
wf := func(ctx workflow.Context) (bool, error) {
24+
return true, nil
25+
}
26+
27+
register(t, ctx, w, []interface{}{wf}, nil)
28+
29+
for i := 0; i < 50; i++ {
30+
runWorkflow(t, ctx, c, wf)
31+
}
32+
33+
afterInstanceID := ""
34+
afterExecutionID := ""
35+
36+
// Fetch 5 pages
37+
for i := 0; i < 5; i++ {
38+
refs, err := diagBackend.GetWorkflowInstances(ctx, afterInstanceID, afterExecutionID, 10)
39+
require.NoError(t, err)
40+
require.Len(t, refs, 10)
41+
42+
require.NotEqual(t, afterInstanceID, refs[len(refs)-1].Instance.InstanceID)
43+
44+
afterInstanceID = refs[len(refs)-1].Instance.InstanceID
45+
afterExecutionID = refs[len(refs)-1].Instance.ExecutionID
46+
}
47+
48+
refs, err := diagBackend.GetWorkflowInstances(ctx, afterInstanceID, afterExecutionID, 10)
49+
require.NoError(t, err)
50+
require.Len(t, refs, 0)
51+
},
52+
},
53+
}

0 commit comments

Comments
 (0)