|
7 | 7 | "fmt"
|
8 | 8 | "log"
|
9 | 9 | "sync"
|
| 10 | + "sync/atomic" |
10 | 11 | "time"
|
11 | 12 |
|
12 | 13 | "github.com/dapr/durabletask-go/backend"
|
@@ -102,56 +103,24 @@ func RetryActivityOrchestrator(ctx *task.OrchestrationContext) (any, error) {
|
102 | 103 | return nil, nil
|
103 | 104 | }
|
104 | 105 |
|
105 |
| -type Counter struct { |
106 |
| - c int32 |
107 |
| - lock sync.Mutex |
108 |
| -} |
109 |
| - |
110 |
| -func (c *Counter) Increment() { |
111 |
| - c.lock.Lock() |
112 |
| - defer c.lock.Unlock() |
113 |
| - c.c++ |
114 |
| -} |
115 |
| - |
116 |
| -func (c *Counter) GetValue() int32 { |
117 |
| - c.lock.Lock() |
118 |
| - defer c.lock.Unlock() |
119 |
| - return c.c |
120 |
| -} |
121 |
| - |
122 | 106 | var (
|
123 |
| - counters = make(map[string]*Counter) |
124 |
| - countersLock sync.RWMutex |
| 107 | + counters = sync.Map{} |
125 | 108 | )
|
126 | 109 |
|
127 | 110 | // getCounter returns a Counter instance for the specified taskExecutionId.
|
128 | 111 | // If no counter exists for the taskExecutionId, a new one is created.
|
129 |
| -func getCounter(taskExecutionId string) *Counter { |
130 |
| - countersLock.RLock() |
131 |
| - counter, exists := counters[taskExecutionId] |
132 |
| - countersLock.RUnlock() |
133 |
| - |
134 |
| - if !exists { |
135 |
| - countersLock.Lock() |
136 |
| - // Check again to handle race conditions |
137 |
| - counter, exists = counters[taskExecutionId] |
138 |
| - if !exists { |
139 |
| - counter = &Counter{} |
140 |
| - counters[taskExecutionId] = counter |
141 |
| - } |
142 |
| - countersLock.Unlock() |
143 |
| - } |
144 |
| - |
145 |
| - return counter |
| 112 | +func getCounter(taskExecutionId string) *atomic.Int32 { |
| 113 | + counter, _ := counters.LoadOrStore(taskExecutionId, &atomic.Int32{}) |
| 114 | + return counter.(*atomic.Int32) |
146 | 115 | }
|
147 | 116 |
|
148 | 117 | func RandomFailActivity(ctx task.ActivityContext) (any, error) {
|
149 | 118 | log.Println(fmt.Sprintf("#### [%v] activity %v failure", ctx.GetTaskExecutionId(), ctx.GetTaskID()))
|
150 |
| - |
| 119 | + counter := getCounter(ctx.GetTaskExecutionId()) |
151 | 120 | // The activity should fail 5 times before succeeding.
|
152 |
| - if getCounter(ctx.GetTaskExecutionId()).GetValue() != 5 { |
| 121 | + if counter.Load() != 5 { |
153 | 122 | log.Println("random activity failure")
|
154 |
| - getCounter(ctx.GetTaskExecutionId()).Increment() |
| 123 | + counter.Add(1) |
155 | 124 | return "", errors.New("random activity failure")
|
156 | 125 | }
|
157 | 126 |
|
|
0 commit comments