Skip to content

Commit 0972c2d

Browse files
authored
Partial sub-workflow support
1 parent 06b3bef commit 0972c2d

File tree

4 files changed

+31
-44
lines changed

4 files changed

+31
-44
lines changed

backend/redis/instance.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,8 @@ import (
1414
)
1515

1616
func (rb *redisBackend) CreateWorkflowInstance(ctx context.Context, event history.WorkflowEvent) error {
17-
// Store instance with its state
18-
if err := createInstance(ctx, rb.rdb, event.WorkflowInstance, &instanceState{
19-
InstanceID: event.WorkflowInstance.GetInstanceID(),
20-
ExecutionID: event.WorkflowInstance.GetExecutionID(),
21-
State: backend.WorkflowStateActive,
22-
CreatedAt: time.Now(),
23-
}); err != nil {
24-
return errors.Wrap(err, "could not create workflow instance")
17+
if err := createInstance(ctx, rb.rdb, event.WorkflowInstance, false); err != nil {
18+
return err
2519
}
2620

2721
// Create event stream
@@ -91,10 +85,15 @@ type instanceState struct {
9185
CompletedAt *time.Time `json:"completed_at,omitempty"`
9286
}
9387

94-
func createInstance(ctx context.Context, rdb redis.UniversalClient, instance core.WorkflowInstance, state *instanceState) error {
88+
func createInstance(ctx context.Context, rdb redis.UniversalClient, instance core.WorkflowInstance, ignoreDuplicate bool) error {
9589
key := instanceKey(instance.GetInstanceID())
9690

97-
b, err := json.Marshal(state)
91+
b, err := json.Marshal(&instanceState{
92+
InstanceID: instance.GetInstanceID(),
93+
ExecutionID: instance.GetExecutionID(),
94+
State: backend.WorkflowStateActive,
95+
CreatedAt: time.Now(),
96+
})
9897
if err != nil {
9998
return errors.Wrap(err, "could not marshal instance state")
10099
}
@@ -104,7 +103,7 @@ func createInstance(ctx context.Context, rdb redis.UniversalClient, instance cor
104103
return errors.Wrap(err, "could not store instance")
105104
}
106105

107-
if !ok {
106+
if !ignoreDuplicate && !ok {
108107
return errors.New("workflow instance already exists")
109108
}
110109

backend/redis/workflow.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,13 +103,14 @@ func (rb *redisBackend) CompleteWorkflowTask(ctx context.Context, taskID string,
103103
for targetInstance, events := range groupedEvents {
104104
if instance.GetInstanceID() != targetInstance.GetInstanceID() {
105105
// Create new instance
106-
// TODO: Support creating sub-workflows
107-
panic("not implemented")
106+
if err := createInstance(ctx, rb.rdb, targetInstance, true); err != nil {
107+
return err
108+
}
108109
}
109110

110111
// Insert pending events for target instance
111112
for _, event := range events {
112-
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(instance.GetInstanceID()), &event); err != nil {
113+
if err := addEventToStream(ctx, rb.rdb, pendingEventsKey(targetInstance.GetInstanceID()), &event); err != nil {
113114
return err
114115
}
115116
}

samples/signal/signal.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func Workflow1(ctx workflow.Context, msg string, subID string) (string, error) {
101101
func SubWorkflow1(ctx workflow.Context) (string, error) {
102102
samples.Trace(ctx, "Waiting for signal from sub-worflow")
103103

104-
c := workflow.NewSignalChannel[string](ctx, "sub-signal")
104+
c := workflow.NewSignalChannel[int](ctx, "sub-signal")
105105
c.Receive(ctx)
106106

107107
samples.Trace(ctx, "Received sub-workflow signal")

samples/subworkflow/subworkflow.go

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ import (
77
"os/signal"
88

99
"github.com/cschleiden/go-workflows/backend"
10-
"github.com/cschleiden/go-workflows/backend/sqlite"
10+
"github.com/cschleiden/go-workflows/backend/redis"
1111
"github.com/cschleiden/go-workflows/client"
12+
"github.com/cschleiden/go-workflows/samples"
1213
"github.com/cschleiden/go-workflows/worker"
1314
"github.com/cschleiden/go-workflows/workflow"
1415
"github.com/google/uuid"
@@ -18,8 +19,12 @@ import (
1819
func main() {
1920
ctx := context.Background()
2021

21-
b := sqlite.NewInMemoryBackend()
22+
// b := sqlite.NewInMemoryBackend()
2223
// b := sqlite.NewSqliteBackend("subworkflow.sqlite")
24+
b, err := redis.NewRedisBackend("localhost:6379", "", "RedisPassw0rd", 0)
25+
if err != nil {
26+
panic(err)
27+
}
2328

2429
// Run worker
2530
go RunWorker(ctx, b)
@@ -60,66 +65,48 @@ func RunWorker(ctx context.Context, mb backend.Backend) {
6065
}
6166

6267
func Workflow1(ctx workflow.Context, msg string) error {
63-
log.Println("Entering Workflow1")
64-
log.Println("\tWorkflow instance input:", msg)
65-
log.Println("\tIsReplaying:", workflow.Replaying(ctx))
66-
67-
defer func() {
68-
log.Println("Leaving Workflow1")
69-
}()
68+
samples.Trace(ctx, "Entering Workflow1")
69+
samples.Trace(ctx, "\tWorkflow instance input:", msg)
7070

7171
wr, err := workflow.CreateSubWorkflowInstance[string](ctx, workflow.DefaultSubWorkflowOptions, Workflow2, "some input").Get(ctx)
7272
if err != nil {
7373
return errors.Wrap(err, "could not get sub workflow result")
7474
}
7575

76-
log.Println("Sub workflow result:", wr)
76+
samples.Trace(ctx, "Sub workflow result:", wr)
7777

7878
return nil
7979
}
8080

8181
func Workflow2(ctx workflow.Context, msg string) (string, error) {
82-
log.Println("Entering Workflow2")
83-
log.Println("\tWorkflow instance input:", msg)
84-
log.Println("\tIsReplaying:", workflow.Replaying(ctx))
85-
86-
defer func() {
87-
log.Println("Leaving Workflow2")
88-
}()
82+
samples.Trace(ctx, "Entering Workflow2")
83+
samples.Trace(ctx, "\tWorkflow instance input:", msg)
8984

9085
r1, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12).Get(ctx)
9186
if err != nil {
9287
panic("error getting activity 1 result")
9388
}
94-
log.Println("R1 result:", r1)
95-
log.Println("\tIsReplaying:", workflow.Replaying(ctx))
89+
samples.Trace(ctx, "R1 result:", r1)
9690

9791
r2, err := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2).Get(ctx)
9892
if err != nil {
9993
panic("error getting activity 1 result")
10094
}
101-
log.Println("R2 result:", r2)
102-
log.Println("\tIsReplaying:", workflow.Replaying(ctx))
95+
samples.Trace(ctx, "R2 result:", r2)
10396

10497
return "W2 Result", nil
10598
}
10699

107100
func Activity1(ctx context.Context, a, b int) (int, error) {
108101
log.Println("Entering Activity1")
109-
110-
defer func() {
111-
log.Println("Leaving Activity1")
112-
}()
102+
defer log.Println("Leaving Activity1")
113103

114104
return a + b, nil
115105
}
116106

117107
func Activity2(ctx context.Context) (int, error) {
118108
log.Println("Entering Activity2")
119-
120-
defer func() {
121-
log.Println("Leaving Activity2")
122-
}()
109+
defer log.Println("Leaving Activity2")
123110

124111
return 12, nil
125112
}

0 commit comments

Comments
 (0)