Skip to content

Commit 9d4d4b7

Browse files
authored
refactor(query): add QueryDatastoreReader to act as an implementation shim for datastores (#2956)
## Description Currently, there are very few calls in pkg/query that go directly to the datastore interface. We'd like to keep this interface minimal, in an effort to start cleaning up datastores over time as we continue with the query planner. This has the added benefit of providing a way to inject various testing scenarios directly into the query planner tests and benchmarks; for example, adding a slight delay on any call to simulate network latency.
1 parent 70f05dd commit 9d4d4b7

28 files changed

+1130
-526
lines changed

internal/services/integrationtesting/query_plan_consistency_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type queryPlanConsistencyHandle struct {
4848

4949
func (q *queryPlanConsistencyHandle) buildContext(t *testing.T) *query.Context {
5050
return query.NewLocalContext(t.Context(),
51-
query.WithReader(datalayer.NewDataLayer(q.ds).SnapshotReader(q.revision)),
51+
query.WithRevisionedReader(datalayer.NewDataLayer(q.ds).SnapshotReader(q.revision)),
5252
query.WithCaveatRunner(caveats.NewCaveatRunner(caveattypes.Default.TypeSet)),
5353
query.WithTraceLogger(query.NewTraceLogger())) // Enable tracing for debugging
5454
}

internal/services/v1/permissions_queryplan.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (ps *permissionServer) checkPermissionWithQueryPlan(ctx context.Context, re
7373
qctx := &query.Context{
7474
Context: ctx,
7575
Executor: query.LocalExecutor{},
76-
Reader: reader,
76+
Reader: query.NewQueryDatastoreReader(reader),
7777
CaveatContext: caveatContext,
7878
CaveatRunner: caveatsimpl.NewCaveatRunner(ps.config.CaveatTypeSet),
7979
}

pkg/query/alias.go

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,5 @@
11
package query
22

3-
import (
4-
"github.com/authzed/spicedb/pkg/datastore"
5-
"github.com/authzed/spicedb/pkg/datastore/options"
6-
)
7-
83
// AliasIterator is an iterator that rewrites the Resource's Relation field of all paths
94
// streamed from the sub-iterator to a specified alias relation.
105
type AliasIterator struct {
@@ -144,31 +139,7 @@ func (a *AliasIterator) shouldIncludeSelfEdge(ctx *Context, resource Object, fil
144139
// resourceExistsAsSubject queries the datastore to check if the given resource appears
145140
// as a subject in any relationship, including expired relationships.
146141
func (a *AliasIterator) resourceExistsAsSubject(ctx *Context, resource Object) (bool, error) {
147-
filter := datastore.RelationshipsFilter{
148-
OptionalSubjectsSelectors: []datastore.SubjectsSelector{{
149-
OptionalSubjectType: resource.ObjectType,
150-
OptionalSubjectIds: []string{resource.ObjectID},
151-
RelationFilter: datastore.SubjectRelationFilter{}.WithNonEllipsisRelation(a.relation),
152-
}},
153-
OptionalExpirationOption: datastore.ExpirationFilterOptionNone,
154-
}
155-
156-
iter, err := ctx.Reader.QueryRelationships(ctx, filter,
157-
options.WithLimit(options.LimitOne),
158-
options.WithSkipExpiration(true)) // Include expired relationships
159-
if err != nil {
160-
return false, err
161-
}
162-
163-
// Check if any relationship exists
164-
for _, err := range iter {
165-
if err != nil {
166-
return false, err
167-
}
168-
return true, nil
169-
}
170-
171-
return false, nil
142+
return ctx.Reader.SubjectExistsAsRelationship(ctx, resource, a.relation)
172143
}
173144

174145
func (a *AliasIterator) IterResourcesImpl(ctx *Context, subject ObjectAndRelation, filterResourceType ObjectType) (PathSeq, error) {

pkg/query/arrow_reversal_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) {
112112
resources := NewObjects("file", "file0")
113113
subject := NewObject("user", "user42").WithEllipses()
114114

115-
reader := datalayer.NewDataLayer(rawDS).SnapshotReader(revision)
115+
readerOpt := WithRevisionedReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))
116116

117117
// ---- plain (LTR) ----
118118

119119
plainTrace := NewTraceLogger()
120120
plainIt, err := canonicalOutline.Compile()
121121
require.NoError(t, err)
122122

123-
plainSeq, err := NewLocalContext(ctx, WithReader(reader), WithTraceLogger(plainTrace)).
123+
plainSeq, err := NewLocalContext(ctx, readerOpt, WithTraceLogger(plainTrace)).
124124
Check(plainIt, resources, subject)
125125
require.NoError(t, err)
126126
plainPaths, err := CollectAll(plainSeq)
@@ -132,7 +132,7 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) {
132132
obs := NewCountObserver()
133133
warmIt, err := canonicalOutline.Compile()
134134
require.NoError(t, err)
135-
warmSeq, err := NewLocalContext(ctx, WithReader(reader), WithObserver(obs)).
135+
warmSeq, err := NewLocalContext(ctx, readerOpt, WithObserver(obs)).
136136
Check(warmIt, resources, subject)
137137
require.NoError(t, err)
138138
_, err = CollectAll(warmSeq)
@@ -144,7 +144,7 @@ func TestDoubleWideArrowAdvisedMatchesPlain(t *testing.T) {
144144
require.NoError(t, err)
145145

146146
advisedTrace := NewTraceLogger()
147-
advisedSeq, err := NewLocalContext(ctx, WithReader(reader), WithTraceLogger(advisedTrace)).
147+
advisedSeq, err := NewLocalContext(ctx, readerOpt, WithTraceLogger(advisedTrace)).
148148
Check(advisedIt, resources, subject)
149149
require.NoError(t, err)
150150
advisedPaths, err := CollectAll(advisedSeq)

pkg/query/benchmarks/check_deep_arrow_benchmark_test.go

Lines changed: 129 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,22 @@ import (
2020

2121
// BenchmarkCheckDeepArrow benchmarks permission checking through a deep recursive chain.
2222
// This recreates the testharness scenario with:
23-
// - A 30+ level deep parent chain: document:target -> document:1 -> ... -> document:29
23+
// - A 30+ level deep parent chain: document:target -> document:1 -> ... -> document:30
2424
// - document:29#view@user:slow
2525
// - Checking if user:slow has viewer permission on document:target
2626
//
2727
// The permission viewer = view + parent->viewer creates a recursive traversal through
2828
// all 30+ levels to find the view relationship at the end of the chain.
29+
//
30+
// Four sub-benchmarks are run:
31+
// - plain: compile the outline directly and run Check each iteration
32+
// - advised: seed a CountAdvisor from a single warm-up run, apply it to the
33+
// canonical outline, compile once, then run Check each iteration
34+
// - plain_delay: same as plain, but with a delay reader simulating network latency
35+
// - advised_delay: same as advised, but with a delay reader simulating network latency
2936
func BenchmarkCheckDeepArrow(b *testing.B) {
30-
// Create an in-memory datastore
37+
// ---- shared setup ----
38+
3139
rawDS, err := memdb.NewMemdbDatastore(0, 0, memdb.DisableGC)
3240
require.NoError(b, err)
3341

@@ -43,76 +51,159 @@ func BenchmarkCheckDeepArrow(b *testing.B) {
4351
}
4452
`
4553

46-
// Compile the schema
4754
compiled, err := compiler.Compile(compiler.InputSchema{
4855
Source: input.Source("benchmark"),
4956
SchemaString: schemaText,
5057
}, compiler.AllowUnprefixedObjectType())
5158
require.NoError(b, err)
5259

53-
// Write the schema
5460
_, err = rawDS.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error {
5561
return rwt.LegacyWriteNamespaces(ctx, compiled.ObjectDefinitions...)
5662
})
5763
require.NoError(b, err)
5864

59-
// Build relationships for the deep arrow scenario
60-
// Create a chain: document:target -> document:1 -> document:2 -> ... -> document:30 -> document:31
65+
// Build relationships for the deep arrow scenario.
66+
// Chain: document:target -> document:1 -> document:2 -> ... -> document:30
6167
// Plus: document:29#view@user:slow
6268
relationships := make([]tuple.Relationship, 0, 33)
63-
64-
// document:target#parent@document:1
6569
relationships = append(relationships, tuple.MustParse("document:target#parent@document:1"))
66-
67-
// Chain: document:1 through document:30
6870
for i := 1; i <= 30; i++ {
6971
rel := fmt.Sprintf("document:%d#parent@document:%d", i, i+1)
7072
relationships = append(relationships, tuple.MustParse(rel))
7173
}
72-
73-
// The view relationship at the end of the chain
7474
relationships = append(relationships, tuple.MustParse("document:29#view@user:slow"))
7575

76-
// Write all relationships to the datastore
7776
revision, err := common.WriteRelationships(ctx, rawDS, tuple.UpdateOperationCreate, relationships...)
7877
require.NoError(b, err)
7978

80-
// Build schema for querying
8179
dsSchema, err := schema.BuildSchemaFromDefinitions(compiled.ObjectDefinitions, nil)
8280
require.NoError(b, err)
8381

84-
// Create the iterator tree for the viewer permission using BuildIteratorFromSchema
85-
viewerIterator, err := query.BuildIteratorFromSchema(dsSchema, "document", "viewer")
82+
// Build the canonical outline once; all sub-benchmarks derive from it.
83+
canonicalOutline, err := query.BuildOutlineFromSchema(dsSchema, "document", "viewer")
8684
require.NoError(b, err)
8785

88-
// Create query context
89-
queryCtx := query.NewLocalContext(ctx,
90-
query.WithReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision)),
91-
query.WithMaxRecursionDepth(50),
92-
)
93-
94-
// The resource we're checking: document:target
86+
// The resource and subject are the same for all sub-benchmarks.
9587
resources := query.NewObjects("document", "target")
96-
97-
// The subject we're checking: user:slow
9888
subject := query.NewObject("user", "slow").WithEllipses()
9989

100-
// Reset the timer - everything before this is setup
101-
b.ResetTimer()
90+
// Base reader (no simulated latency).
91+
reader := query.NewQueryDatastoreReader(datalayer.NewDataLayer(rawDS).SnapshotReader(revision))
10292

103-
// Run the benchmark
104-
for b.Loop() {
105-
// Check if user:slow can view document:target
106-
// This will traverse the entire 30+ level chain
107-
seq, err := queryCtx.Check(viewerIterator, resources, subject)
108-
require.NoError(b, err)
93+
// Delay reader wrapping the base reader with simulated network latency.
94+
delayReader := query.NewDelayReader(networkDelay, reader)
10995

110-
// Collect all results (should find user:slow at the end of the chain)
111-
paths, err := query.CollectAll(seq)
96+
// buildAdvisedIterator seeds a CountAdvisor from a single warm-up run using the
97+
// provided reader and returns the compiled advised iterator.
98+
buildAdvisedIterator := func(b *testing.B, r query.QueryDatastoreReader) query.Iterator {
99+
b.Helper()
100+
obs := query.NewCountObserver()
101+
warmIt, err := canonicalOutline.Compile()
102+
require.NoError(b, err)
103+
warmCtx := query.NewLocalContext(ctx,
104+
query.WithReader(r),
105+
query.WithObserver(obs),
106+
query.WithMaxRecursionDepth(50),
107+
)
108+
seq, err := warmCtx.Check(warmIt, resources, subject)
109+
require.NoError(b, err)
110+
_, err = query.CollectAll(seq)
112111
require.NoError(b, err)
113112

114-
// Verify we found the expected result
115-
require.Len(b, paths, 1)
116-
require.Equal(b, "slow", paths[0].Subject.ObjectID)
113+
advisor := query.NewCountAdvisor(obs.GetStats())
114+
advisedCO, err := query.ApplyAdvisor(canonicalOutline, advisor)
115+
require.NoError(b, err)
116+
advisedIt, err := advisedCO.Compile()
117+
require.NoError(b, err)
118+
return advisedIt
117119
}
120+
121+
// ---- plain sub-benchmark ----
122+
123+
b.Run("plain", func(b *testing.B) {
124+
it, err := canonicalOutline.Compile()
125+
require.NoError(b, err)
126+
127+
b.Log("plain explain:\n", it.Explain())
128+
129+
queryCtx := query.NewLocalContext(ctx,
130+
query.WithReader(reader),
131+
query.WithMaxRecursionDepth(50),
132+
)
133+
134+
b.ResetTimer()
135+
for b.Loop() {
136+
seq, err := queryCtx.Check(it, resources, subject)
137+
require.NoError(b, err)
138+
paths, err := query.CollectAll(seq)
139+
require.NoError(b, err)
140+
require.Len(b, paths, 1)
141+
require.Equal(b, "slow", paths[0].Subject.ObjectID)
142+
}
143+
})
144+
145+
// ---- advised sub-benchmark ----
146+
147+
b.Run("advised", func(b *testing.B) {
148+
advisedIt := buildAdvisedIterator(b, reader)
149+
150+
b.Log("advised explain:\n", advisedIt.Explain())
151+
152+
queryCtx := query.NewLocalContext(ctx,
153+
query.WithReader(reader),
154+
query.WithMaxRecursionDepth(50),
155+
)
156+
157+
b.ResetTimer()
158+
for b.Loop() {
159+
seq, err := queryCtx.Check(advisedIt, resources, subject)
160+
require.NoError(b, err)
161+
paths, err := query.CollectAll(seq)
162+
require.NoError(b, err)
163+
require.Len(b, paths, 1)
164+
require.Equal(b, "slow", paths[0].Subject.ObjectID)
165+
}
166+
})
167+
168+
// ---- plain_delay sub-benchmark ----
169+
170+
b.Run("plain_delay", func(b *testing.B) {
171+
it, err := canonicalOutline.Compile()
172+
require.NoError(b, err)
173+
174+
queryCtx := query.NewLocalContext(ctx,
175+
query.WithReader(delayReader),
176+
query.WithMaxRecursionDepth(50),
177+
)
178+
179+
b.ResetTimer()
180+
for b.Loop() {
181+
seq, err := queryCtx.Check(it, resources, subject)
182+
require.NoError(b, err)
183+
paths, err := query.CollectAll(seq)
184+
require.NoError(b, err)
185+
require.Len(b, paths, 1)
186+
require.Equal(b, "slow", paths[0].Subject.ObjectID)
187+
}
188+
})
189+
190+
// ---- advised_delay sub-benchmark ----
191+
192+
b.Run("advised_delay", func(b *testing.B) {
193+
advisedIt := buildAdvisedIterator(b, delayReader)
194+
queryCtx := query.NewLocalContext(ctx,
195+
query.WithReader(delayReader),
196+
query.WithMaxRecursionDepth(50),
197+
)
198+
199+
b.ResetTimer()
200+
for b.Loop() {
201+
seq, err := queryCtx.Check(advisedIt, resources, subject)
202+
require.NoError(b, err)
203+
paths, err := query.CollectAll(seq)
204+
require.NoError(b, err)
205+
require.Len(b, paths, 1)
206+
require.Equal(b, "slow", paths[0].Subject.ObjectID)
207+
}
208+
})
118209
}

0 commit comments

Comments
 (0)