Skip to content

Commit a80bf88

Browse files
authored
Merge pull request #59 from cschleiden/e2e-testing
Add some simple E2E tests
2 parents 30abdcc + aa73eca commit a80bf88

File tree

11 files changed

+213
-38
lines changed

11 files changed

+213
-38
lines changed

backend/mysql/mysql_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,3 +55,42 @@ func Test_MysqlBackend(t *testing.T) {
5555
}
5656
})
5757
}
58+
59+
func TestMySqlBackendE2E(t *testing.T) {
60+
if testing.Short() {
61+
t.Skip()
62+
}
63+
64+
var dbName string
65+
66+
test.EndToEndBackendTest(t, func() backend.Backend {
67+
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
68+
if err != nil {
69+
panic(err)
70+
}
71+
72+
dbName = "test_" + strings.Replace(uuid.NewString(), "-", "", -1)
73+
if _, err := db.Exec("CREATE DATABASE " + dbName); err != nil {
74+
panic(fmt.Errorf("creating database: %w", err))
75+
}
76+
77+
if err := db.Close(); err != nil {
78+
panic(err)
79+
}
80+
81+
return NewMysqlBackend("localhost", 3306, testUser, testPassword, dbName, backend.WithStickyTimeout(0))
82+
}, func(b backend.Backend) {
83+
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword))
84+
if err != nil {
85+
panic(err)
86+
}
87+
88+
if _, err := db.Exec("DROP DATABASE IF EXISTS " + dbName); err != nil {
89+
panic(fmt.Errorf("dropping database: %w", err))
90+
}
91+
92+
if err := db.Close(); err != nil {
93+
panic(err)
94+
}
95+
})
96+
}

backend/redis/redis_test.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,35 +8,45 @@ import (
88
"github.com/cschleiden/go-workflows/backend"
99
"github.com/cschleiden/go-workflows/backend/test"
1010
"github.com/go-redis/redis/v8"
11-
"github.com/stretchr/testify/require"
1211
)
1312

1413
func Test_RedisBackend(t *testing.T) {
1514
if testing.Short() {
1615
t.Skip()
1716
}
1817

19-
test.BackendTest(t, func() backend.Backend {
20-
address := "localhost:6379"
21-
user := ""
22-
password := "RedisPassw0rd"
23-
24-
// Flush database
25-
client := redis.NewUniversalClient(&redis.UniversalOptions{
26-
Addrs: []string{address},
27-
Username: user,
28-
Password: password,
29-
DB: 0,
30-
})
31-
32-
if err := client.FlushDB(context.Background()).Err(); err != nil {
33-
panic(err)
34-
}
35-
36-
// Disable sticky workflow behavior for the test execution
37-
b, err := NewRedisBackend(address, user, password, 0, WithBlockTimeout(time.Millisecond*2))
38-
require.NoError(t, err)
39-
40-
return b
41-
}, nil)
18+
test.BackendTest(t, createBackend, nil)
19+
}
20+
21+
func Test_EndToEndRedisBackend(t *testing.T) {
22+
if testing.Short() {
23+
t.Skip()
24+
}
25+
26+
test.EndToEndBackendTest(t, createBackend, nil)
27+
}
28+
29+
func createBackend() backend.Backend {
30+
address := "localhost:6379"
31+
user := ""
32+
password := "RedisPassw0rd"
33+
34+
// Flush database
35+
client := redis.NewUniversalClient(&redis.UniversalOptions{
36+
Addrs: []string{address},
37+
Username: user,
38+
Password: password,
39+
DB: 0,
40+
})
41+
42+
if err := client.FlushDB(context.Background()).Err(); err != nil {
43+
panic(err)
44+
}
45+
46+
b, err := NewRedisBackend(address, user, password, 0, WithBlockTimeout(time.Millisecond*2))
47+
if err != nil {
48+
panic(err)
49+
}
50+
51+
return b
4252
}

backend/sqlite/sqlite_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,10 @@ func Test_SqliteBackend(t *testing.T) {
1313
return NewInMemoryBackend(backend.WithStickyTimeout(0))
1414
}, nil)
1515
}
16+
17+
func Test_EndToEndSqliteBackend(t *testing.T) {
18+
test.EndToEndBackendTest(t, func() backend.Backend {
19+
// Disable sticky workflow behavior for the test execution
20+
return NewInMemoryBackend(backend.WithStickyTimeout(0))
21+
}, nil)
22+
}

backend/test/e2e.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/cschleiden/go-workflows/backend"
10+
"github.com/cschleiden/go-workflows/client"
11+
"github.com/cschleiden/go-workflows/worker"
12+
"github.com/cschleiden/go-workflows/workflow"
13+
"github.com/google/uuid"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func EndToEndBackendTest(t *testing.T, setup func() backend.Backend, teardown func(b backend.Backend)) {
18+
tests := []struct {
19+
name string
20+
f func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker)
21+
}{
22+
{
23+
name: "SimpleWorkflow",
24+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker) {
25+
wf := func(ctx workflow.Context, msg string) (string, error) {
26+
return msg + " world", nil
27+
}
28+
register(t, ctx, w, []interface{}{wf}, nil)
29+
30+
output, err := runWorkflowWithResult[string](t, ctx, c, wf, "hello")
31+
32+
require.Equal(t, "hello world", output)
33+
require.NoError(t, err)
34+
},
35+
},
36+
{
37+
name: "UnregisteredWorkflow",
38+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker) {
39+
wf := func(ctx workflow.Context, msg string) (string, error) {
40+
return msg + " world", nil
41+
}
42+
register(t, ctx, w, nil, nil)
43+
44+
output, err := runWorkflowWithResult[string](t, ctx, c, wf, "hello")
45+
46+
require.Zero(t, output)
47+
require.ErrorContains(t, err, "workflow 1 not found")
48+
},
49+
},
50+
{
51+
name: "UnregisteredActivity",
52+
f: func(t *testing.T, ctx context.Context, c client.Client, w worker.Worker) {
53+
a := func(context.Context) error { return nil }
54+
wf := func(ctx workflow.Context) (int, error) {
55+
return workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, a).Get(ctx)
56+
}
57+
register(t, ctx, w, []interface{}{wf}, nil)
58+
59+
output, err := runWorkflowWithResult[int](t, ctx, c, wf)
60+
61+
require.Zero(t, output)
62+
require.ErrorContains(t, err, "activity not found")
63+
},
64+
},
65+
}
66+
67+
for _, tt := range tests {
68+
t.Run(tt.name, func(t *testing.T) {
69+
b := setup()
70+
ctx := context.Background()
71+
ctx, cancel := context.WithCancel(ctx)
72+
73+
c := client.New(b)
74+
w := worker.New(b, &worker.DefaultWorkerOptions)
75+
76+
tt.f(t, ctx, c, w)
77+
78+
cancel()
79+
if err := w.WaitForCompletion(); err != nil {
80+
fmt.Println("Worker did not stop in time")
81+
t.FailNow()
82+
}
83+
84+
if teardown != nil {
85+
teardown(b)
86+
}
87+
})
88+
}
89+
}
90+
91+
func register(t *testing.T, ctx context.Context, w worker.Worker, workflows []interface{}, activities []interface{}) {
92+
for _, wf := range workflows {
93+
require.NoError(t, w.RegisterWorkflow(wf))
94+
}
95+
96+
for _, a := range activities {
97+
require.NoError(t, w.RegisterActivity(a))
98+
}
99+
100+
err := w.Start(ctx)
101+
require.NoError(t, err)
102+
}
103+
104+
func runWorkflow(t *testing.T, ctx context.Context, c client.Client, wf interface{}, inputs ...interface{}) *workflow.Instance {
105+
instance, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
106+
InstanceID: uuid.NewString(),
107+
}, wf, inputs...)
108+
require.NoError(t, err)
109+
110+
return instance
111+
}
112+
113+
func runWorkflowWithResult[T any](t *testing.T, ctx context.Context, c client.Client, wf interface{}, inputs ...interface{}) (T, error) {
114+
instance := runWorkflow(t, ctx, c, wf, inputs...)
115+
return client.GetWorkflowResult[T](ctx, c, instance, time.Second*10)
116+
}

internal/worker/activity.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616

1717
type ActivityWorker interface {
1818
Start(context.Context) error
19-
Stop() error
19+
WaitForCompletion() error
2020
}
2121

2222
type activityWorker struct {
@@ -61,7 +61,7 @@ func (aw *activityWorker) Start(ctx context.Context) error {
6161
return nil
6262
}
6363

64-
func (aw *activityWorker) Stop() error {
64+
func (aw *activityWorker) WaitForCompletion() error {
6565
aw.wg.Wait()
6666

6767
return nil

internal/worker/workflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
type WorkflowWorker interface {
1717
Start(context.Context) error
1818

19-
Stop() error
19+
WaitForCompletion() error
2020
}
2121

2222
type workflowWorker struct {
@@ -64,7 +64,7 @@ func (ww *workflowWorker) Start(ctx context.Context) error {
6464
return nil
6565
}
6666

67-
func (ww *workflowWorker) Stop() error {
67+
func (ww *workflowWorker) WaitForCompletion() error {
6868
ww.wg.Wait()
6969

7070
return nil

internal/workflow/executor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (e *executor) ExecuteTask(ctx context.Context, t *task.Workflow) (*Executio
127127
"instance_id", t.WorkflowInstance.InstanceID,
128128
"executed", len(executedEvents),
129129
"last_sequence_id", e.lastSequenceID,
130-
"completed", e.workflow.Completed(),
130+
"completed", completed,
131131
)
132132

133133
return &ExecutionResult{

samples/simple/simple.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func main() {
3434

3535
cancel()
3636

37-
if err := w.Stop(); err != nil {
37+
if err := w.WaitForCompletion(); err != nil {
3838
panic("could not stop worker" + err.Error())
3939
}
4040
}

samples/subworkflow/subworkflow.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func main() {
3636

3737
cancel()
3838

39-
if err := w.Stop(); err != nil {
39+
if err := w.WaitForCompletion(); err != nil {
4040
panic("could not stop worker" + err.Error())
4141
}
4242
}

samples/timer/timer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func main() {
3232
startWorkflow(ctx, c)
3333

3434
cancel()
35-
w.Stop()
35+
w.WaitForCompletion()
3636
}
3737

3838
func startWorkflow(ctx context.Context, c client.Client) {

0 commit comments

Comments
 (0)