Skip to content

Commit ebf99fe

Browse files
committed
Add async activity example
Include a very basic async activity example. This is the same example from the samples-java repository, ported to Go.
1 parent ea23602 commit ebf99fe

File tree

6 files changed

+178
-1
lines changed

6 files changed

+178
-1
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ Each sample demonstrates one feature of the SDK, together with tests.
3030
- [**Basic mTLS hello world**](./helloworldmtls): Simple example of a
3131
Workflow Definition and an Activity Definition using mTLS like Temporal Cloud.
3232

33+
- [**Basic async activity execution**](./async-activity/): Simple example of a
34+
Workflow Definition starting activities asynchronously.
35+
3336
### API demonstrations
3437

3538
- **Async activity completion**: Example of
@@ -230,7 +233,6 @@ resource waiting its successful completion
230233

231234
Mostly examples we haven't yet ported from https://github.com/temporalio/samples-java/
232235

233-
- Async activity calling: *Example to be completed*
234236
- Async lambda: *Example to be completed*
235237
- Periodic Workflow: Workflow that executes some logic periodically. *Example to be completed*
236238
- Exception propagation and wrapping: *Example to be completed*

async-activity/README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
### Steps to run this sample:
2+
1) Run a [Temporal service](https://github.com/temporalio/samples-go/tree/main/#how-to-use).
3+
2) Run the following command to start the worker (assuming you are in the async-activity directory)
4+
```
5+
go run ./worker/main.go
6+
```
7+
3) Run the following command to start the example
8+
```
9+
go run ./starter/main.go
10+
```

async-activity/starter/main.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"log"
6+
7+
"go.temporal.io/sdk/client"
8+
9+
"github.com/google/uuid"
10+
asyncactivity "github.com/temporalio/samples-go/async-activity"
11+
)
12+
13+
func main() {
14+
// The client is a heavyweight object that should be created once per process.
15+
c, err := client.Dial(client.Options{})
16+
if err != nil {
17+
log.Fatalln("Unable to create client", err)
18+
}
19+
defer c.Close()
20+
21+
workflowOptions := client.StartWorkflowOptions{
22+
ID: "async-activity-" + uuid.NewString(),
23+
TaskQueue: "async-activity",
24+
}
25+
26+
we, err := c.ExecuteWorkflow(context.Background(), workflowOptions, asyncactivity.AsyncActivityWorkflow, "Temporal")
27+
if err != nil {
28+
log.Fatalln("Unable to execute workflow", err)
29+
}
30+
31+
log.Println("Started workflow", "WorkflowID", we.GetID(), "RunID", we.GetRunID())
32+
33+
// Synchronously wait for the workflow completion.
34+
var result string
35+
err = we.Get(context.Background(), &result)
36+
if err != nil {
37+
log.Fatalln("Unable get workflow result", err)
38+
}
39+
log.Println("Workflow result:", result)
40+
}

async-activity/worker/main.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"log"
5+
6+
"go.temporal.io/sdk/client"
7+
"go.temporal.io/sdk/worker"
8+
9+
asyncactivity "github.com/temporalio/samples-go/async-activity"
10+
)
11+
12+
func main() {
13+
// The client and worker are heavyweight objects that should be created once per process.
14+
c, err := client.Dial(client.Options{})
15+
if err != nil {
16+
log.Fatalln("Unable to create client", err)
17+
}
18+
defer c.Close()
19+
20+
// Useful events to look for: timestamp of ActivityTaskScheduled,
21+
// ActivityTaskStarted and ActivityTaskCompleted (not that they
22+
// may not be in the correct timestamp order in the event history).
23+
w := worker.New(c, "async-activity", worker.Options{
24+
// Set this to 1 to make the activities run one after the other (note
25+
// how both are scheduled at the same time, but ActivityTaskStarted
26+
// differs).
27+
MaxConcurrentActivityExecutionSize: 2,
28+
// Set this to 0.5 to create some delay between when activities are
29+
// started. Note that in this case, the started time does not differ.
30+
// Only the completed time is different.
31+
WorkerActivitiesPerSecond: 2,
32+
})
33+
34+
w.RegisterWorkflow(asyncactivity.AsyncActivityWorkflow)
35+
w.RegisterActivity(asyncactivity.HelloActivity)
36+
w.RegisterActivity(asyncactivity.ByeActivity)
37+
38+
err = w.Run(worker.InterruptCh())
39+
if err != nil {
40+
log.Fatalln("Unable to start worker", err)
41+
}
42+
}

async-activity/workflow.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package asyncactivity
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"go.temporal.io/sdk/activity"
9+
"go.temporal.io/sdk/workflow"
10+
)
11+
12+
// AsyncActivityWorkflow is a workflow definition starting two activities
13+
// asynchronously.
14+
func AsyncActivityWorkflow(ctx workflow.Context, name string) (string, error) {
15+
ao := workflow.ActivityOptions{
16+
StartToCloseTimeout: 10 * time.Second,
17+
}
18+
ctx = workflow.WithActivityOptions(ctx, ao)
19+
20+
// Start activities asynchronously.
21+
var helloResult, byeResult string
22+
helloFuture := workflow.ExecuteActivity(ctx, HelloActivity, name)
23+
byeFuture := workflow.ExecuteActivity(ctx, ByeActivity, name)
24+
25+
// This can be done alternatively by creating a workflow selector. See
26+
// "pickfirst" example.
27+
err := helloFuture.Get(ctx, &helloResult)
28+
if err != nil {
29+
return "", fmt.Errorf("hello activity error: %s", err.Error())
30+
}
31+
err = byeFuture.Get(ctx, &byeResult)
32+
if err != nil {
33+
return "", fmt.Errorf("bye activity error: %s", err.Error())
34+
}
35+
36+
return helloResult + "\n" + byeResult, nil
37+
}
38+
39+
// Each of these activities will sleep for 5 seconds, but see in the temporal
40+
// dashboard that they were created immediately one after the other.
41+
42+
func HelloActivity(ctx context.Context, name string) (string, error) {
43+
logger := activity.GetLogger(ctx)
44+
logger.Info("Hello activity", "name", name)
45+
time.Sleep(5 * time.Second)
46+
return "Hello " + name + "!", nil
47+
}
48+
49+
func ByeActivity(ctx context.Context, name string) (string, error) {
50+
logger := activity.GetLogger(ctx)
51+
logger.Info("Bye activity", "name", name)
52+
time.Sleep(5 * time.Second)
53+
return "Bye " + name + "!", nil
54+
}

async-activity/workflow_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package asyncactivity
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/mock"
7+
"github.com/stretchr/testify/require"
8+
"go.temporal.io/sdk/testsuite"
9+
)
10+
11+
func Test_Workflow(t *testing.T) {
12+
testSuite := &testsuite.WorkflowTestSuite{}
13+
env := testSuite.NewTestWorkflowEnvironment()
14+
env.RegisterActivity(HelloActivity)
15+
env.RegisterActivity(ByeActivity)
16+
17+
// Mock the activities to skip the timers (and avoid test timeout).
18+
env.OnActivity(HelloActivity, mock.Anything, "Temporal").Return("Hello Temporal!", nil)
19+
env.OnActivity(ByeActivity, mock.Anything, "Temporal").Return("Bye Temporal!", nil)
20+
21+
env.ExecuteWorkflow(AsyncActivityWorkflow, "Temporal")
22+
23+
require.True(t, env.IsWorkflowCompleted())
24+
require.NoError(t, env.GetWorkflowError())
25+
26+
var result string
27+
require.NoError(t, env.GetWorkflowResult(&result))
28+
require.Equal(t, "Hello Temporal!\nBye Temporal!", result)
29+
}

0 commit comments

Comments
 (0)