Commit 6316aa6

fixes lack of idempotency in WriteToSpiceDB
I started all of this because of flakes that were trivial to reproduce, and the underlying issue was that WriteToSpiceDB was not idempotent. It is used by both the optimistic and pessimistic locking workflows, and the lack of idempotency meant the workflows couldn't recover on their own from transient errors. When the tests failed for the wrong reasons, they flaked.

We introduced `failpoints` as a mechanism to simulate panics similar to hardware or network failures. However, some recent changes meant the failpoints were no longer unique and were causing failures _in the wrong places_:

- when the pessimistic workflow wrote an exclusive lock, retrying it after a failure wouldn't work, because the precondition prevented it (_make sure no one has a lock on this resource!_)
- when the optimistic workflow wrote with CREATE semantics and the write succeeded but the response was lost to a failure, the subsequent retry would fail because the tuples already exist.

This commit makes WriteToSpiceDB truly idempotent by introducing idempotency keys in the SpiceDB schema. Every write now includes a relationship that identifies the workflow and uses the hash of the payload as the idempotency key. The flow is as follows:

- perform the write
- if it fails, check whether the idempotency key was written by a previous request
- if it exists, assume the operation was successful
- if it does not, bubble up the error

The cost of the extra ReadRelationships call is only paid in the event of a retry after a failure.

The tests were written with the assumption that the system would bubble up errors after recovery. This goes against the expectations of a durable workflow engine, which embraces idempotency and is expected to retry on errors itself, rather than have the client retry, unless those errors are unrecoverable. This was all by design: the workflow wasn't responsible for retrying things, but rather executed compensating operations after an operation failed. That meant the client had to retry those errors, and it turns out troubleshooting what happened on a transient error is not trivial for folks building on top of the spicedb-kubeapi-proxy.
1 parent 6b04cf9 commit 6316aa6
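The mechanism, condensed into a standalone sketch (the names writeIdempotently and relationshipExists are illustrative only; the real implementation is WriteToSpiceDB and isRelExists in pkg/authz/distributedtx/activity.go in the diff below):

package idempotency

import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/cespare/xxhash/v2"

	v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
)

// writeIdempotently tags the write with an idempotency-key relationship and, when the
// write errors out, checks whether the key was already persisted by an earlier attempt.
func writeIdempotently(ctx context.Context, client v1.PermissionsServiceClient, req *v1.WriteRelationshipsRequest, workflowID string) error {
	payload, err := req.MarshalVT()
	if err != nil {
		return err
	}
	key := &v1.Relationship{
		Resource: &v1.ObjectReference{ObjectType: "workflow", ObjectId: workflowID},
		Relation: "idempotency_key",
		Subject: &v1.SubjectReference{Object: &v1.ObjectReference{
			ObjectType: "activity",
			ObjectId:   fmt.Sprintf("%x", xxhash.Sum64(payload)),
		}},
	}
	req.Updates = append(req.Updates, &v1.RelationshipUpdate{
		Operation:    v1.RelationshipUpdate_OPERATION_CREATE,
		Relationship: key,
	})

	if _, err := client.WriteRelationships(ctx, req); err != nil {
		// The outcome is ambiguous: if the key is already stored, an earlier attempt succeeded.
		exists, checkErr := relationshipExists(ctx, client, key)
		if checkErr != nil {
			return checkErr
		}
		if exists {
			return nil // previous attempt already wrote these relationships
		}
		return err
	}
	return nil
}

// relationshipExists reads back at most one relationship matching the given key.
func relationshipExists(ctx context.Context, client v1.PermissionsServiceClient, rel *v1.Relationship) (bool, error) {
	stream, err := client.ReadRelationships(ctx, &v1.ReadRelationshipsRequest{
		RelationshipFilter: &v1.RelationshipFilter{
			ResourceType:       rel.Resource.ObjectType,
			OptionalResourceId: rel.Resource.ObjectId,
			OptionalRelation:   rel.Relation,
			OptionalSubjectFilter: &v1.SubjectFilter{
				SubjectType:       rel.Subject.Object.ObjectType,
				OptionalSubjectId: rel.Subject.Object.ObjectId,
			},
		},
	})
	if err != nil {
		return false, err
	}
	if _, err := stream.Recv(); err != nil {
		if errors.Is(err, io.EOF) {
			return false, nil // no matching relationship: the write really did fail
		}
		return false, err
	}
	return true, nil
}

Keying the subject on the xxhash of the serialized request means a retry of the same activity maps to the same key, while a different payload under the same workflow produces a new one.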

File tree

4 files changed, +129 -46 lines changed


e2e/proxy_test.go

Lines changed: 16 additions & 27 deletions
@@ -793,56 +793,45 @@ var _ = Describe("Proxy", func() {
 		Expect(owners[0].Relationship.Subject.Object.ObjectId).To(Equal("chani"))
 	})

-	It("recovers writes when spicedb write succeeds but crashes", func(ctx context.Context) {
+	It("recovers a `create` operation after a SpiceDB write followed by crash, retries and succeeds", func(ctx context.Context) {
 		// paul creates his namespace
 		Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())

-		// make spicedb write crash on chani's namespace write
-		failpoints.EnableFailPoint("panicSpiceDBReadResp", 1)
+		// make spicedb write crash on chani's namespace write, it should be retried idempotently and succeed
+		failpoints.EnableFailPoint("panicSpiceDBWriteResp", 1)
 		err := CreateNamespace(ctx, chaniClient, chaniNamespace)
-		Expect(err).ToNot(BeNil())
-		// pessimistic locking reports a conflict, optimistic locking reports already exists
-		Expect(k8serrors.IsConflict(err) || k8serrors.IsAlreadyExists(err)).To(BeTrue())
+		Expect(err).To(Succeed())

-		// paul creates chani's namespace so that the namespace exists
-		Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())
+		// check that chani can get her namespace
+		Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed())

-		// check that chani can't get her namespace, indirectly showing
-		// that the spicedb write was rolled back
-		Expect(k8serrors.IsUnauthorized(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue())
+		// check that paul can't read chani's namespace
+		Expect(k8serrors.IsUnauthorized(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue())

 		// confirm the relationship doesn't exist
 		Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{
 			ResourceType:          "namespace",
 			OptionalResourceId:    chaniNamespace,
 			OptionalRelation:      "creator",
 			OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"},
-		}))).To(BeZero())
-
-		// confirm paul can get the namespace
-		Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())
+		}))).To(Equal(1))
 	})

-	It("recovers deletes when spicedb write succeeds but crashes", func(ctx context.Context) {
+	It("recovers a `delete` operation after a SpiceDB write followed by crash, retries and succeeds", func(ctx context.Context) {
 		// paul creates his namespace
 		Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())
 		Expect(CreatePod(ctx, paulClient, paulNamespace, paulPod)).To(Succeed())

 		// chani can't create the same pod
 		Expect(k8serrors.IsAlreadyExists(CreatePod(ctx, chaniClient, paulNamespace, paulPod))).To(BeTrue())
-		fmt.Println(GetPod(ctx, chaniClient, paulNamespace, paulPod))
-		Expect(k8serrors.IsUnauthorized(GetPod(ctx, chaniClient, paulNamespace, paulPod))).To(BeTrue())
+		// chani isn't authorized to get the pod
+		err := GetPod(ctx, chaniClient, paulNamespace, paulPod)
+		Expect(k8serrors.IsUnauthorized(err)).To(BeTrue())

 		// make spicedb write crash on pod delete
-		failpoints.EnableFailPoint("panicSpiceDBReadResp", 1)
-		err := DeletePod(ctx, paulClient, paulNamespace, paulPod)
-		if lockMode == proxyrule.OptimisticLockMode {
-			Expect(err).To(Succeed())
-		} else {
-			Expect(k8serrors.IsConflict(err) || k8serrors.IsUnauthorized(err)).To(BeTrue())
-			// paul sees the request fail, so he tries again:
-			Expect(DeletePod(ctx, paulClient, paulNamespace, paulPod)).To(Succeed())
-		}
+		failpoints.EnableFailPoint("panicSpiceDBWriteResp", 1)
+		err = DeletePod(ctx, paulClient, paulNamespace, paulPod)
+		Expect(err).To(Succeed())

 		// chani can now re-create paul's pod and take ownership
 		Expect(CreatePod(ctx, chaniClient, paulNamespace, paulPod)).To(Succeed())

pkg/authz/distributedtx/activity.go

Lines changed: 92 additions & 4 deletions
@@ -7,10 +7,12 @@ import (
 	"io"
 	"net/http"

+	"github.com/cespare/xxhash/v2"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"

 	v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"

@@ -38,11 +40,97 @@ type ActivityHandler struct {
 }

 // WriteToSpiceDB writes relationships to spicedb and returns any errors.
-func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) {
+func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest, workflowID string) (struct{}, error) {
 	failpoints.FailPoint("panicWriteSpiceDB")
-	out, err := h.PermissionClient.WriteRelationships(ctx, input)
-	failpoints.FailPoint("panicSpiceDBReadResp")
-	return out, err
+	idempotencyKey, err := idempotencyKeyForPayload(input, workflowID)
+	if err != nil {
+		return struct{}{}, fmt.Errorf("failed to create idempotency key for payload: %w", err)
+	}
+	cloned := input
+	cloned.Updates = append(cloned.Updates, &v1.RelationshipUpdate{
+		Operation:    v1.RelationshipUpdate_OPERATION_CREATE,
+		Relationship: idempotencyKey,
+	})
+
+	_, err = h.PermissionClient.WriteRelationships(ctx, input)
+	failpoints.FailPoint("panicSpiceDBWriteResp")
+	if err != nil {
+		exists, err := isRelExists(ctx, h.PermissionClient, idempotencyKey)
+		if err != nil {
+			return struct{}{}, err
+		}
+
+		if exists {
+			klog.Infof("idempotency key already exists, relationships were already written: %v", idempotencyKey)
+			return struct{}{}, nil // idempotent write, key already exists
+		}
+
+		return struct{}{}, err
+	}
+
+	return struct{}{}, nil
+}
+
+// idempotencyKeyForPayload computes an idempotency key off a proto payload for a request, and the workflow ID that executed it.
+func idempotencyKeyForPayload(input *v1.WriteRelationshipsRequest, workflowID string) (*v1.Relationship, error) {
+	bytes, err := input.MarshalVT()
+	if err != nil {
+		return nil, err
+	}
+
+	return &v1.Relationship{
+		Resource: &v1.ObjectReference{
+			ObjectType: "workflow",
+			ObjectId:   workflowID,
+		},
+		Relation: "idempotency_key",
+		Subject: &v1.SubjectReference{
+			Object: &v1.ObjectReference{
+				ObjectType: "activity",
+				ObjectId:   fmt.Sprintf("%x", xxhash.Sum64(bytes)),
+			},
+		},
+	}, nil
+}
+
+func isRelExists(ctx context.Context, client v1.PermissionsServiceClient, toCheck *v1.Relationship) (bool, error) {
+	req := readRequestForRel(toCheck)
+
+	resp, err := client.ReadRelationships(ctx, req)
+	if err != nil {
+		klog.ErrorS(err, "failed to check existence of relationship", "rel", toCheck)
+		return false, fmt.Errorf("failed to determine if relationship exists: %w", err)
+	}
+
+	var exists bool
+	_, err = resp.Recv()
+	if err != nil {
+		if !errors.Is(err, io.EOF) {
+			return false, fmt.Errorf("failed to determine existence of relationship: %w", err)
+		}
+	} else {
+		exists = true
+	}
+
+	return exists, nil
+}
+
+func readRequestForRel(lock *v1.Relationship) *v1.ReadRelationshipsRequest {
+	req := &v1.ReadRelationshipsRequest{
+		RelationshipFilter: &v1.RelationshipFilter{
+			ResourceType:       lock.Resource.ObjectType,
+			OptionalResourceId: lock.Resource.ObjectId,
+			OptionalRelation:   lock.Relation,
+			OptionalSubjectFilter: &v1.SubjectFilter{
+				SubjectType:       lock.Subject.Object.ObjectType,
+				OptionalSubjectId: lock.Subject.Object.ObjectId,
+				OptionalRelation: &v1.SubjectFilter_RelationFilter{
+					Relation: lock.Subject.OptionalRelation,
+				},
+			},
+		},
+	}
+	return req
 }

 // ReadRelationships reads relationships from spicedb and returns any errors.

pkg/authz/distributedtx/workflow.go

Lines changed: 15 additions & 14 deletions
@@ -83,7 +83,7 @@ func (r *RollbackRelationships) WithRels(relationships ...*v1.RelationshipUpdate
 	return r
 }

-func (r *RollbackRelationships) Cleanup(ctx workflow.Context, reason string) {
+func (r *RollbackRelationships) Cleanup(ctx workflow.Context, workflowID, reason string) {
 	invert := func(op v1.RelationshipUpdate_Operation) v1.RelationshipUpdate_Operation {
 		switch op {
 		case v1.RelationshipUpdate_OPERATION_CREATE:
@@ -106,10 +106,10 @@ func (r *RollbackRelationships) Cleanup(ctx workflow.Context, reason string) {
 	}

 	for {
-		f := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx,
+		f := workflow.ExecuteActivity[struct{}](ctx,
 			workflow.DefaultActivityOptions,
 			activityHandler.WriteToSpiceDB,
-			&v1.WriteRelationshipsRequest{Updates: updates})
+			&v1.WriteRelationshipsRequest{Updates: updates}, workflowID)

 		if _, err := f.Get(ctx); err != nil {
 			if s, ok := status.FromError(err); ok {
@@ -181,18 +181,18 @@ func PessimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput
 		Updates: append(updates, resourceLockRel),
 	}

-	_, err := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx,
+	_, err := workflow.ExecuteActivity[struct{}](ctx,
 		workflow.DefaultActivityOptions,
 		activityHandler.WriteToSpiceDB,
-		arg).Get(ctx)
+		arg, instance.InstanceID).Get(ctx)
 	if err != nil {
 		// request failed for some reason
 		klog.ErrorS(err, "spicedb write failed")
 		for _, u := range updates {
 			klog.V(3).InfoS("update details", "update", u.String(), "relationship", u)
 		}

-		rollback.WithRels(updates...).Cleanup(ctx, "rollback due to failed SpiceDB write")
+		rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to failed SpiceDB write")

 		// if the spicedb write fails, report it as a kube conflict error
 		// we return this for any error, not just lock conflicts, so that the
@@ -231,21 +231,21 @@ func PessimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput
 		isSuccessful, err := isSuccessfulKuberentesOperation(input, out)
 		if err != nil {
 			klog.V(1).ErrorS(err, "error checking kube response", "response", out, "verb", input.RequestInfo.Verb)
-			rollback.WithRels(updates...).Cleanup(ctx, "rollback due to failed kube operation after max attempts")
+			rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to failed kube operation after max attempts")
 			return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts: %w", MaxKubeAttempts, err)
 		}

 		if isSuccessful {
-			rollback.Cleanup(ctx, fmt.Sprintf("cleanup after successful kube operation: %s", input.RequestInfo.Verb))
+			rollback.Cleanup(ctx, instance.InstanceID, fmt.Sprintf("cleanup after successful kube operation: %s", input.RequestInfo.Verb))
 			return out, nil
 		}

 		klog.V(3).ErrorS(err, "unsuccessful Kube API operation on PessimisticWriteToSpiceDBAndKube", "response", out, "verb", input.RequestInfo.Verb)
-		rollback.WithRels(updates...).Cleanup(ctx, "rollback due to unsuccessful kube operation")
+		rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to unsuccessful kube operation")
 		return out, nil
 	}

-	rollback.WithRels(updates...).Cleanup(ctx, "rollback due to failed kube operation after max attempts")
+	rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to failed kube operation after max attempts")
 	return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts", MaxKubeAttempts)
 }

@@ -310,15 +310,16 @@ func OptimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput)
 		return nil, fmt.Errorf("failed to append deletes from filters: %w", err)
 	}

+	instance := workflow.WorkflowInstance(ctx)
 	rollback := NewRollbackRelationships(updates...)
-	_, err := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx,
+	_, err := workflow.ExecuteActivity[struct{}](ctx,
 		workflow.DefaultActivityOptions,
 		activityHandler.WriteToSpiceDB,
 		&v1.WriteRelationshipsRequest{
 			Updates: updates,
-		}).Get(ctx)
+		}, instance.InstanceID).Get(ctx)
 	if err != nil {
-		rollback.Cleanup(ctx, "rollback due to failed SpiceDB write")
+		rollback.Cleanup(ctx, instance.InstanceID, "rollback due to failed SpiceDB write")
 		klog.ErrorS(err, "SpiceDB write failed")
 		// report spicedb write errors as conflicts
 		return KubeConflict(err, input), nil
@@ -342,7 +343,7 @@ func OptimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput)

 		// if the object doesn't exist, clean up the spicedb write
 		if !exists {
-			rollback.Cleanup(ctx, "rollback due to failed Kube write")
+			rollback.Cleanup(ctx, instance.InstanceID, "rollback due to failed Kube write")
 			return nil, err
 		}
 	}

pkg/spicedb/bootstrap.yaml

Lines changed: 6 additions & 1 deletion
@@ -28,6 +28,11 @@ schema: |-
   definition lock {
     relation workflow: workflow
   }
-  definition workflow {}
+
+  definition workflow {
+    relation idempotency_key: activity
+  }
+
+  definition activity{}
 relationships: |
   namespace:spicedb-kubeapi-proxy#viewer@user:rakis
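In the tuple notation used in the relationships block, each idempotency key written by WriteToSpiceDB takes the form (placeholders, not literal IDs; the subject ID is the xxhash of the serialized WriteRelationshipsRequest, as computed in activity.go above):

  workflow:<workflow-instance-id>#idempotency_key@activity:<xxhash-of-serialized-write-request>

This is the relationship that gets read back after a failed or ambiguous write to decide whether a previous attempt already succeeded.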
