Skip to content

Commit f28f994

Browse files
committed
add tasks backend
Signed-off-by: Fabian Martinez <[email protected]>
1 parent cd7032e commit f28f994

File tree

1 file changed

+126
-0
lines changed

1 file changed

+126
-0
lines changed

backend/local/task.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package local
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
8+
"github.com/dapr/durabletask-go/api"
9+
"github.com/dapr/durabletask-go/api/protos"
10+
"github.com/dapr/durabletask-go/backend"
11+
)
12+
13+
type pendingOrchestrator struct {
14+
response *protos.OrchestratorResponse
15+
complete chan struct{}
16+
}
17+
18+
type pendingActivity struct {
19+
response *protos.ActivityResponse
20+
complete chan struct{}
21+
}
22+
23+
type TasksBackend struct {
24+
pendingOrchestrators *sync.Map
25+
pendingActivities *sync.Map
26+
}
27+
28+
func NewTasksBackend() *TasksBackend {
29+
return &TasksBackend{
30+
pendingOrchestrators: &sync.Map{},
31+
pendingActivities: &sync.Map{},
32+
}
33+
}
34+
35+
func (be *TasksBackend) CompleteActivityTask(ctx context.Context, response *protos.ActivityResponse) error {
36+
key := backend.GetActivityExecutionKey(response.GetInstanceId(), response.GetTaskId())
37+
if be.deletePendingActivityTask(key, response) {
38+
return nil
39+
}
40+
return fmt.Errorf("unknown instance ID/task ID combo: %s", key)
41+
}
42+
43+
func (be *TasksBackend) CancelActivityTask(ctx context.Context, instanceID api.InstanceID, taskID int32) error {
44+
key := backend.GetActivityExecutionKey(string(instanceID), taskID)
45+
if be.deletePendingActivityTask(key, nil) {
46+
return nil
47+
}
48+
return fmt.Errorf("unknown instance ID/task ID combo: %s", key)
49+
}
50+
51+
func (be *TasksBackend) WaitForActivityCompletion(ctx context.Context, request *protos.ActivityRequest) (*protos.ActivityResponse, error) {
52+
key := backend.GetActivityExecutionKey(string(request.GetOrchestrationInstance().GetInstanceId()), request.GetTaskId())
53+
pending := &pendingActivity{
54+
response: nil,
55+
complete: make(chan struct{}),
56+
}
57+
be.pendingActivities.Store(key, pending)
58+
59+
select {
60+
case <-ctx.Done():
61+
return nil, ctx.Err()
62+
case <-pending.complete:
63+
if pending.response == nil {
64+
return nil, api.ErrTaskCancelled
65+
}
66+
return pending.response, nil
67+
}
68+
}
69+
70+
func (be *TasksBackend) CompleteOrchestratorTask(ctx context.Context, response *protos.OrchestratorResponse) error {
71+
if be.deletePendingOrchestrator(api.InstanceID(response.GetInstanceId()), response) {
72+
return nil
73+
}
74+
return fmt.Errorf("unknown instance ID: %s", response.GetInstanceId())
75+
}
76+
77+
func (be *TasksBackend) CancelOrchestratorTask(ctx context.Context, instanceID api.InstanceID) error {
78+
if be.deletePendingOrchestrator(instanceID, nil) {
79+
return nil
80+
}
81+
return fmt.Errorf("unknown instance ID: %s", instanceID)
82+
}
83+
84+
func (be *TasksBackend) WaitForOrchestratorCompletion(ctx context.Context, request *protos.OrchestratorRequest) (*protos.OrchestratorResponse, error) {
85+
pending := &pendingOrchestrator{
86+
response: nil,
87+
complete: make(chan struct{}),
88+
}
89+
be.pendingOrchestrators.Store(request.GetInstanceId(), pending)
90+
91+
select {
92+
case <-ctx.Done():
93+
return nil, ctx.Err()
94+
case <-pending.complete:
95+
if pending.response == nil {
96+
return nil, api.ErrTaskCancelled
97+
}
98+
return pending.response, nil
99+
}
100+
}
101+
102+
func (be *TasksBackend) deletePendingActivityTask(key string, res *protos.ActivityResponse) bool {
103+
p, ok := be.pendingActivities.LoadAndDelete(key)
104+
if !ok {
105+
return false
106+
}
107+
108+
// Note that res can be nil in case of certain failures
109+
pending := p.(*pendingActivity)
110+
pending.response = res
111+
close(pending.complete)
112+
return true
113+
}
114+
115+
func (be *TasksBackend) deletePendingOrchestrator(iid api.InstanceID, res *protos.OrchestratorResponse) bool {
116+
p, ok := be.pendingOrchestrators.LoadAndDelete(iid)
117+
if !ok {
118+
return false
119+
}
120+
121+
// Note that res can be nil in case of certain failures
122+
pending := p.(*pendingOrchestrator)
123+
pending.response = res
124+
close(pending.complete)
125+
return true
126+
}

0 commit comments

Comments
 (0)