Skip to content

Commit 324b399

Browse files
committed
perf: add proper relationships chunk cache for LR3
1 parent 4069fc9 commit 324b399

File tree

22 files changed

+383
-107
lines changed

22 files changed

+383
-107
lines changed

internal/dispatch/cluster/cluster.go

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cluster
22

33
import (
4+
"fmt"
45
"time"
56

67
"github.com/authzed/spicedb/internal/dispatch"
@@ -16,13 +17,15 @@ import (
1617
type Option func(*optionState)
1718

1819
type optionState struct {
19-
metricsEnabled bool
20-
prometheusSubsystem string
21-
cache cache.Cache[keys.DispatchCacheKey, any]
22-
concurrencyLimits graph.ConcurrencyLimits
23-
remoteDispatchTimeout time.Duration
24-
dispatchChunkSize uint16
25-
caveatTypeSet *caveattypes.TypeSet
20+
metricsEnabled bool
21+
prometheusSubsystem string
22+
cache cache.Cache[keys.DispatchCacheKey, any]
23+
concurrencyLimits graph.ConcurrencyLimits
24+
remoteDispatchTimeout time.Duration
25+
dispatchChunkSize uint16
26+
caveatTypeSet *caveattypes.TypeSet
27+
relationshipChunkCacheConfig *cache.Config
28+
relationshipChunkCache cache.Cache[cache.StringKey, any]
2629
}
2730

2831
// MetricsEnabled enables issuing prometheus metrics
@@ -68,6 +71,20 @@ func RemoteDispatchTimeout(remoteDispatchTimeout time.Duration) Option {
6871
}
6972
}
7073

74+
// RelationshipChunkCacheConfig sets the cache config for LR3 relationship chunks.
75+
func RelationshipChunkCacheConfig(config *cache.Config) Option {
76+
return func(state *optionState) {
77+
state.relationshipChunkCacheConfig = config
78+
}
79+
}
80+
81+
// RelationshipChunkCache sets the cache for LR3 relationship chunks.
82+
func RelationshipChunkCache(cache cache.Cache[cache.StringKey, any]) Option {
83+
return func(state *optionState) {
84+
state.relationshipChunkCache = cache
85+
}
86+
}
87+
7188
// CaveatTypeSet sets the type set to use for caveats. If not specified, the default
7289
// type set is used.
7390
func CaveatTypeSet(caveatTypeSet *caveattypes.TypeSet) Option {
@@ -92,7 +109,40 @@ func NewClusterDispatcher(dispatch dispatch.Dispatcher, options ...Option) (disp
92109
}
93110

94111
cts := caveattypes.TypeSetOrDefault(opts.caveatTypeSet)
95-
clusterDispatch := graph.NewDispatcher(dispatch, cts, opts.concurrencyLimits, opts.dispatchChunkSize)
112+
113+
// Use provided cache or create one from config
114+
var relationshipChunkCache cache.Cache[cache.StringKey, any]
115+
if opts.relationshipChunkCache != nil {
116+
relationshipChunkCache = opts.relationshipChunkCache
117+
} else {
118+
// Default RelationshipChunkCacheConfig if not provided
119+
relationshipChunkCacheConfig := opts.relationshipChunkCacheConfig
120+
if relationshipChunkCacheConfig == nil {
121+
relationshipChunkCacheConfig = &cache.Config{
122+
NumCounters: 1e4, // 10k
123+
MaxCost: 1 << 20, // 1MB
124+
DefaultTTL: 30 * time.Second,
125+
}
126+
}
127+
128+
// Create cache from config
129+
var err error
130+
relationshipChunkCache, err = cache.NewStandardCache[cache.StringKey, any](relationshipChunkCacheConfig)
131+
if err != nil {
132+
return nil, fmt.Errorf("failed to create relationship chunk cache: %w", err)
133+
}
134+
}
135+
136+
params := graph.DispatcherParameters{
137+
ConcurrencyLimits: opts.concurrencyLimits,
138+
TypeSet: cts,
139+
DispatchChunkSize: opts.dispatchChunkSize,
140+
RelationshipChunkCache: relationshipChunkCache,
141+
}
142+
clusterDispatch, err := graph.NewDispatcher(dispatch, params)
143+
if err != nil {
144+
return nil, fmt.Errorf("failed to create cluster dispatcher: %w", err)
145+
}
96146

97147
if opts.prometheusSubsystem == "" {
98148
opts.prometheusSubsystem = "dispatch"

internal/dispatch/combined/combined.go

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type optionState struct {
4242
dispatchChunkSize uint16
4343
startingPrimaryHedgingDelay time.Duration
4444
caveatTypeSet *caveattypes.TypeSet
45+
relationshipChunkCacheConfig *cache.Config
46+
relationshipChunkCache cache.Cache[cache.StringKey, any]
4547
}
4648

4749
// MetricsEnabled enables issuing prometheus metrics
@@ -137,6 +139,20 @@ func DispatchChunkSize(dispatchChunkSize uint16) Option {
137139
}
138140
}
139141

142+
// RelationshipChunkCacheConfig sets the cache config for LR3 relationship chunks.
143+
func RelationshipChunkCacheConfig(config *cache.Config) Option {
144+
return func(state *optionState) {
145+
state.relationshipChunkCacheConfig = config
146+
}
147+
}
148+
149+
// RelationshipChunkCache sets the cache for LR3 relationship chunks.
150+
func RelationshipChunkCache(cache cache.Cache[cache.StringKey, any]) Option {
151+
return func(state *optionState) {
152+
state.relationshipChunkCache = cache
153+
}
154+
}
155+
140156
// RemoteDispatchTimeout sets the maximum timeout for a remote dispatch.
141157
// Defaults to 60s (as defined in the remote dispatcher).
142158
func RemoteDispatchTimeout(remoteDispatchTimeout time.Duration) Option {
@@ -187,7 +203,40 @@ func NewDispatcher(options ...Option) (dispatch.Dispatcher, error) {
187203
}
188204

189205
cts := caveattypes.TypeSetOrDefault(opts.caveatTypeSet)
190-
redispatch := graph.NewDispatcher(cachingRedispatch, cts, opts.concurrencyLimits, chunkSize)
206+
207+
// Use provided cache or create one from config
208+
var relationshipChunkCache cache.Cache[cache.StringKey, any]
209+
if opts.relationshipChunkCache != nil {
210+
relationshipChunkCache = opts.relationshipChunkCache
211+
} else {
212+
// Default RelationshipChunkCacheConfig if not provided
213+
relationshipChunkCacheConfig := opts.relationshipChunkCacheConfig
214+
if relationshipChunkCacheConfig == nil {
215+
relationshipChunkCacheConfig = &cache.Config{
216+
NumCounters: 1e4, // 10k
217+
MaxCost: 1 << 20, // 1MB
218+
DefaultTTL: 30 * time.Second,
219+
}
220+
}
221+
222+
// Create cache from config
223+
var err error
224+
relationshipChunkCache, err = cache.NewStandardCache[cache.StringKey, any](relationshipChunkCacheConfig)
225+
if err != nil {
226+
return nil, fmt.Errorf("failed to create relationship chunk cache: %w", err)
227+
}
228+
}
229+
230+
params := graph.DispatcherParameters{
231+
ConcurrencyLimits: opts.concurrencyLimits,
232+
TypeSet: cts,
233+
DispatchChunkSize: chunkSize,
234+
RelationshipChunkCache: relationshipChunkCache,
235+
}
236+
redispatch, err := graph.NewDispatcher(cachingRedispatch, params)
237+
if err != nil {
238+
return nil, fmt.Errorf("failed to create dispatcher: %w", err)
239+
}
191240
redispatch = singleflight.New(redispatch, &keys.CanonicalKeyHandler{})
192241

193242
// If an upstream is specified, create a cluster dispatcher.

internal/dispatch/graph/check_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
log "github.com/authzed/spicedb/internal/logging"
2020
datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
2121
"github.com/authzed/spicedb/internal/testfixtures"
22-
caveattypes "github.com/authzed/spicedb/pkg/caveats/types"
2322
"github.com/authzed/spicedb/pkg/datastore"
2423
"github.com/authzed/spicedb/pkg/genutil/mapz"
2524
core "github.com/authzed/spicedb/pkg/proto/core/v1"
@@ -171,7 +170,8 @@ func TestMaxDepth(t *testing.T) {
171170
revision, err := common.UpdateRelationshipsInDatastore(ctx, ds, mutation)
172171
require.NoError(err)
173172

174-
dispatch := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
173+
dispatch, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
174+
require.NoError(err)
175175

176176
_, err = dispatch.DispatchCheck(ctx, &v1.DispatchCheckRequest{
177177
ResourceRelation: RR("folder", "view").ToCoreRR(),
@@ -1419,7 +1419,8 @@ func TestCheckPermissionOverSchema(t *testing.T) {
14191419

14201420
require := require.New(t)
14211421

1422-
dispatcher := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
1422+
dispatcher, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
1423+
require.NoError(err)
14231424

14241425
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
14251426
require.NoError(err)
@@ -1924,7 +1925,8 @@ func TestCheckWithHints(t *testing.T) {
19241925

19251926
require := require.New(t)
19261927

1927-
dispatcher := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
1928+
dispatcher, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
1929+
require.NoError(err)
19281930

19291931
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
19301932
require.NoError(err)
@@ -1964,7 +1966,8 @@ func TestCheckHintsPartialApplication(t *testing.T) {
19641966
t.Parallel()
19651967
require := require.New(t)
19661968

1967-
dispatcher := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
1969+
dispatcher, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
1970+
require.NoError(err)
19681971

19691972
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
19701973
require.NoError(err)
@@ -2010,7 +2013,8 @@ func TestCheckHintsPartialApplicationOverArrow(t *testing.T) {
20102013
t.Parallel()
20112014
require := require.New(t)
20122015

2013-
dispatcher := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
2016+
dispatcher, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
2017+
require.NoError(err)
20142018

20152019
ds, err := dsfortesting.NewMemDBDatastoreForTesting(0, 0, memdb.DisableGC)
20162020
require.NoError(err)
@@ -2063,7 +2067,8 @@ func newLocalDispatcherWithConcurrencyLimit(t testing.TB, concurrencyLimit uint1
20632067

20642068
ds, revision := testfixtures.StandardDatastoreWithData(rawDS, require.New(t))
20652069

2066-
dispatch := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, concurrencyLimit, 100)
2070+
dispatch, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
2071+
require.NoError(t, err)
20672072

20682073
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
20692074
require.NoError(t, err)
@@ -2085,7 +2090,8 @@ func newLocalDispatcherWithSchemaAndRels(t testing.TB, schema string, rels []tup
20852090

20862091
ds, revision := testfixtures.DatastoreFromSchemaAndTestRelationships(rawDS, schema, rels, require.New(t))
20872092

2088-
dispatch := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
2093+
dispatch, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
2094+
require.NoError(t, err)
20892095

20902096
cachingDispatcher, err := caching.NewCachingDispatcher(caching.DispatchTestCache(t), false, "", &keys.CanonicalKeyHandler{})
20912097
require.NoError(t, err)

internal/dispatch/graph/expand_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
expand "github.com/authzed/spicedb/internal/graph"
2020
datastoremw "github.com/authzed/spicedb/internal/middleware/datastore"
2121
"github.com/authzed/spicedb/internal/testfixtures"
22-
caveattypes "github.com/authzed/spicedb/pkg/caveats/types"
2322
"github.com/authzed/spicedb/pkg/graph"
2423
core "github.com/authzed/spicedb/pkg/proto/core/v1"
2524
v1 "github.com/authzed/spicedb/pkg/proto/dispatch/v1"
@@ -295,7 +294,8 @@ func TestMaxDepthExpand(t *testing.T) {
295294
require.NoError(err)
296295
require.NoError(datastoremw.SetInContext(ctx, ds))
297296

298-
dispatch := NewLocalOnlyDispatcher(caveattypes.Default.TypeSet, 10, 100)
297+
dispatch, err := NewLocalOnlyDispatcher(MustNewDefaultDispatcherParametersForTesting())
298+
require.NoError(err)
299299

300300
_, err = dispatch.DispatchExpand(ctx, &v1.DispatchExpandRequest{
301301
ResourceAndRelation: tuple.CoreONR("folder", "oops", "view"),

0 commit comments

Comments
 (0)