diff --git a/pkg/authz/check.go b/pkg/authz/check.go index 2b85eab..ee49e66 100644 --- a/pkg/authz/check.go +++ b/pkg/authz/check.go @@ -28,9 +28,7 @@ func runAllMatchingChecks(ctx context.Context, matchingRules []*rules.RunnableRu return err } req := &v1.CheckPermissionRequest{ - Consistency: &v1.Consistency{ - Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, - }, + Consistency: input.Consistency, Resource: &v1.ObjectReference{ ObjectType: rel.ResourceType, ObjectId: rel.ResourceID, diff --git a/pkg/authz/filter.go b/pkg/authz/filter.go index c7df470..76c6a0f 100644 --- a/pkg/authz/filter.go +++ b/pkg/authz/filter.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "slices" + "time" "github.com/kyverno/go-jmespath" "k8s.io/klog/v2" @@ -79,9 +80,7 @@ func filterList(ctx context.Context, client v1.PermissionsServiceClient, filter defer close(authzData.removedNNC) req := &v1.LookupResourcesRequest{ - Consistency: &v1.Consistency{ - Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, - }, + Consistency: input.Consistency, ResourceObjectType: filter.Rel.ResourceType, Permission: filter.Rel.ResourceRelation, Subject: &v1.SubjectReference{ @@ -173,6 +172,10 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC defer close(authzData.allowedNNC) defer close(authzData.removedNNC) + logger := klog.LoggerWithValues(klog.FromContext(ctx), "request", "watch", "filter", filter).WithCallDepth(1) + + logger.V(3).Info("started watch") + watchResource, err := watchClient.Watch(ctx, &v1.WatchRequest{ OptionalObjectTypes: []string{filter.Rel.ResourceType}, }) @@ -187,14 +190,18 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC } if err != nil { - fmt.Println(err) + logger.V(2).Error(err, "watch error") return } + time.Sleep(input.WatchDelay) + for _, u := range resp.Updates { cr, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ - Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: resp.ChangesThrough, + }, }, Resource: &v1.ObjectReference{ ObjectType: filter.Rel.ResourceType, @@ -211,46 +218,45 @@ func filterWatch(ctx context.Context, client v1.PermissionsServiceClient, watchC }, }) if err != nil { - fmt.Println(err) + logger.V(2).Error(err, "check permission error") return } byteIn, err := json.Marshal(wrapper{ResourceID: u.Relationship.Resource.ObjectId, SubjectID: u.Relationship.Subject.Object.ObjectId}) if err != nil { - fmt.Println(err) + logger.V(2).Error(err, "marshal error") return } var data any if err := json.Unmarshal(byteIn, &data); err != nil { - fmt.Println(err) + logger.V(2).Error(err, "unmarshal error") return } - fmt.Println(data) - fmt.Println("RESPONSE", string(byteIn)) + + logger.V(5).Info("response", "data", data) name, err := filter.Name.Search(data) if err != nil { - fmt.Println(err) + klog.V(2).ErrorS(err, "error extracting name") return } - fmt.Println("GOT NAME", name) if name == nil || len(name.(string)) == 0 { return } namespace, err := filter.Namespace.Search(data) if err != nil { - fmt.Println(err) + logger.V(2).Error(err, "namespace extract error") return } - fmt.Println("GOT NAMESPACE", namespace) if namespace == nil { namespace = "" } nn := types.NamespacedName{Name: name.(string), Namespace: namespace.(string)} + logger.V(4).Info("response object", "namespacedName", nn.String()) // TODO: this should really be over a single channel to prevent // races on add/remove - fmt.Println(u.Relationship.Resource.ObjectId, cr.Permissionship) + logger.V(4).Info("result", "object", u.Relationship.Resource.ObjectId, "permission", cr.Permissionship) if cr.Permissionship == v1.CheckPermissionResponse_PERMISSIONSHIP_HAS_PERMISSION { authzData.allowedNNC <- nn } else { diff --git a/pkg/rules/consistency.go b/pkg/rules/consistency.go new file mode 100644 index 0000000..029e6bd --- /dev/null +++ b/pkg/rules/consistency.go @@ -0,0 +1,79 @@ +package rules + +import ( + "net/http" + "strings" + "time" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" +) + +const ( + WatchDelayKey = "SpiceDB-Watch-Delay" + ConsistencyHeaderKey = "SpiceDB-Consistency" + FullyConsistentHeaderValue = "Full" + AtExactSnapshotHeaderValuePrefix = "Exact " + AtLeastAsFreshHeaderValuePrefix = "At-Least " +) + +var MinimizeLatency = &v1.Consistency{ + Requirement: &v1.Consistency_MinimizeLatency{MinimizeLatency: true}, +} + +var FullyConsistent = &v1.Consistency{ + Requirement: &v1.Consistency_FullyConsistent{FullyConsistent: true}, +} + +// ConsistencyFromHeaders returns a consistency block given a consistency +// header. Defaults to Minimize Latency, otherwise returns the first match in order: +// - Fully Consistent +// - At Exact Snapshot +// - At Least As Fresh +// - Minimize Latency +func ConsistencyFromHeaders(headers http.Header) *v1.Consistency { + consistencyValue := headers.Get(ConsistencyHeaderKey) + if len(consistencyValue) == 0 { + return MinimizeLatency + } + + switch consistencyValue[0] { + case 'F': + fallthrough + case 'f': + return FullyConsistent + case 'E': + fallthrough + case 'e': + return &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: &v1.ZedToken{Token: strings.TrimPrefix(consistencyValue, AtExactSnapshotHeaderValuePrefix)}, + }, + } + case 'A': + fallthrough + case 'a': + return &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: &v1.ZedToken{Token: strings.TrimPrefix(consistencyValue, AtLeastAsFreshHeaderValuePrefix)}, + }, + } + } + + return MinimizeLatency +} + +// WatchDelayFromHeaders returns a time duration from the delay header. +// This is used to delay sending the check request to SpiceDB on a watch event, +// so that non-fully-consistent modes can be used with watch. +func WatchDelayFromHeaders(headers http.Header) time.Duration { + delay := headers.Get(WatchDelayKey) + if len(delay) == 0 { + return 0 + } + + duration, err := time.ParseDuration(delay) + if err != nil { + return 0 + } + return duration +} diff --git a/pkg/rules/consistency_test.go b/pkg/rules/consistency_test.go new file mode 100644 index 0000000..b0603ef --- /dev/null +++ b/pkg/rules/consistency_test.go @@ -0,0 +1,107 @@ +package rules + +import ( + "net/http" + "reflect" + "testing" + "time" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + "github.com/authzed/spicedb/pkg/datastore/revision" + "github.com/authzed/spicedb/pkg/zedtoken" + "github.com/shopspring/decimal" +) + +func MustIntZedToken(t int64) string { + return zedtoken.MustNewFromRevision(revision.NewFromDecimal(decimal.NewFromInt(t))).String() +} + +func TestConsistencyFromHeader(t *testing.T) { + tests := []struct { + name string + headers http.Header + want *v1.Consistency + }{ + { + name: "defaults to minimize latency", + want: MinimizeLatency, + }, + { + name: "fully consistent", + headers: map[string][]string{ + ConsistencyHeaderKey: {FullyConsistentHeaderValue}, + }, + want: FullyConsistent, + }, + { + name: "at exact snapshot", + headers: map[string][]string{ + ConsistencyHeaderKey: {AtExactSnapshotHeaderValuePrefix + MustIntZedToken(5)}, + }, + want: &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: &v1.ZedToken{Token: MustIntZedToken(5)}, + }, + }, + }, + { + name: "at least as fresh", + headers: map[string][]string{ + ConsistencyHeaderKey: {AtLeastAsFreshHeaderValuePrefix + MustIntZedToken(3)}, + }, + want: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: &v1.ZedToken{Token: MustIntZedToken(3)}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := http.Header{} + for k, v := range tt.headers { + for _, vv := range v { + h.Set(k, vv) + } + } + if got := ConsistencyFromHeaders(h); !reflect.DeepEqual(got, tt.want) { + t.Errorf("ConsistencyFromHeaders() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestWatchDelayFromHeaders(t *testing.T) { + tests := []struct { + name string + headers http.Header + want time.Duration + }{ + { + name: "no headers", + want: time.Duration(0), + }, + { + name: "bad fmt", + headers: map[string][]string{WatchDelayKey: {"123qwewe"}}, + }, + { + name: "set delay", + headers: map[string][]string{WatchDelayKey: {"6s"}}, + want: time.Second * 6, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + h := http.Header{} + for k, v := range tt.headers { + for _, vv := range v { + h.Set(k, vv) + } + } + if got := WatchDelayFromHeaders(h); got != tt.want { + t.Errorf("WatchDelayFromHeaders() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go index d3d9521..7430e4d 100644 --- a/pkg/rules/rules.go +++ b/pkg/rules/rules.go @@ -8,7 +8,9 @@ import ( "regexp" "slices" "strings" + "time" + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/kyverno/go-jmespath" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -137,6 +139,8 @@ type ResolveInput struct { Object *metav1.PartialObjectMetadata `json:"object"` Body []byte `json:"body"` Headers http.Header `json:"headers"` + Consistency *v1.Consistency `json:"consistency"` + WatchDelay time.Duration `json:"watchDelay"` } func NewResolveInputFromHttp(req *http.Request) (*ResolveInput, error) { @@ -209,6 +213,8 @@ func NewResolveInput(req *request.RequestInfo, user *user.DefaultInfo, object *m Object: object, Body: body, Headers: headers, + Consistency: ConsistencyFromHeaders(headers), + WatchDelay: WatchDelayFromHeaders(headers), } }