Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 16 additions & 28 deletions e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,57 +792,45 @@ var _ = Describe("Proxy", func() {
Expect(owners[0].Relationship.Subject.Object.ObjectId).To(Equal("chani"))
})

It("recovers writes when spicedb write succeeds but crashes", func(ctx context.Context) {
It("recovers a `create` operation after a SpiceDB write followed by crash, retries and succeeds", func(ctx context.Context) {
// paul creates his namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())

// make spicedb write crash on chani's namespace write
failpoints.EnableFailPoint("panicSpiceDBReadResp", 1)
// make spicedb write crash on chani's namespace write, it should be retried idempotently and succeed
failpoints.EnableFailPoint("panicSpiceDBWriteResp", 1)
err := CreateNamespace(ctx, chaniClient, chaniNamespace)
Expect(err).ToNot(BeNil())
// pessimistic locking reports a conflict, optimistic locking reports already exists
Expect(k8serrors.IsConflict(err) || k8serrors.IsAlreadyExists(err)).To(BeTrue())
Expect(err).To(Succeed())

// paul creates chani's namespace so that the namespace exists
Expect(CreateNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())
// check that chani can get her namespace
Expect(GetNamespace(ctx, chaniClient, chaniNamespace)).To(Succeed())

// check that chani can't get her namespace, indirectly showing
// that the spicedb write was rolled back
Expect(k8serrors.IsUnauthorized(GetNamespace(ctx, chaniClient, chaniNamespace))).To(BeTrue())
// check that paul can't read chani's namespace
Expect(k8serrors.IsUnauthorized(GetNamespace(ctx, paulClient, chaniNamespace))).To(BeTrue())

// confirm the relationship doesn't exist
Expect(len(GetAllTuples(ctx, &v1.RelationshipFilter{
ResourceType: "namespace",
OptionalResourceId: chaniNamespace,
OptionalRelation: "creator",
OptionalSubjectFilter: &v1.SubjectFilter{SubjectType: "user", OptionalSubjectId: "chani"},
}))).To(BeZero())

// confirm paul can get the namespace
Expect(GetNamespace(ctx, paulClient, chaniNamespace)).To(Succeed())
}))).To(Equal(1))
})

It("recovers deletes when spicedb write succeeds but crashes", func(ctx context.Context) {
It("recovers a `delete` operation after a SpiceDB write followed by crash, retries and succeeds", func(ctx context.Context) {
// paul creates his namespace
Expect(CreateNamespace(ctx, paulClient, paulNamespace)).To(Succeed())
Expect(CreatePod(ctx, paulClient, paulNamespace, paulPod)).To(Succeed())

// chani can't create the same pod
Expect(k8serrors.IsAlreadyExists(CreatePod(ctx, chaniClient, paulNamespace, paulPod))).To(BeTrue())
fmt.Println(GetPod(ctx, chaniClient, paulNamespace, paulPod))
Expect(k8serrors.IsUnauthorized(GetPod(ctx, chaniClient, paulNamespace, paulPod))).To(BeTrue())
// chani isn't authorized to get the pod
err := GetPod(ctx, chaniClient, paulNamespace, paulPod)
Expect(k8serrors.IsUnauthorized(err)).To(BeTrue())

// make spicedb write crash on pod delete
failpoints.EnableFailPoint("panicSpiceDBReadResp", 1)
err := DeletePod(ctx, paulClient, paulNamespace, paulPod)
if lockMode == proxyrule.OptimisticLockMode {
Expect(err).To(Succeed())
} else {
fmt.Println(err)
Expect(k8serrors.IsUnauthorized(err)).To(BeTrue())
// paul sees the request fail, so he tries again:
Expect(DeletePod(ctx, paulClient, paulNamespace, paulPod)).To(Succeed())
}
failpoints.EnableFailPoint("panicSpiceDBWriteResp", 1)
err = DeletePod(ctx, paulClient, paulNamespace, paulPod)
Expect(err).To(Succeed())

// chani can now re-create paul's pod and take ownership
Expect(CreatePod(ctx, chaniClient, paulNamespace, paulPod)).To(Succeed())
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/warpstreamlabs/bento v1.8.2
golang.org/x/sync v0.15.0
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
k8s.io/apimachinery v0.33.1
k8s.io/apiserver v0.33.1
k8s.io/client-go v0.33.1
Expand Down Expand Up @@ -411,7 +412,6 @@ require (
google.golang.org/genproto v0.0.0-20250505200425-f936aa4a68b2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
gopkg.in/go-jose/go-jose.v2 v2.6.3 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
14 changes: 12 additions & 2 deletions magefiles/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ func (t Test) Unit() error {

// E2e performs a single run of the end-to-end suite located in ../e2e against
// a real apiserver. The command line is the shared argument list from e2eArgs
// followed by the suite path.
func (t Test) E2e() error {
	ginkgoArgs := append(e2eArgs(), "../e2e")
	run := RunSh("go", Tool())
	return run(ginkgoArgs...)
}

// E2eUntilItFails runs the end-to-end suite located in ../e2e against a real
// apiserver with ginkgo's --until-it-fails flag, so the suite is repeated
// until a run fails. The remaining arguments are the shared list from e2eArgs.
func (t Test) E2eUntilItFails() error {
	ginkgoArgs := append(e2eArgs(), "--until-it-fails", "../e2e")
	run := RunSh("go", Tool())
	return run(ginkgoArgs...)
}

func e2eArgs() []string {
args := []string{"run", "github.com/onsi/ginkgo/v2/ginkgo"}
args = append(args,
"--tags=e2e,failpoints",
Expand All @@ -41,6 +52,5 @@ func (t Test) E2e() error {
"-vv",
"--fail-fast",
"--randomize-all")
args = append(args, "../e2e")
return RunSh("go", Tool())(args...)
return args
}
111 changes: 107 additions & 4 deletions pkg/authz/distributedtx/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ import (
"fmt"
"io"
"net/http"
"time"

"github.com/cespare/xxhash/v2"
"google.golang.org/protobuf/types/known/timestamppb"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"

"github.com/authzed/spicedb-kubeapi-proxy/pkg/failpoints"
)

const defaultIdempotencyKeyExpiration = 24 * time.Hour

type KubeReqInput struct {
RequestURI string
RequestInfo *request.RequestInfo
Expand All @@ -38,11 +44,108 @@ type ActivityHandler struct {
}

// WriteToSpiceDB writes relationships to spicedb and returns any errors.
//
// The write is keyed for idempotency: a relationship derived from the request
// payload and workflowID (see idempotencyKeyForPayload) is appended as an
// OPERATION_CREATE update to a clone of input, so the key travels in the same
// WriteRelationships call as the caller's updates. If that call returns an
// error, the key is looked up with isRelExists; when the key is present the
// write is treated as already applied and the token returned by the lookup is
// handed back instead of the error. Otherwise the original write error is
// returned. On a successful write the response's WrittenAt token is returned.
func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest) (*v1.WriteRelationshipsResponse, error) {
func (h *ActivityHandler) WriteToSpiceDB(ctx context.Context, input *v1.WriteRelationshipsRequest, workflowID string) (*v1.ZedToken, error) {
failpoints.FailPoint("panicWriteSpiceDB")
out, err := h.PermissionClient.WriteRelationships(ctx, input)
failpoints.FailPoint("panicSpiceDBReadResp")
return out, err
idempotencyKey, err := idempotencyKeyForPayload(input, workflowID)
if err != nil {
return nil, fmt.Errorf("failed to create idempotency key for payload: %w", err)
}

// Append the key to a clone so the caller's request is left unmodified.
cloned := input.CloneVT()
cloned.Updates = append(cloned.Updates, &v1.RelationshipUpdate{
Operation: v1.RelationshipUpdate_OPERATION_CREATE,
Relationship: idempotencyKey,
})

response, err := h.PermissionClient.WriteRelationships(ctx, cloned)
// Failpoint placed after the write call has returned and before its result is inspected.
failpoints.FailPoint("panicSpiceDBWriteResp")
if err != nil {
// The write reported an error: check whether the idempotency key is present anyway.
exists, evaluatedAt, relErr := isRelExists(ctx, h.PermissionClient, idempotencyKey)
if relErr != nil {
return nil, relErr
}

if exists {
klog.Infof("idempotency key already exists, relationships were already written: %v", idempotencyKey)
return evaluatedAt, nil // idempotent write, key already exists
}

// Key is absent: surface the original write error.
return nil, err
}

return response.WrittenAt, nil
}

// idempotencyKeyForPayload builds the relationship used as an idempotency key
// for a write request issued by the workflow identified by workflowID.
//
// The key has the shape
//
//	workflow:<workflowID>#idempotency_key@activity:<hash>
//
// where <hash> is the lowercase hex xxhash64 of the request's MarshalVT
// encoding. The relationship carries an expiration set
// defaultIdempotencyKeyExpiration after the current time, truncated to whole
// seconds. A marshalling failure is returned as-is.
func idempotencyKeyForPayload(input *v1.WriteRelationshipsRequest, workflowID string) (*v1.Relationship, error) {
	serialized, err := input.MarshalVT()
	if err != nil {
		return nil, err
	}

	payloadHash := fmt.Sprintf("%x", xxhash.Sum64(serialized))
	expiresAtSeconds := time.Now().Add(defaultIdempotencyKeyExpiration).Unix()

	workflow := &v1.ObjectReference{
		ObjectType: "workflow",
		ObjectId:   workflowID,
	}
	activity := &v1.ObjectReference{
		ObjectType: "activity",
		ObjectId:   payloadHash,
	}

	key := &v1.Relationship{
		Resource:          workflow,
		Relation:          "idempotency_key",
		Subject:           &v1.SubjectReference{Object: activity},
		OptionalExpiresAt: &timestamppb.Timestamp{Seconds: expiresAtSeconds},
	}
	return key, nil
}

// isRelExists reports whether the relationship toCheck is currently present.
//
// It issues a ReadRelationships request built by readRequestForRel and looks
// only at the first message of the response stream:
//
//   - a message is received: returns (true, that message's ReadAt token, nil)
//   - the stream ends immediately with io.EOF: returns (false, nil, nil)
//   - opening the stream or receiving fails otherwise: returns (false, nil, err)
//     with the underlying error wrapped
func isRelExists(ctx context.Context, client v1.PermissionsServiceClient, toCheck *v1.Relationship) (bool, *v1.ZedToken, error) {
	// Only the first message is consumed, so when the relationship exists the
	// stream is never read to EOF. Cancel a derived context on return so the
	// stream is released here rather than lingering until the caller's ctx ends.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := client.ReadRelationships(ctx, readRequestForRel(toCheck))
	if err != nil {
		klog.ErrorS(err, "failed to check existence of relationship", "rel", toCheck)
		return false, nil, fmt.Errorf("failed to determine if relationship exists: %w", err)
	}

	res, err := stream.Recv()
	switch {
	case err == nil:
		// At least one matching relationship was streamed back.
		return true, res.ReadAt, nil
	case errors.Is(err, io.EOF):
		// Empty result set: the relationship does not exist.
		return false, nil, nil
	default:
		return false, nil, fmt.Errorf("failed to determine existence of relationship: %w", err)
	}
}

// readRequestForRel builds a ReadRelationshipsRequest that asks, with the
// fully-consistent requirement set, for relationships whose resource type,
// resource ID, relation, subject type, subject ID and subject relation equal
// those of lock.
func readRequestForRel(lock *v1.Relationship) *v1.ReadRelationshipsRequest {
	resource := lock.Resource
	subject := lock.Subject

	subjectFilter := &v1.SubjectFilter{
		SubjectType:       subject.Object.ObjectType,
		OptionalSubjectId: subject.Object.ObjectId,
		OptionalRelation: &v1.SubjectFilter_RelationFilter{
			Relation: subject.OptionalRelation,
		},
	}

	filter := &v1.RelationshipFilter{
		ResourceType:          resource.ObjectType,
		OptionalResourceId:    resource.ObjectId,
		OptionalRelation:      lock.Relation,
		OptionalSubjectFilter: subjectFilter,
	}

	return &v1.ReadRelationshipsRequest{
		Consistency: &v1.Consistency{
			Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true},
		},
		RelationshipFilter: filter,
	}
}

// ReadRelationships reads relationships from spicedb and returns any errors.
Expand Down
17 changes: 17 additions & 0 deletions pkg/authz/distributedtx/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/rest/fake"

v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
)

type testRoundTripper struct {
Expand Down Expand Up @@ -105,3 +107,18 @@ func TestCheckKubeResourceError(t *testing.T) {

require.Error(t, err)
}

// TestIdempotencyKey checks that idempotencyKeyForPayload stamps the workflow
// ID and an expiration on the key, and that two calls with the same payload
// and workflow ID yield the same key once the expiration is disregarded.
func TestIdempotencyKey(t *testing.T) {
	const workflowID = "test-key"
	req := &v1.WriteRelationshipsRequest{}

	first, err := idempotencyKeyForPayload(req, workflowID)
	require.NoError(t, err)
	require.NotNil(t, first)
	require.Equal(t, workflowID, first.Resource.ObjectId)
	require.NotNil(t, first.OptionalExpiresAt)

	second, err := idempotencyKeyForPayload(req, workflowID)
	require.NoError(t, err)

	// The expiration is derived from the wall clock at call time, so it is
	// cleared on both keys before comparing them.
	first.OptionalExpiresAt, second.OptionalExpiresAt = nil, nil
	require.True(t, first.EqualMessageVT(second))
}
Loading
Loading