Skip to content

Commit 95f4442

Browse files
JoshVanLmsfussell
andauthored
[1.16] Update Go workflow examples to use vanity client (#4874)
* [1.16] Update Go workflow examples to use vanity client Signed-off-by: joshvanl <[email protected]> * Rename `StartWorkflow` to `ScheduleWorkflow` Signed-off-by: joshvanl <[email protected]> * Updates types for workflow multi-app example Signed-off-by: joshvanl <[email protected]> --------- Signed-off-by: joshvanl <[email protected]> Co-authored-by: Mark Fussell <[email protected]>
1 parent 79c6613 commit 95f4442

File tree

3 files changed

+165
-95
lines changed

3 files changed

+165
-95
lines changed

daprdocs/content/en/developing-applications/building-blocks/workflow/howto-author-workflow.md

Lines changed: 111 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -874,86 +874,98 @@ package main
874874

875875
import (
876876
"context"
877+
"errors"
877878
"fmt"
878879
"log"
879880
"time"
880881

881-
"github.com/dapr/durabletask-go/api"
882-
"github.com/dapr/durabletask-go/backend"
883-
"github.com/dapr/durabletask-go/client"
884-
"github.com/dapr/durabletask-go/task"
885-
dapr "github.com/dapr/go-sdk/client"
882+
"github.com/dapr/durabletask-go/workflow"
883+
"github.com/dapr/go-sdk/client"
886884
)
887885

888886
var stage = 0
889-
890-
const (
891-
workflowComponent = "dapr"
892-
)
887+
var failActivityTries = 0
893888

894889
func main() {
895-
registry := task.NewTaskRegistry()
890+
r := workflow.NewRegistry()
896891

897-
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
892+
if err := r.AddWorkflow(TestWorkflow); err != nil {
898893
log.Fatal(err)
899894
}
900895
fmt.Println("TestWorkflow registered")
901896

902-
if err := registry.AddActivity(TestActivity); err != nil {
897+
if err := r.AddActivity(TestActivity); err != nil {
903898
log.Fatal(err)
904899
}
905900
fmt.Println("TestActivity registered")
906901

907-
daprClient, err := dapr.NewClient()
908-
if err != nil {
909-
log.Fatalf("failed to create Dapr client: %v", err)
902+
if err := r.AddActivity(FailActivity); err != nil {
903+
log.Fatal(err)
910904
}
905+
fmt.Println("FailActivity registered")
911906

912-
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
913-
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
914-
log.Fatalf("failed to start work item listener: %v", err)
907+
wclient, err := client.NewWorkflowClient()
908+
if err != nil {
909+
log.Fatal(err)
915910
}
911+
fmt.Println("Worker initialized")
916912

913+
ctx, cancel := context.WithCancel(context.Background())
914+
if err = wclient.StartWorker(ctx, r); err != nil {
915+
log.Fatal(err)
916+
}
917917
fmt.Println("runner started")
918918

919-
ctx := context.Background()
920-
921919
// Start workflow test
922-
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
920+
// Set the start time to the current time to not wait for the workflow to
921+
// "start". This is useful for increasing the throughput of creating
922+
// workflows.
923+
// workflow.WithStartTime(time.Now())
924+
instanceID, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
923925
if err != nil {
924926
log.Fatalf("failed to start workflow: %v", err)
925927
}
926-
fmt.Printf("workflow started with id: %v\n", id)
928+
fmt.Printf("workflow started with id: %v\n", instanceID)
927929

928930
// Pause workflow test
929-
err = client.PurgeOrchestrationState(ctx, id)
931+
err = wclient.SuspendWorkflow(ctx, instanceID, "")
930932
if err != nil {
931933
log.Fatalf("failed to pause workflow: %v", err)
932934
}
933935

934-
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
936+
respFetch, err := wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
935937
if err != nil {
936-
log.Fatalf("failed to get workflow: %v", err)
938+
log.Fatalf("failed to fetch workflow: %v", err)
937939
}
938-
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)
940+
941+
if respFetch.RuntimeStatus != workflow.StatusSuspended {
942+
log.Fatalf("workflow not paused: %s: %v", respFetch.RuntimeStatus, respFetch)
943+
}
944+
945+
fmt.Printf("workflow paused\n")
939946

940947
// Resume workflow test
941-
err = client.ResumeOrchestration(ctx, id, "")
948+
err = wclient.ResumeWorkflow(ctx, instanceID, "")
942949
if err != nil {
943950
log.Fatalf("failed to resume workflow: %v", err)
944951
}
945-
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)
946952

947-
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
953+
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
948954
if err != nil {
949955
log.Fatalf("failed to get workflow: %v", err)
950956
}
951-
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)
957+
958+
if respFetch.RuntimeStatus != workflow.StatusRunning {
959+
log.Fatalf("workflow not running")
960+
}
961+
962+
fmt.Println("workflow resumed")
952963

953964
fmt.Printf("stage: %d\n", stage)
954965

955966
// Raise Event Test
956-
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
967+
968+
err = wclient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
957969
if err != nil {
958970
fmt.Printf("failed to raise event: %v", err)
959971
}
@@ -964,44 +976,99 @@ func main() {
964976

965977
fmt.Printf("stage: %d\n", stage)
966978

967-
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
979+
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
980+
_, err = wclient.WaitForWorkflowCompletion(waitCtx, instanceID)
981+
cancel()
982+
if err != nil {
983+
log.Fatalf("failed to wait for workflow: %v", err)
984+
}
985+
986+
fmt.Printf("fail activity executions: %d\n", failActivityTries)
987+
988+
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
968989
if err != nil {
969990
log.Fatalf("failed to get workflow: %v", err)
970991
}
971992

972-
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
993+
fmt.Printf("workflow status: %v\n", respFetch.String())
973994

974995
// Purge workflow test
975-
err = client.PurgeOrchestrationState(ctx, id)
996+
err = wclient.PurgeWorkflowState(ctx, instanceID)
976997
if err != nil {
977998
log.Fatalf("failed to purge workflow: %v", err)
978999
}
1000+
1001+
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
1002+
if err == nil || respFetch != nil {
1003+
log.Fatalf("failed to purge workflow: %v", err)
1004+
}
1005+
9791006
fmt.Println("workflow purged")
1007+
1008+
fmt.Printf("stage: %d\n", stage)
1009+
1010+
// Terminate workflow test
1011+
id, err := wclient.ScheduleWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
1012+
if err != nil {
1013+
log.Fatalf("failed to start workflow: %v", err)
1014+
}
1015+
fmt.Printf("workflow started with id: %v\n", instanceID)
1016+
1017+
metadata, err := wclient.WaitForWorkflowStart(ctx, id)
1018+
if err != nil {
1019+
log.Fatalf("failed to get workflow: %v", err)
1020+
}
1021+
fmt.Printf("workflow status: %s\n", metadata.String())
1022+
1023+
err = wclient.TerminateWorkflow(ctx, id)
1024+
if err != nil {
1025+
log.Fatalf("failed to terminate workflow: %v", err)
1026+
}
1027+
fmt.Println("workflow terminated")
1028+
1029+
err = wclient.PurgeWorkflowState(ctx, id)
1030+
if err != nil {
1031+
log.Fatalf("failed to purge workflow: %v", err)
1032+
}
1033+
fmt.Println("workflow purged")
1034+
1035+
cancel()
1036+
1037+
fmt.Println("workflow worker successfully shutdown")
9801038
}
9811039

982-
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
1040+
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
9831041
var input int
9841042
if err := ctx.GetInput(&input); err != nil {
9851043
return nil, err
9861044
}
9871045
var output string
988-
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
1046+
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
9891047
return nil, err
9901048
}
9911049

992-
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
1050+
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
9931051
if err != nil {
9941052
return nil, err
9951053
}
9961054

997-
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
1055+
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
9981056
return nil, err
9991057
}
10001058

1059+
if err := ctx.CallActivity(FailActivity, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
1060+
MaxAttempts: 3,
1061+
InitialRetryInterval: 100 * time.Millisecond,
1062+
BackoffCoefficient: 2,
1063+
MaxRetryInterval: 1 * time.Second,
1064+
})).Await(nil); err == nil {
1065+
return nil, fmt.Errorf("unexpected no error executing fail activity")
1066+
}
1067+
10011068
return output, nil
10021069
}
10031070

1004-
func TestActivity(ctx task.ActivityContext) (any, error) {
1071+
func TestActivity(ctx workflow.ActivityContext) (any, error) {
10051072
var input int
10061073
if err := ctx.GetInput(&input); err != nil {
10071074
return "", err
@@ -1011,6 +1078,11 @@ func TestActivity(ctx task.ActivityContext) (any, error) {
10111078

10121079
return fmt.Sprintf("Stage: %d", stage), nil
10131080
}
1081+
1082+
func FailActivity(ctx workflow.ActivityContext) (any, error) {
1083+
failActivityTries += 1
1084+
return nil, errors.New("dummy activity error")
1085+
}
10141086
```
10151087
10161088
[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

daprdocs/content/en/developing-applications/building-blocks/workflow/workflow-multi-app.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,11 @@ The following example shows how to execute the activity `ActivityA` on the targe
6363
{{% tab "Go" %}}
6464

6565
```go
66-
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
66+
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
6767
var output string
6868
err := ctx.CallActivity("ActivityA",
69-
task.WithActivityInput("my-input"),
70-
task.WithActivityAppID("App2"), // Here we set the target app ID which will execute this activity.
69+
workflow.WithActivityInput("my-input"),
70+
workflow.WithActivityAppID("App2"), // Here we set the target app ID which will execute this activity.
7171
).Await(&output)
7272

7373
if err != nil {
@@ -115,11 +115,11 @@ The following example shows how to execute the child workflow `Workflow2` on the
115115
{{% tab "Go" %}}
116116

117117
```go
118-
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
118+
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
119119
var output string
120-
err := ctx.CallSubOrchestrator("Workflow2",
121-
task.WithSubOrchestratorInput("my-input"),
122-
task.WithSubOrchestratorAppID("App2"), // Here we set the target app ID which will execute this child workflow.
120+
err := ctx.CallChildWorkflow("Workflow2",
121+
workflow.WithChildWorkflowInput("my-input"),
122+
workflow.WithChildWorkflowAppID("App2"), // Here we set the target app ID which will execute this child workflow.
123123
).Await(&output)
124124

125125
if err != nil {

0 commit comments

Comments
 (0)