Skip to content

Commit d2d51c0

Browse files
committed
chore: New sample for task execution id
Signed-off-by: Javier Aliaga <[email protected]>
1 parent 98168bd commit d2d51c0

File tree

2 files changed

+173
-78
lines changed

2 files changed

+173
-78
lines changed

samples/retries/retries.go

Lines changed: 8 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,8 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
"fmt"
87
"log"
9-
"sync"
8+
"math/rand"
109
"time"
1110

1211
"github.com/dapr/durabletask-go/backend"
@@ -17,18 +16,16 @@ import (
1716
func main() {
1817
// Create a new task registry and add the orchestrator and activities
1918
r := task.NewTaskRegistry()
20-
must(r.AddOrchestrator(RetryActivityOrchestrator))
21-
must(r.AddActivity(RandomFailActivity))
19+
r.AddOrchestrator(RetryActivityOrchestrator)
20+
r.AddActivity(RandomFailActivity)
2221

2322
// Init the client
2423
ctx := context.Background()
2524
client, worker, err := Init(ctx, r)
2625
if err != nil {
2726
log.Fatalf("Failed to initialize the client: %v", err)
2827
}
29-
defer func() {
30-
must(worker.Shutdown(ctx))
31-
}()
28+
defer worker.Shutdown(ctx)
3229

3330
// Start a new orchestration
3431
id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator)
@@ -77,89 +74,22 @@ func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, bac
7774
}
7875

7976
func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
80-
t := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
77+
if err := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
8178
MaxAttempts: 10,
8279
InitialRetryInterval: 100 * time.Millisecond,
8380
BackoffCoefficient: 2,
8481
MaxRetryInterval: 3 * time.Second,
85-
}))
86-
87-
t1 := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
88-
MaxAttempts: 10,
89-
InitialRetryInterval: 100 * time.Millisecond,
90-
BackoffCoefficient: 2,
91-
MaxRetryInterval: 3 * time.Second,
92-
}))
93-
94-
if err := t.Await(nil); err != nil {
82+
})).Await(nil); err != nil {
9583
return nil, err
9684
}
97-
98-
if err := t1.Await(nil); err != nil {
99-
return nil, err
100-
}
101-
10285
return nil, nil
10386
}
10487

105-
type Counter struct {
106-
c int32
107-
lock sync.Mutex
108-
}
109-
110-
func (c *Counter) Increment() {
111-
c.lock.Lock()
112-
defer c.lock.Unlock()
113-
c.c++
114-
}
115-
116-
func (c *Counter) GetValue() int32 {
117-
c.lock.Lock()
118-
defer c.lock.Unlock()
119-
return c.c
120-
}
121-
122-
var (
123-
counters = make(map[string]*Counter)
124-
countersLock sync.RWMutex
125-
)
126-
127-
// getCounter returns a Counter instance for the specified taskExecutionId.
128-
// If no counter exists for the taskExecutionId, a new one is created.
129-
func getCounter(taskExecutionId string) *Counter {
130-
countersLock.RLock()
131-
counter, exists := counters[taskExecutionId]
132-
countersLock.RUnlock()
133-
134-
if !exists {
135-
countersLock.Lock()
136-
// Check again to handle race conditions
137-
counter, exists = counters[taskExecutionId]
138-
if !exists {
139-
counter = &Counter{}
140-
counters[taskExecutionId] = counter
141-
}
142-
countersLock.Unlock()
143-
}
144-
145-
return counter
146-
}
147-
14888
func RandomFailActivity(ctx task.ActivityContext) (any, error) {
149-
log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
150-
151-
// The activity should fail 5 times before succeeding.
152-
if getCounter(ctx.GetTaskExecutionId()).GetValue() != 5 {
89+
// 70% possibility for activity failure
90+
if rand.Intn(100) <= 70 {
15391
log.Println("random activity failure")
154-
getCounter(ctx.GetTaskExecutionId()).Increment()
15592
return "", errors.New("random activity failure")
15693
}
157-
15894
return "ok", nil
15995
}
160-
161-
func must(err error) {
162-
if err != nil {
163-
panic(err)
164-
}
165-
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"log"
9+
"sync"
10+
"time"
11+
12+
"github.com/dapr/durabletask-go/backend"
13+
"github.com/dapr/durabletask-go/backend/sqlite"
14+
"github.com/dapr/durabletask-go/task"
15+
)
16+
17+
func main() {
18+
// Create a new task registry and add the orchestrator and activities
19+
r := task.NewTaskRegistry()
20+
must(r.AddOrchestrator(RetryActivityOrchestrator))
21+
must(r.AddActivity(RandomFailActivity))
22+
23+
// Init the client
24+
ctx := context.Background()
25+
client, worker, err := Init(ctx, r)
26+
if err != nil {
27+
log.Fatalf("Failed to initialize the client: %v", err)
28+
}
29+
defer func() {
30+
must(worker.Shutdown(ctx))
31+
}()
32+
33+
// Start a new orchestration
34+
id, err := client.ScheduleNewOrchestration(ctx, RetryActivityOrchestrator)
35+
if err != nil {
36+
log.Fatalf("Failed to schedule new orchestration: %v", err)
37+
}
38+
39+
// Wait for the orchestration to complete
40+
metadata, err := client.WaitForOrchestrationCompletion(ctx, id)
41+
if err != nil {
42+
log.Fatalf("Failed to wait for orchestration to complete: %v", err)
43+
}
44+
45+
// Print the results
46+
metadataEnc, err := json.MarshalIndent(metadata, "", " ")
47+
if err != nil {
48+
log.Fatalf("Failed to encode result to JSON: %v", err)
49+
}
50+
log.Printf("Orchestration completed: %v", string(metadataEnc))
51+
}
52+
53+
// Init creates and initializes an in-memory client and worker pair with default configuration.
54+
func Init(ctx context.Context, r *task.TaskRegistry) (backend.TaskHubClient, backend.TaskHubWorker, error) {
55+
logger := backend.DefaultLogger()
56+
57+
// Create an executor
58+
executor := task.NewTaskExecutor(r)
59+
60+
// Create a new backend
61+
// Use the in-memory sqlite provider by specifying ""
62+
be := sqlite.NewSqliteBackend(sqlite.NewSqliteOptions(""), logger)
63+
orchestrationWorker := backend.NewOrchestrationWorker(be, executor, logger)
64+
activityWorker := backend.NewActivityTaskWorker(be, executor, logger)
65+
taskHubWorker := backend.NewTaskHubWorker(be, orchestrationWorker, activityWorker, logger)
66+
67+
// Start the worker
68+
err := taskHubWorker.Start(ctx)
69+
if err != nil {
70+
return nil, nil, err
71+
}
72+
73+
// Get the client to the backend
74+
taskHubClient := backend.NewTaskHubClient(be)
75+
76+
return taskHubClient, taskHubWorker, nil
77+
}
78+
79+
func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
80+
t := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
81+
MaxAttempts: 10,
82+
InitialRetryInterval: 100 * time.Millisecond,
83+
BackoffCoefficient: 2,
84+
MaxRetryInterval: 3 * time.Second,
85+
}))
86+
87+
t1 := ctx.CallActivity(RandomFailActivity, task.WithActivityRetryPolicy(&task.RetryPolicy{
88+
MaxAttempts: 10,
89+
InitialRetryInterval: 100 * time.Millisecond,
90+
BackoffCoefficient: 2,
91+
MaxRetryInterval: 3 * time.Second,
92+
}))
93+
94+
if err := t.Await(nil); err != nil {
95+
return nil, err
96+
}
97+
98+
if err := t1.Await(nil); err != nil {
99+
return nil, err
100+
}
101+
102+
return nil, nil
103+
}
104+
105+
type Counter struct {
106+
c int32
107+
lock sync.Mutex
108+
}
109+
110+
func (c *Counter) Increment() {
111+
c.lock.Lock()
112+
defer c.lock.Unlock()
113+
c.c++
114+
}
115+
116+
func (c *Counter) GetValue() int32 {
117+
c.lock.Lock()
118+
defer c.lock.Unlock()
119+
return c.c
120+
}
121+
122+
var (
123+
counters = make(map[string]*Counter)
124+
countersLock sync.RWMutex
125+
)
126+
127+
// getCounter returns a Counter instance for the specified taskExecutionId.
128+
// If no counter exists for the taskExecutionId, a new one is created.
129+
func getCounter(taskExecutionId string) *Counter {
130+
countersLock.RLock()
131+
counter, exists := counters[taskExecutionId]
132+
countersLock.RUnlock()
133+
134+
if !exists {
135+
countersLock.Lock()
136+
// Check again to handle race conditions
137+
counter, exists = counters[taskExecutionId]
138+
if !exists {
139+
counter = &Counter{}
140+
counters[taskExecutionId] = counter
141+
}
142+
countersLock.Unlock()
143+
}
144+
145+
return counter
146+
}
147+
148+
func RandomFailActivity(ctx task.ActivityContext) (any, error) {
149+
log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
150+
151+
// The activity should fail 5 times before succeeding.
152+
if getCounter(ctx.GetTaskExecutionId()).GetValue() != 5 {
153+
log.Println("random activity failure")
154+
getCounter(ctx.GetTaskExecutionId()).Increment()
155+
return "", errors.New("random activity failure")
156+
}
157+
158+
return "ok", nil
159+
}
160+
161+
func must(err error) {
162+
if err != nil {
163+
panic(err)
164+
}
165+
}

0 commit comments

Comments
 (0)