Skip to content

Commit 7fa2686

Browse files
JoshVanL, cicoyle, and dapr-bot
authored
Fix Workflow state store contention (dapr#8767)
Running Workflows in high-throughput scenarios would see contention when processing Workflow state store operations, or would block on them. This degraded the performance of Workflow operations, leading to increased latency and potential timeouts. The cause was circular Placement locking of Actor Workflow state operations. This change removes Placement locking in Workflow state operations, as they are already locked at the Actor level. Signed-off-by: joshvanl <me@joshvanl.dev> Co-authored-by: Cassie Coyle <cassie@diagrid.io> Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
1 parent 81ed15a commit 7fa2686

File tree

8 files changed

+61
-48
lines changed

8 files changed

+61
-48
lines changed

pkg/actors/state/fake/fake.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,48 +20,48 @@ import (
2020
)
2121

2222
type Fake struct {
23-
getFn func(ctx context.Context, req *api.GetStateRequest) (*api.StateResponse, error)
24-
getBulkFn func(ctx context.Context, req *api.GetBulkStateRequest) (api.BulkStateResponse, error)
25-
transactionalStateOperationFn func(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest) error
23+
getFn func(ctx context.Context, req *api.GetStateRequest, lock bool) (*api.StateResponse, error)
24+
getBulkFn func(ctx context.Context, req *api.GetBulkStateRequest, lock bool) (api.BulkStateResponse, error)
25+
transactionalStateOperationFn func(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest, lock bool) error
2626
}
2727

2828
func New() *Fake {
2929
return &Fake{
30-
getFn: func(ctx context.Context, req *api.GetStateRequest) (*api.StateResponse, error) {
30+
getFn: func(ctx context.Context, req *api.GetStateRequest, lock bool) (*api.StateResponse, error) {
3131
return nil, nil
3232
},
33-
getBulkFn: func(ctx context.Context, req *api.GetBulkStateRequest) (api.BulkStateResponse, error) {
33+
getBulkFn: func(ctx context.Context, req *api.GetBulkStateRequest, lock bool) (api.BulkStateResponse, error) {
3434
return api.BulkStateResponse{}, nil
3535
},
36-
transactionalStateOperationFn: func(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest) error {
36+
transactionalStateOperationFn: func(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest, lock bool) error {
3737
return nil
3838
},
3939
}
4040
}
4141

42-
func (f *Fake) WithGetFn(fn func(ctx context.Context, req *api.GetStateRequest) (*api.StateResponse, error)) *Fake {
42+
func (f *Fake) WithGetFn(fn func(ctx context.Context, req *api.GetStateRequest, lock bool) (*api.StateResponse, error)) *Fake {
4343
f.getFn = fn
4444
return f
4545
}
4646

47-
func (f *Fake) WithGetBulkFn(fn func(ctx context.Context, req *api.GetBulkStateRequest) (api.BulkStateResponse, error)) *Fake {
47+
func (f *Fake) WithGetBulkFn(fn func(ctx context.Context, req *api.GetBulkStateRequest, lock bool) (api.BulkStateResponse, error)) *Fake {
4848
f.getBulkFn = fn
4949
return f
5050
}
5151

52-
func (f *Fake) WithTransactionalStateOperationFn(fn func(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest) error) *Fake {
52+
func (f *Fake) WithTransactionalStateOperationFn(fn func(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest, lock bool) error) *Fake {
5353
f.transactionalStateOperationFn = fn
5454
return f
5555
}
5656

57-
func (f *Fake) Get(ctx context.Context, req *api.GetStateRequest) (*api.StateResponse, error) {
58-
return f.getFn(ctx, req)
57+
func (f *Fake) Get(ctx context.Context, req *api.GetStateRequest, lock bool) (*api.StateResponse, error) {
58+
return f.getFn(ctx, req, lock)
5959
}
6060

61-
func (f *Fake) GetBulk(ctx context.Context, req *api.GetBulkStateRequest) (api.BulkStateResponse, error) {
62-
return f.getBulkFn(ctx, req)
61+
func (f *Fake) GetBulk(ctx context.Context, req *api.GetBulkStateRequest, lock bool) (api.BulkStateResponse, error) {
62+
return f.getBulkFn(ctx, req, lock)
6363
}
6464

65-
func (f *Fake) TransactionalStateOperation(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest) error {
66-
return f.transactionalStateOperationFn(ctx, ignoreHosted, req)
65+
func (f *Fake) TransactionalStateOperation(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest, lock bool) error {
66+
return f.transactionalStateOperationFn(ctx, ignoreHosted, req, lock)
6767
}

pkg/actors/state/state.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ var ErrTransactionsTooManyOperations = errors.New("the transaction contains more
4040

4141
type Interface interface {
4242
// Get retrieves actor state.
43-
Get(ctx context.Context, req *api.GetStateRequest) (*api.StateResponse, error)
43+
Get(ctx context.Context, req *api.GetStateRequest, lock bool) (*api.StateResponse, error)
4444

4545
// GetBulk retrieves actor state in bulk.
46-
GetBulk(ctx context.Context, req *api.GetBulkStateRequest) (api.BulkStateResponse, error)
46+
GetBulk(ctx context.Context, req *api.GetBulkStateRequest, lock bool) (api.BulkStateResponse, error)
4747

4848
// TransactionalStateOperation performs a transactional state operation with the actor state store.
49-
TransactionalStateOperation(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest) error
49+
TransactionalStateOperation(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest, lock bool) error
5050
}
5151

5252
type Backend interface {
@@ -90,12 +90,16 @@ func New(opts Options) Interface {
9090
}
9191
}
9292

93-
func (s *state) Get(ctx context.Context, req *api.GetStateRequest) (*api.StateResponse, error) {
94-
ctx, cancel, err := s.placement.Lock(ctx)
95-
if err != nil {
96-
return nil, err
93+
func (s *state) Get(ctx context.Context, req *api.GetStateRequest, lock bool) (*api.StateResponse, error) {
94+
if lock {
95+
var cancel context.CancelFunc
96+
var err error
97+
ctx, cancel, err = s.placement.Lock(ctx)
98+
if err != nil {
99+
return nil, err
100+
}
101+
defer cancel()
97102
}
98-
defer cancel()
99103

100104
storeName, store, err := s.stateStore()
101105
if err != nil {
@@ -133,12 +137,16 @@ func (s *state) Get(ctx context.Context, req *api.GetStateRequest) (*api.StateRe
133137
}, nil
134138
}
135139

136-
func (s *state) GetBulk(ctx context.Context, req *api.GetBulkStateRequest) (api.BulkStateResponse, error) {
137-
ctx, cancel, err := s.placement.Lock(ctx)
138-
if err != nil {
139-
return nil, err
140+
func (s *state) GetBulk(ctx context.Context, req *api.GetBulkStateRequest, lock bool) (api.BulkStateResponse, error) {
141+
if lock {
142+
var cancel context.CancelFunc
143+
var err error
144+
ctx, cancel, err = s.placement.Lock(ctx)
145+
if err != nil {
146+
return nil, err
147+
}
148+
defer cancel()
140149
}
141-
defer cancel()
142150

143151
storeName, store, err := s.stateStore()
144152
if err != nil {
@@ -183,12 +191,16 @@ func (s *state) GetBulk(ctx context.Context, req *api.GetBulkStateRequest) (api.
183191
return bulkRes, nil
184192
}
185193

186-
func (s *state) TransactionalStateOperation(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest) error {
187-
ctx, cancel, err := s.placement.Lock(ctx)
188-
if err != nil {
189-
return err
194+
func (s *state) TransactionalStateOperation(ctx context.Context, ignoreHosted bool, req *api.TransactionalRequest, lock bool) error {
195+
if lock {
196+
var cancel context.CancelFunc
197+
var err error
198+
ctx, cancel, err = s.placement.Lock(ctx)
199+
if err != nil {
200+
return err
201+
}
202+
defer cancel()
190203
}
191-
defer cancel()
192204

193205
if !ignoreHosted {
194206
if _, ok := s.table.HostedTarget(req.ActorType, req.ActorID); !ok {
@@ -200,6 +212,7 @@ func (s *state) TransactionalStateOperation(ctx context.Context, ignoreHosted bo
200212
baseKey := key.ConstructComposite(s.appID, req.ActorKey())
201213
metadata := map[string]string{metadataPartitionKey: baseKey}
202214
baseKey += api.DaprSeparator
215+
var err error
203216
for i, o := range req.Operations {
204217
operations[i], err = o.StateOperation(baseKey, api.StateOperationOpts{
205218
Metadata: metadata,

pkg/actors/targets/workflow/state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func (w *workflow) saveInternalState(ctx context.Context, state *wfenginestate.S
6565

6666
log.Debugf("Workflow actor '%s': saving %d keys to actor state store", w.actorID, len(req.Operations))
6767

68-
if err = w.actorState.TransactionalStateOperation(ctx, true, req); err != nil {
68+
if err = w.actorState.TransactionalStateOperation(ctx, true, req, false); err != nil {
6969
return err
7070
}
7171

@@ -97,7 +97,7 @@ func (w *workflow) cleanupWorkflowStateInternal(ctx context.Context, state *wfen
9797
}
9898

9999
// This will do the purging
100-
err = w.actorState.TransactionalStateOperation(ctx, true, req)
100+
err = w.actorState.TransactionalStateOperation(ctx, true, req, false)
101101
if err != nil {
102102
return err
103103
}

pkg/api/grpc/actor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestGetActorState(t *testing.T) {
3535

3636
actors := actorsfake.New()
3737
actors.WithState(func(context.Context) (state.Interface, error) {
38-
return statefake.New().WithGetFn(func(ctx context.Context, req *actorsapi.GetStateRequest) (*actorsapi.StateResponse, error) {
38+
return statefake.New().WithGetFn(func(ctx context.Context, req *actorsapi.GetStateRequest, _ bool) (*actorsapi.StateResponse, error) {
3939
return &actorsapi.StateResponse{
4040
Data: data,
4141
Metadata: map[string]string{
@@ -80,7 +80,7 @@ func TestExecuteActorStateTransaction(t *testing.T) {
8080

8181
actors := actorsfake.New()
8282
actors.WithState(func(context.Context) (state.Interface, error) {
83-
return statefake.New().WithTransactionalStateOperationFn(func(ctx context.Context, _ bool, req *actorsapi.TransactionalRequest) error {
83+
return statefake.New().WithTransactionalStateOperationFn(func(ctx context.Context, _ bool, req *actorsapi.TransactionalRequest, _ bool) error {
8484
return nil
8585
}), nil
8686
})

pkg/api/grpc/grpc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,7 +1071,7 @@ func (a *api) GetActorState(ctx context.Context, in *runtimev1pb.GetActorStateRe
10711071
Key: key,
10721072
}
10731073

1074-
resp, err := astate.Get(ctx, &req)
1074+
resp, err := astate.Get(ctx, &req, true)
10751075
if err != nil {
10761076
if _, ok := status.FromError(err); ok {
10771077
apiServerLogger.Debug(err)
@@ -1143,7 +1143,7 @@ func (a *api) ExecuteActorStateTransaction(ctx context.Context, in *runtimev1pb.
11431143
Operations: actorOps,
11441144
}
11451145

1146-
err = astate.TransactionalStateOperation(ctx, false, &req)
1146+
err = astate.TransactionalStateOperation(ctx, false, &req, true)
11471147
if err != nil {
11481148
if _, ok := status.FromError(err); ok {
11491149
apiServerLogger.Debug(err)

pkg/api/http/actors.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ func (a *api) onActorStateTransaction(w http.ResponseWriter, r *http.Request) {
284284
return
285285
}
286286

287-
err = state.TransactionalStateOperation(ctx, false, req)
287+
err = state.TransactionalStateOperation(ctx, false, req, true)
288288
if err != nil {
289289
if errors.As(err, new(messages.APIError)) {
290290
respondWithError(w, err)
@@ -449,7 +449,7 @@ func (a *api) onGetActorState(w http.ResponseWriter, r *http.Request) {
449449
ActorType: actorType,
450450
ActorID: actorID,
451451
Key: key,
452-
})
452+
}, true)
453453
if err != nil {
454454
if errors.As(err, new(messages.APIError)) {
455455
respondWithError(w, err)

pkg/api/http/http_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,7 @@ func TestV1ActorEndpoints(t *testing.T) {
10811081

10821082
t.Run("Get actor state - 200 OK", func(t *testing.T) {
10831083
actors.WithState(func(context.Context) (actorsstate.Interface, error) {
1084-
return statefake.New().WithGetFn(func(context.Context, *actorsapi.GetStateRequest) (*actorsapi.StateResponse, error) {
1084+
return statefake.New().WithGetFn(func(context.Context, *actorsapi.GetStateRequest, bool) (*actorsapi.StateResponse, error) {
10851085
return &actorsapi.StateResponse{
10861086
Data: fakeData,
10871087
Metadata: map[string]string{
@@ -1104,7 +1104,7 @@ func TestV1ActorEndpoints(t *testing.T) {
11041104

11051105
t.Run("Get actor state - 204 No Content", func(t *testing.T) {
11061106
actors.WithState(func(context.Context) (actorsstate.Interface, error) {
1107-
return statefake.New().WithGetFn(func(context.Context, *actorsapi.GetStateRequest) (*actorsapi.StateResponse, error) {
1107+
return statefake.New().WithGetFn(func(context.Context, *actorsapi.GetStateRequest, bool) (*actorsapi.StateResponse, error) {
11081108
return nil, nil
11091109
}), nil
11101110
})
@@ -1122,7 +1122,7 @@ func TestV1ActorEndpoints(t *testing.T) {
11221122

11231123
t.Run("Get actor state - 500 on GetState failure", func(t *testing.T) {
11241124
actors.WithState(func(context.Context) (actorsstate.Interface, error) {
1125-
return statefake.New().WithGetFn(func(context.Context, *actorsapi.GetStateRequest) (*actorsapi.StateResponse, error) {
1125+
return statefake.New().WithGetFn(func(context.Context, *actorsapi.GetStateRequest, bool) (*actorsapi.StateResponse, error) {
11261126
return nil, errors.New("UPSTREAM_ERROR")
11271127
}), nil
11281128
})
@@ -1243,7 +1243,7 @@ func TestV1ActorEndpoints(t *testing.T) {
12431243
}
12441244

12451245
actors.WithState(func(context.Context) (actorsstate.Interface, error) {
1246-
return statefake.New().WithTransactionalStateOperationFn(func(context.Context, bool, *actorsapi.TransactionalRequest) error {
1246+
return statefake.New().WithTransactionalStateOperationFn(func(context.Context, bool, *actorsapi.TransactionalRequest, bool) error {
12471247
return errors.New("UPSTREAM_ERROR")
12481248
}), nil
12491249
})
@@ -1735,7 +1735,7 @@ func TestV1ActorEndpointsWithTracer(t *testing.T) {
17351735
buffer = ""
17361736
apiPath := "v1.0/actors/fakeActorType/fakeActorID/state/key1"
17371737
actors.WithState(func(context.Context) (actorsstate.Interface, error) {
1738-
return astate.WithGetFn(func(ctx context.Context, req *actorsapi.GetStateRequest) (*actorsapi.StateResponse, error) {
1738+
return astate.WithGetFn(func(ctx context.Context, req *actorsapi.GetStateRequest, _ bool) (*actorsapi.StateResponse, error) {
17391739
called.Add(1)
17401740
return &actorsapi.StateResponse{
17411741
Data: fakeData,

pkg/runtime/wfengine/state/state.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func LoadWorkflowState(ctx context.Context, state state.Interface, actorID strin
264264
ActorID: actorID,
265265
Key: metadataKey,
266266
}
267-
res, err := state.Get(ctx, &req)
267+
res, err := state.Get(ctx, &req, false)
268268
if err != nil {
269269
return nil, fmt.Errorf("failed to load workflow metadata: %w", err)
270270
}
@@ -312,7 +312,7 @@ func LoadWorkflowState(ctx context.Context, state state.Interface, actorID strin
312312
}
313313

314314
// Perform the request
315-
bulkRes, err := state.GetBulk(ctx, bulkReq)
315+
bulkRes, err := state.GetBulk(ctx, bulkReq, false)
316316
if err != nil {
317317
return nil, fmt.Errorf("failed to load workflow state: %w", err)
318318
}

0 commit comments

Comments
 (0)