diff --git a/e2e/proxy_test.go b/e2e/proxy_test.go index bca85b3..15468fe 100644 --- a/e2e/proxy_test.go +++ b/e2e/proxy_test.go @@ -792,23 +792,20 @@ var _ = Describe("Proxy", func() { Expect(owners[0].Relationship.Subject.Object.ObjectId).To(Equal("chani")) }) - It("recovers writes when spicedb write succeeds but crashes", func(ctx context.Context) { + It("recovers a `create` operation after a SpiceDB write followed by crash, retries and succeeds", func(ctx context.Context) { // paul creates his namespace Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) - // make spicedb write crash on chani's namespace write - failpoints.EnableFailPoint("panicSpiceDBReadResp", 1) + // make spicedb write crash on chani's namespace write, it should be retried idempotently and succeed + failpoints.EnableFailPoint("panicSpiceDBWriteResp", 1) err := CreateNamespace(ctx, chaniClient, chaniNamespace) - Expect(err).ToNot(BeNil()) - // pessimistic locking reports a conflict, optimistic locking reports already exists - Expect(k8serrors.IsConflict(err) || k8serrors.IsAlreadyExists(err)).To(BeTrue()) + Expect(err).To(Succeed()) - // paul creates chani's namespace so that the namespace exists - Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + // check that chani can get her namespace + Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed()) - // check that chani can't get her namespace, indirectly showing - // that the spicedb write was rolled back - Expect(k8serrors.IsUnauthorized(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue()) + // check that paul can't read chani's namespace + Expect(k8serrors.IsUnauthorized(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue()) // confirm the relationship doesn't exist Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{ @@ -816,33 +813,24 @@ var _ = Describe("Proxy", func() { OptionalResourceId: chaniNamespace, OptionalRelation: "creator", OptionalSubjectFilter: 
&v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"}, - }))).To(BeZero()) - - // confirm paul can get the namespace - Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed()) + }))).To(Equal(1)) }) - It("recovers deletes when spicedb write succeeds but crashes", func(ctx context.Context) { + It("recovers a `delete` operation after a SpiceDB write followed by crash, retries and succeeds", func(ctx context.Context) { // paul creates his namespace Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed()) Expect(CreatePod(ctx, paulClient, paulNamespace, paulPod)).To(Succeed()) // chani can't create the same pod Expect(k8serrors.IsAlreadyExists(CreatePod(ctx, chaniClient, paulNamespace, paulPod))).To(BeTrue()) - fmt.Println(GetPod(ctx, chaniClient, paulNamespace, paulPod)) - Expect(k8serrors.IsUnauthorized(GetPod(ctx, chaniClient, paulNamespace, paulPod))).To(BeTrue()) + // chani isn't authorized to get the pod + err := GetPod(ctx, chaniClient, paulNamespace, paulPod) + Expect(k8serrors.IsUnauthorized(err)).To(BeTrue()) // make spicedb write crash on pod delete - failpoints.EnableFailPoint("panicSpiceDBReadResp", 1) - err := DeletePod(ctx, paulClient, paulNamespace, paulPod) - if lockMode == proxyrule.OptimisticLockMode { - Expect(err).To(Succeed()) - } else { - fmt.Println(err) - Expect(k8serrors.IsUnauthorized(err)).To(BeTrue()) - // paul sees the request fail, so he tries again: - Expect(DeletePod(ctx, paulClient, paulNamespace, paulPod)).To(Succeed()) - } + failpoints.EnableFailPoint("panicSpiceDBWriteResp", 1) + err = DeletePod(ctx, paulClient, paulNamespace, paulPod) + Expect(err).To(Succeed()) // chani can now re-create paul's pod and take ownership Expect(CreatePod(ctx, chaniClient, paulNamespace, paulPod)).To(Succeed()) diff --git a/go.mod b/go.mod index 5db6548..50f08de 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/warpstreamlabs/bento v1.8.2 golang.org/x/sync v0.15.0 google.golang.org/grpc v1.73.0 
+ google.golang.org/protobuf v1.36.6 k8s.io/apimachinery v0.33.1 k8s.io/apiserver v0.33.1 k8s.io/client-go v0.33.1 @@ -411,7 +412,6 @@ require ( google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/protobuf v1.36.6 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/go-jose/go-jose.v2 v2.6.3 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/magefiles/test.go b/magefiles/test.go index 79027a7..0dae0cd 100644 --- a/magefiles/test.go +++ b/magefiles/test.go @@ -27,6 +27,17 @@ func (t Test) Unit() error { // E2e runs the end-to-end tests against a real apiserver. func (t Test) E2e() error { + args := append(e2eArgs(), "../e2e") + return RunSh("go", Tool())(args...) +} + +// E2eUntilItFails runs the end-to-end tests indefinitely against a real apiserver until it fails. +func (t Test) E2eUntilItFails() error { + args := append(e2eArgs(), "--until-it-fails", "../e2e") + return RunSh("go", Tool())(args...) +} + +func e2eArgs() []string { args := []string{"run", "github.com/onsi/ginkgo/v2/ginkgo"} args = append(args, "--tags=e2e,failpoints", @@ -41,6 +52,5 @@ func (t Test) E2e() error { "-vv", "--fail-fast", "--randomize-all") - args = append(args, "../e2e") - return RunSh("go", Tool())(args...) 
+ return args } diff --git a/pkg/authz/distributedtx/activity.go b/pkg/authz/distributedtx/activity.go index 2545c93..c2f67ac 100644 --- a/pkg/authz/distributedtx/activity.go +++ b/pkg/authz/distributedtx/activity.go @@ -6,17 +6,23 @@ import ( "fmt" "io" "net/http" + "time" + "github.com/cespare/xxhash/v2" + "google.golang.org/protobuf/types/known/timestamppb" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/rest" + "k8s.io/klog/v2" v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints" ) +const defaultIdempotencyKeyExpiration = 24 * time.Hour + type KubeReqInput struct { RequestURI string RequestInfo *request.RequestInfo @@ -38,11 +44,108 @@ type ActivityHandler struct { } // WriteToSpiceDB writes relationships to spicedb and returns any errors. -func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) { +func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest, workflowID string) (*v1.ZedToken, error) { failpoints.FailPoint("panicWriteSpiceDB") - out, err := h.PermissionClient.WriteRelationships(ctx, input) - failpoints.FailPoint("panicSpiceDBReadResp") - return out, err + idempotencyKey, err := idempotencyKeyForPayload(input, workflowID) + if err != nil { + return nil, fmt.Errorf("failed to create idempotency key for payload: %w", err) + } + + cloned := input.CloneVT() + cloned.Updates = append(cloned.Updates, &v1.RelationshipUpdate{ + Operation: v1.RelationshipUpdate_OPERATION_CREATE, + Relationship: idempotencyKey, + }) + + response, err := h.PermissionClient.WriteRelationships(ctx, cloned) + failpoints.FailPoint("panicSpiceDBWriteResp") + if err != nil { + exists, evaluatedAt, relErr := isRelExists(ctx, h.PermissionClient, idempotencyKey) + if relErr != nil { + return nil, relErr 
+ } + + if exists { + klog.Infof("idempotency key already exists, relationships were already written: %v", idempotencyKey) + return evaluatedAt, nil // idempotent write, key already exists + } + + return nil, err + } + + return response.WrittenAt, nil +} + +// idempotencyKeyForPayload computes an idempotency key off a proto payload for a request, and the workflow ID that executed it. +func idempotencyKeyForPayload(input *v1.WriteRelationshipsRequest, workflowID string) (*v1.Relationship, error) { + bytes, err := input.MarshalVT() + if err != nil { + return nil, err + } + + return &v1.Relationship{ + Resource: &v1.ObjectReference{ + ObjectType: "workflow", + ObjectId: workflowID, + }, + Relation: "idempotency_key", + Subject: &v1.SubjectReference{ + Object: &v1.ObjectReference{ + ObjectType: "activity", + ObjectId: fmt.Sprintf("%x", xxhash.Sum64(bytes)), + }, + }, + OptionalExpiresAt: &timestamppb.Timestamp{ + Seconds: time.Now().Add(defaultIdempotencyKeyExpiration).Unix(), + }, + }, nil +} + +func isRelExists(ctx context.Context, client v1.PermissionsServiceClient, toCheck *v1.Relationship) (bool, *v1.ZedToken, error) { + req := readRequestForRel(toCheck) + + resp, err := client.ReadRelationships(ctx, req) + if err != nil { + klog.ErrorS(err, "failed to check existence of relationship", "rel", toCheck) + return false, nil, fmt.Errorf("failed to determine if relationship exists: %w", err) + } + + var exists bool + var token *v1.ZedToken + res, err := resp.Recv() + if err != nil { + if !errors.Is(err, io.EOF) { + return false, nil, fmt.Errorf("failed to determine existence of relationship: %w", err) + } + } else { + token = res.ReadAt + exists = true + } + + return exists, token, nil +} + +func readRequestForRel(lock *v1.Relationship) *v1.ReadRelationshipsRequest { + req := &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_FullyConsistent{ + FullyConsistent: true, + }, + }, + RelationshipFilter: &v1.RelationshipFilter{ +
ResourceType: lock.Resource.ObjectType, + OptionalResourceId: lock.Resource.ObjectId, + OptionalRelation: lock.Relation, + OptionalSubjectFilter: &v1.SubjectFilter{ + SubjectType: lock.Subject.Object.ObjectType, + OptionalSubjectId: lock.Subject.Object.ObjectId, + OptionalRelation: &v1.SubjectFilter_RelationFilter{ + Relation: lock.Subject.OptionalRelation, + }, + }, + }, + } + return req } // ReadRelationships reads relationships from spicedb and returns any errors. diff --git a/pkg/authz/distributedtx/activity_test.go b/pkg/authz/distributedtx/activity_test.go index 1ba5cf7..6502134 100644 --- a/pkg/authz/distributedtx/activity_test.go +++ b/pkg/authz/distributedtx/activity_test.go @@ -12,6 +12,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/rest/fake" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" ) type testRoundTripper struct { @@ -105,3 +107,18 @@ func TestCheckKubeResourceError(t *testing.T) { require.Error(t, err) } + +func TestIdempotencyKey(t *testing.T) { + payload := &v1.WriteRelationshipsRequest{} + key, err := idempotencyKeyForPayload(payload, "test-key") + require.NoError(t, err) + require.NotNil(t, key) + require.Equal(t, "test-key", key.Resource.ObjectId) + require.NotNil(t, key.OptionalExpiresAt) + + sameKey, err := idempotencyKeyForPayload(payload, "test-key") + require.NoError(t, err) + key.OptionalExpiresAt = nil + sameKey.OptionalExpiresAt = nil + require.True(t, key.EqualMessageVT(sameKey)) +} diff --git a/pkg/authz/distributedtx/workflow.go b/pkg/authz/distributedtx/workflow.go index 58bbd72..8ede34f 100644 --- a/pkg/authz/distributedtx/workflow.go +++ b/pkg/authz/distributedtx/workflow.go @@ -83,7 +83,7 @@ func (r *RollbackRelationships) WithRels(relationships ...*v1.RelationshipUpdate return r } -func (r *RollbackRelationships) Cleanup(ctx workflow.Context, reason string) { +func (r *RollbackRelationships) Cleanup(ctx workflow.Context, workflowID, reason 
string) { invert := func(op v1.RelationshipUpdate_Operation) v1.RelationshipUpdate_Operation { switch op { case v1.RelationshipUpdate_OPERATION_CREATE: @@ -106,10 +106,10 @@ func (r *RollbackRelationships) Cleanup(ctx workflow.Context, reason string) { } for { - f := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx, + f := workflow.ExecuteActivity[*v1.ZedToken](ctx, workflow.DefaultActivityOptions, activityHandler.WriteToSpiceDB, - &v1.WriteRelationshipsRequest{Updates: updates}) + &v1.WriteRelationshipsRequest{Updates: updates}, workflowID) if _, err := f.Get(ctx); err != nil { if s, ok := status.FromError(err); ok { @@ -181,10 +181,10 @@ func PessimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput Updates: append(updates, resourceLockRel), } - _, err := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx, + _, err := workflow.ExecuteActivity[*v1.ZedToken](ctx, workflow.DefaultActivityOptions, activityHandler.WriteToSpiceDB, - arg).Get(ctx) + arg, instance.InstanceID).Get(ctx) if err != nil { // request failed for some reason klog.ErrorS(err, "spicedb write failed") @@ -192,7 +192,7 @@ func PessimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput klog.V(3).InfoS("update details", "update", u.String(), "relationship", u) } - rollback.WithRels(updates...).Cleanup(ctx, "rollback due to failed SpiceDB write") + rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to failed SpiceDB write") // if the spicedb write fails, report it as a kube conflict error // we return this for any error, not just lock conflicts, so that the @@ -231,21 +231,21 @@ func PessimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput isSuccessful, err := isSuccessfulKuberentesOperation(input, out) if err != nil { klog.V(1).ErrorS(err, "error checking kube response", "response", out, "verb", input.RequestInfo.Verb) - rollback.WithRels(updates...).Cleanup(ctx, "rollback due to failed kube 
operation after max attempts") + rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to failed kube operation after max attempts") return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts: %w", MaxKubeAttempts, err) } if isSuccessful { - rollback.Cleanup(ctx, fmt.Sprintf("cleanup after successful kube operation: %s", input.RequestInfo.Verb)) + rollback.Cleanup(ctx, instance.InstanceID, fmt.Sprintf("cleanup after successful kube operation: %s", input.RequestInfo.Verb)) return out, nil } klog.V(3).ErrorS(err, "unsuccessful Kube API operation on PessimisticWriteToSpiceDBAndKube", "response", out, "verb", input.RequestInfo.Verb) - rollback.WithRels(updates...).Cleanup(ctx, "rollback due to unsuccessful kube operation") + rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to unsuccessful kube operation") return out, nil } - rollback.WithRels(updates...).Cleanup(ctx, "rollback due to failed kube operation after max attempts") + rollback.WithRels(updates...).Cleanup(ctx, instance.InstanceID, "rollback due to failed kube operation after max attempts") return nil, fmt.Errorf("failed to communicate with kubernetes after %d attempts", MaxKubeAttempts) } @@ -310,15 +310,16 @@ func OptimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput) return nil, fmt.Errorf("failed to append deletes from filters: %w", err) } + instance := workflow.WorkflowInstance(ctx) rollback := NewRollbackRelationships(updates...) 
- _, err := workflow.ExecuteActivity[*v1.WriteRelationshipsResponse](ctx, + _, err := workflow.ExecuteActivity[*v1.ZedToken](ctx, workflow.DefaultActivityOptions, activityHandler.WriteToSpiceDB, &v1.WriteRelationshipsRequest{ Updates: updates, - }).Get(ctx) + }, instance.InstanceID).Get(ctx) if err != nil { - rollback.Cleanup(ctx, "rollback due to failed SpiceDB write") + rollback.Cleanup(ctx, instance.InstanceID, "rollback due to failed SpiceDB write") klog.ErrorS(err, "SpiceDB write failed") // report spicedb write errors as conflicts return KubeConflict(err, input), nil @@ -342,7 +343,7 @@ func OptimisticWriteToSpiceDBAndKube(ctx workflow.Context, input *WriteObjInput) // if the object doesn't exist, clean up the spicedb write if !exists { - rollback.Cleanup(ctx, "rollback due to failed Kube write") + rollback.Cleanup(ctx, instance.InstanceID, "rollback due to failed Kube write") return nil, err } } @@ -420,11 +421,29 @@ func ResourceLockRel(input *WriteObjInput, workflowID string) *v1.RelationshipUp // KubeConflict wraps an error and turns it into a standard kube conflict // response. 
func KubeConflict(err error, input *WriteObjInput) *KubeResp { + var group, resource, name string + + if input == nil { + klog.Warningf("input to KubeConflict is nil for error %s", err) + } else { + if input.RequestInfo == nil { + klog.Warningf("input to KubeConflict has nil RequestInfo for error %s", err) + } else { + group = input.RequestInfo.APIGroup + resource = input.RequestInfo.Resource + } + if input.ObjectMeta == nil { + klog.Warningf("input to KubeConflict has nil ObjectMeta for error %s", err) + } else { + name = input.ObjectMeta.Name + } + } + var out KubeResp statusError := k8serrors.NewConflict(schema.GroupResource{ - Group: input.RequestInfo.APIGroup, - Resource: input.RequestInfo.Resource, - }, input.ObjectMeta.Name, err) + Group: group, + Resource: resource, + }, name, err) out.StatusCode = http.StatusConflict out.Err = *statusError out.Body, _ = json.Marshal(statusError) diff --git a/pkg/authz/distributedtx/workflow_test.go b/pkg/authz/distributedtx/workflow_test.go index a483178..d617210 100644 --- a/pkg/authz/distributedtx/workflow_test.go +++ b/pkg/authz/distributedtx/workflow_test.go @@ -2,6 +2,7 @@ package distributedtx import ( "context" + "errors" "io" "net/http" "strings" @@ -119,3 +120,94 @@ func TestWorkflow(t *testing.T) { }) } } + +func TestKubeConflict(t *testing.T) { + t.Parallel() + testCases := map[string]struct { + inputErr error + input *WriteObjInput + expectedJSON string + }{ + `nil input`: { + inputErr: errors.New("some err"), + input: nil, + expectedJSON: `{ + "ErrStatus" : { + "metadata" : { }, + "status" : "Failure", + "message" : "Operation cannot be fulfilled on \"\": some err", + "reason" : "Conflict", + "details" : { }, + "code" : 409 + } +}`, + }, + `nil request info`: { + inputErr: errors.New("some err"), + input: &WriteObjInput{ + ObjectMeta: &metav1.ObjectMeta{Name: "my_object_meta"}, + }, + expectedJSON: `{ + "ErrStatus" : { + "metadata" : { }, + "status" : "Failure", + "message" : "Operation cannot be fulfilled on 
\"my_object_meta\": some err", + "reason" : "Conflict", + "details" : { + "name" : "my_object_meta" + }, + "code" : 409 + } +}`, + }, + `nil object meta`: { + inputErr: errors.New("some err"), + input: &WriteObjInput{ + RequestInfo: &request.RequestInfo{APIGroup: "foo", Resource: "bar"}, + }, + expectedJSON: `{ + "ErrStatus" : { + "metadata" : { }, + "status" : "Failure", + "message" : "Operation cannot be fulfilled on bar.foo \"\": some err", + "reason" : "Conflict", + "details" : { + "group" : "foo", + "kind" : "bar" + }, + "code" : 409 + } +}`, + }, + `valid input`: { + inputErr: errors.New("some err"), + input: &WriteObjInput{ + RequestInfo: &request.RequestInfo{APIGroup: "foo", Resource: "bar"}, + ObjectMeta: &metav1.ObjectMeta{Name: "my_object_meta"}, + }, + expectedJSON: `{ + "ErrStatus" : { + "metadata" : { }, + "status" : "Failure", + "message" : "Operation cannot be fulfilled on bar.foo \"my_object_meta\": some err", + "reason" : "Conflict", + "details" : { + "name" : "my_object_meta", + "group" : "foo", + "kind" : "bar" + }, + "code" : 409 + } +}`, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + t.Parallel() + + res := KubeConflict(tc.inputErr, tc.input) + require.NotNil(t, res, "expected non-nil response") + require.JSONEq(t, tc.expectedJSON, string(res.Body), "unexpected response body") + }) + } +} diff --git a/pkg/proxy/options_test.go b/pkg/proxy/options_test.go index eeb8700..2a45a6b 100644 --- a/pkg/proxy/options_test.go +++ b/pkg/proxy/options_test.go @@ -210,6 +210,7 @@ func newTCPSpiceDB(t *testing.T, ctx context.Context) (server.RunnableServer, st server.WithDispatchCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), server.WithNamespaceCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), server.WithClusterDispatchCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), + server.WithEnableExperimentalRelationshipExpiration(true), server.WithDatastore(ds), } diff --git 
a/pkg/spicedb/bootstrap.yaml b/pkg/spicedb/bootstrap.yaml index dd062ab..0bb3044 100644 --- a/pkg/spicedb/bootstrap.yaml +++ b/pkg/spicedb/bootstrap.yaml @@ -1,4 +1,6 @@ schema: |- + use expiration + definition cluster {} definition user {} definition namespace { @@ -28,6 +30,11 @@ schema: |- definition lock { relation workflow: workflow } - definition workflow {} + + definition workflow { + relation idempotency_key: activity with expiration + } + + definition activity{} relationships: | namespace:spicedb-kubeapi-proxy#viewer@user:rakis diff --git a/pkg/spicedb/spicedb.go b/pkg/spicedb/spicedb.go index 35b8c81..14457ee 100644 --- a/pkg/spicedb/spicedb.go +++ b/pkg/spicedb/spicedb.go @@ -42,6 +42,7 @@ func NewServer(ctx context.Context, bootstrapFilePath string) (server.RunnableSe server.WithDispatchCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), server.WithNamespaceCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), server.WithClusterDispatchCacheConfig(server.CacheConfig{Enabled: false, Metrics: false}), + server.WithEnableExperimentalRelationshipExpiration(true), server.WithDatastoreConfig( *datastore.NewConfigWithOptionsAndDefaults().WithOptions( datastore.WithEngine(datastore.MemoryEngine),