Skip to content

Commit 6b610fc

Browse files
committed
fix local task impl
Signed-off-by: Fabian Martinez <[email protected]>
1 parent f28f994 commit 6b610fc

File tree

1 file changed

+14
-14
lines changed

1 file changed

+14
-14
lines changed

backend/local/task.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package local
33
import (
44
"context"
55
"fmt"
6+
"strconv"
67
"sync"
78

89
"github.com/dapr/durabletask-go/api"
@@ -33,26 +34,24 @@ func NewTasksBackend() *TasksBackend {
3334
}
3435

3536
func (be *TasksBackend) CompleteActivityTask(ctx context.Context, response *protos.ActivityResponse) error {
36-
key := backend.GetActivityExecutionKey(response.GetInstanceId(), response.GetTaskId())
37-
if be.deletePendingActivityTask(key, response) {
37+
if be.deletePendingActivityTask(response.GetInstanceId(), response.GetTaskId(), response) {
3838
return nil
3939
}
40-
return fmt.Errorf("unknown instance ID/task ID combo: %s", key)
40+
return fmt.Errorf("unknown instance ID/task ID combo: %s", response.GetInstanceId()+"/"+strconv.FormatInt(int64(response.GetTaskId()), 10))
4141
}
4242

4343
func (be *TasksBackend) CancelActivityTask(ctx context.Context, instanceID api.InstanceID, taskID int32) error {
44-
key := backend.GetActivityExecutionKey(string(instanceID), taskID)
45-
if be.deletePendingActivityTask(key, nil) {
44+
if be.deletePendingActivityTask(string(instanceID), taskID, nil) {
4645
return nil
4746
}
48-
return fmt.Errorf("unknown instance ID/task ID combo: %s", key)
47+
return fmt.Errorf("unknown instance ID/task ID combo: %s", string(instanceID)+"/"+strconv.FormatInt(int64(taskID), 10))
4948
}
5049

5150
func (be *TasksBackend) WaitForActivityCompletion(ctx context.Context, request *protos.ActivityRequest) (*protos.ActivityResponse, error) {
52-
key := backend.GetActivityExecutionKey(string(request.GetOrchestrationInstance().GetInstanceId()), request.GetTaskId())
51+
key := backend.GetActivityExecutionKey(request.GetOrchestrationInstance().GetInstanceId(), request.GetTaskId())
5352
pending := &pendingActivity{
5453
response: nil,
55-
complete: make(chan struct{}),
54+
complete: make(chan struct{}, 1),
5655
}
5756
be.pendingActivities.Store(key, pending)
5857

@@ -68,14 +67,14 @@ func (be *TasksBackend) WaitForActivityCompletion(ctx context.Context, request *
6867
}
6968

7069
func (be *TasksBackend) CompleteOrchestratorTask(ctx context.Context, response *protos.OrchestratorResponse) error {
71-
if be.deletePendingOrchestrator(api.InstanceID(response.GetInstanceId()), response) {
70+
if be.deletePendingOrchestrator(response.GetInstanceId(), response) {
7271
return nil
7372
}
7473
return fmt.Errorf("unknown instance ID: %s", response.GetInstanceId())
7574
}
7675

7776
func (be *TasksBackend) CancelOrchestratorTask(ctx context.Context, instanceID api.InstanceID) error {
78-
if be.deletePendingOrchestrator(instanceID, nil) {
77+
if be.deletePendingOrchestrator(string(instanceID), nil) {
7978
return nil
8079
}
8180
return fmt.Errorf("unknown instance ID: %s", instanceID)
@@ -84,7 +83,7 @@ func (be *TasksBackend) CancelOrchestratorTask(ctx context.Context, instanceID a
8483
func (be *TasksBackend) WaitForOrchestratorCompletion(ctx context.Context, request *protos.OrchestratorRequest) (*protos.OrchestratorResponse, error) {
8584
pending := &pendingOrchestrator{
8685
response: nil,
87-
complete: make(chan struct{}),
86+
complete: make(chan struct{}, 1),
8887
}
8988
be.pendingOrchestrators.Store(request.GetInstanceId(), pending)
9089

@@ -99,7 +98,8 @@ func (be *TasksBackend) WaitForOrchestratorCompletion(ctx context.Context, reque
9998
}
10099
}
101100

102-
func (be *TasksBackend) deletePendingActivityTask(key string, res *protos.ActivityResponse) bool {
101+
func (be *TasksBackend) deletePendingActivityTask(iid string, taskID int32, res *protos.ActivityResponse) bool {
102+
key := backend.GetActivityExecutionKey(iid, taskID)
103103
p, ok := be.pendingActivities.LoadAndDelete(key)
104104
if !ok {
105105
return false
@@ -112,8 +112,8 @@ func (be *TasksBackend) deletePendingActivityTask(key string, res *protos.Activi
112112
return true
113113
}
114114

115-
func (be *TasksBackend) deletePendingOrchestrator(iid api.InstanceID, res *protos.OrchestratorResponse) bool {
116-
p, ok := be.pendingOrchestrators.LoadAndDelete(iid)
115+
func (be *TasksBackend) deletePendingOrchestrator(instanceID string, res *protos.OrchestratorResponse) bool {
116+
p, ok := be.pendingOrchestrators.LoadAndDelete(instanceID)
117117
if !ok {
118118
return false
119119
}

0 commit comments

Comments
 (0)