Skip to content

Commit ced2ac7

Browse files
committed
Add custom converter sample
1 parent a179f10 commit ced2ac7

File tree

4 files changed

+182
-0
lines changed

4 files changed

+182
-0
lines changed

samples/converter/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Shows how a custom `Converter` can be used for inputs/result serialization. This example uses `gob` binary encoding instead of the default JSON one.

samples/converter/converter.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/gob"
7+
"log"
8+
"os"
9+
"os/signal"
10+
"time"
11+
12+
"github.com/cschleiden/go-workflows/backend"
13+
"github.com/cschleiden/go-workflows/client"
14+
"github.com/cschleiden/go-workflows/internal/converter"
15+
"github.com/cschleiden/go-workflows/internal/payload"
16+
"github.com/cschleiden/go-workflows/samples"
17+
"github.com/cschleiden/go-workflows/worker"
18+
19+
"github.com/google/uuid"
20+
)
21+
22+
type CustomConverter struct {
23+
}
24+
25+
var _ converter.Converter = (*CustomConverter)(nil)
26+
27+
func (*CustomConverter) From(data payload.Payload, v interface{}) error {
28+
return gob.NewDecoder(bytes.NewReader(data)).Decode(v)
29+
}
30+
31+
func (*CustomConverter) To(v interface{}) (payload.Payload, error) {
32+
var buf bytes.Buffer
33+
34+
if err := gob.NewEncoder(&buf).Encode(v); err != nil {
35+
return nil, err
36+
}
37+
38+
return buf.Bytes(), nil
39+
}
40+
41+
func main() {
42+
ctx, cancel := context.WithCancel(context.Background())
43+
44+
b := samples.GetBackend("converter", backend.WithConverter(&CustomConverter{}))
45+
46+
// Run worker
47+
w := RunWorker(ctx, b)
48+
49+
// Start workflow via client
50+
c := client.New(b)
51+
52+
runWorkflow(ctx, c)
53+
54+
cancel()
55+
56+
if err := w.WaitForCompletion(); err != nil {
57+
panic("could not stop worker" + err.Error())
58+
}
59+
60+
// wait for sigint signal
61+
sigint := make(chan os.Signal, 1)
62+
signal.Notify(sigint, os.Interrupt)
63+
<-sigint
64+
}
65+
66+
func runWorkflow(ctx context.Context, c client.Client) {
67+
wf, err := c.CreateWorkflowInstance(ctx, client.WorkflowInstanceOptions{
68+
InstanceID: uuid.NewString(),
69+
}, Workflow1, "Hello world"+uuid.NewString(), 42, Inputs{
70+
Msg: "",
71+
Times: 0,
72+
})
73+
if err != nil {
74+
log.Fatal(err)
75+
panic("could not start workflow")
76+
}
77+
78+
result, err := client.GetWorkflowResult[int](ctx, c, wf, time.Second*10)
79+
if err != nil {
80+
log.Fatal(err)
81+
}
82+
83+
log.Println("Workflow finished. Result:", result)
84+
}
85+
86+
func RunWorker(ctx context.Context, mb backend.Backend) worker.Worker {
87+
w := worker.New(mb, nil)
88+
89+
w.RegisterWorkflow(Workflow1)
90+
91+
w.RegisterActivity(Activity1)
92+
w.RegisterActivity(Activity2)
93+
94+
if err := w.Start(ctx); err != nil {
95+
panic("could not start worker")
96+
}
97+
98+
return w
99+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
6+
"github.com/cschleiden/go-workflows/tester"
7+
"github.com/google/uuid"
8+
"github.com/stretchr/testify/mock"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func Test_Workflow(t *testing.T) {
13+
tester := tester.NewWorkflowTester[int](Workflow1)
14+
15+
tester.OnActivity(Activity1, mock.Anything, 35, 12).Return(47, nil)
16+
tester.OnActivity(Activity2, mock.Anything, mock.Anything, mock.Anything).Return(12, nil)
17+
18+
tester.Execute("Hello world"+uuid.NewString(), 42, Inputs{
19+
Msg: "",
20+
Times: 0,
21+
})
22+
23+
require.True(t, tester.WorkflowFinished())
24+
25+
wr, werr := tester.WorkflowResult()
26+
require.Equal(t, 59, wr)
27+
require.Empty(t, werr)
28+
tester.AssertExpectations(t)
29+
}

samples/converter/workflow.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"github.com/cschleiden/go-workflows/activity"
7+
"github.com/cschleiden/go-workflows/workflow"
8+
)
9+
10+
type Inputs struct {
11+
Msg string
12+
Times int
13+
}
14+
15+
func Workflow1(ctx workflow.Context, msg string, times int, inputs Inputs) (int, error) {
16+
logger := workflow.Logger(ctx)
17+
logger.Debug("Entering Workflow1", "msg", msg, "times", times, "inputs", inputs)
18+
defer logger.Debug("Leaving Workflow1")
19+
20+
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
21+
if err != nil {
22+
panic("error getting activity 1 result")
23+
}
24+
logger.Debug("R1 result", "r1", r1)
25+
26+
r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
27+
if err != nil {
28+
panic("error getting activity 1 result")
29+
}
30+
logger.Debug("R2 result", "r2", r2)
31+
32+
return r1 + r2, nil
33+
}
34+
35+
func Activity1(ctx context.Context, a, b int) (int, error) {
36+
logger := activity.Logger(ctx)
37+
logger.Debug("Entering Activity1")
38+
defer logger.Debug("Leaving Activity1")
39+
40+
// time.Sleep(5 * time.Second)
41+
42+
return a + b, nil
43+
}
44+
45+
func Activity2(ctx context.Context) (int, error) {
46+
logger := activity.Logger(ctx)
47+
logger.Debug("Entering Activity2")
48+
defer logger.Debug("Leaving Activity2")
49+
50+
// time.Sleep(1 * time.Second)
51+
52+
return 12, nil
53+
}

0 commit comments

Comments
 (0)