Skip to content

Commit 479c1d5

Browse files
committed
feat: add unified schema storage with ReadStoredSchema/WriteStoredSchema
Introduce the foundation for unified schema storage, where the entire compiled schema (definitions + schema text + hash) is stored as a single serialized proto blob, chunked across rows for SQL datastores.

Datastore interface changes:
- Add ReadStoredSchema(ctx, SchemaHash) to Reader interface
- Add WriteStoredSchema(ctx, *StoredSchema) to ReadWriteTransaction
- Add SchemaHash type with sentinel values for cache bypass

DataLayer interface changes:
- SnapshotReader now takes (Revision, SchemaHash) to thread cache keys
- OptimizedRevision/HeadRevision return SchemaHash alongside Revision
- Add SchemaMode configuration for migration path (legacy → dual → new)
- Add storedSchemaReaderAdapter for reading from StoredSchema protos
- Add writeSchemaViaStoredSchema for building and writing StoredSchema

Storage implementation:
- Add SQLByteChunker generic chunked blob storage for SQL datastores
- Add SQLSingleStoreSchemaReaderWriter for read/write of StoredSchema
- Add per-datastore ReadStoredSchema/WriteStoredSchema implementations for postgres, crdb, mysql, spanner, and memdb
- Add schema table migrations for all SQL datastores
- Add populate migrations to backfill from legacy namespace/caveat tables
- Rename MySQL schema table to stored_schema (schema is a reserved word)

Caching:
- Add SchemaHashCache with LRU + singleflight for schema-by-hash lookups

Proxy/middleware updates:
- Add ReadStoredSchema/WriteStoredSchema pass-through to all datastore proxies (observable, counting, readonly, singleflight, indexcheck, strictreplicated, checkingreplicated, relationshipintegrity, hashcache)
- Update consistency middleware for new HeadRevision/OptimizedRevision signatures

Proto changes:
- Add StoredSchema message to core.proto with V1StoredSchema containing schema_text, schema_hash, namespace_definitions, caveat_definitions
1 parent 0e16dd1 commit 479c1d5

File tree

148 files changed

+9624
-472
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

148 files changed

+9624
-472
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
1717
- feat(query planner): add recursive direction strategies, and fix IS BFS (https://github.com/authzed/spicedb/pull/2891)
1818
- feat(query planner): introduce query plan outlines and canonicalization (https://github.com/authzed/spicedb/pull/2901)
1919
- Schema v2: introduces support for PostOrder traversal in walk.go (https://github.com/authzed/spicedb/pull/2761) and improve PostOrder walker cycle detection (https://github.com/authzed/spicedb/pull/2902)
20+
- Experimental: Add unified schema storage with ReadStoredSchema/WriteStoredSchema for improved schema read performance (https://github.com/authzed/spicedb/pull/2924)
2021

2122
### Changed
2223
- Begin deprecation of library "github.com/dlmiddlecote/sqlstats" (https://github.com/authzed/spicedb/pull/2904).

docs/spicedb.md

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/caveats/run_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func TestRunCaveatExpressions(t *testing.T) {
471471
req.NoError(err)
472472

473473
dl := datalayer.NewDataLayer(ds)
474-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
474+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
475475
req.NoError(err)
476476

477477
for _, debugOption := range []RunCaveatExpressionDebugOption{
@@ -524,7 +524,7 @@ func TestRunCaveatWithMissingMap(t *testing.T) {
524524
req.NoError(err)
525525

526526
dl := datalayer.NewDataLayer(ds)
527-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
527+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
528528
req.NoError(err)
529529

530530
result, err := RunSingleCaveatExpression(
@@ -556,7 +556,7 @@ func TestRunCaveatWithEmptyMap(t *testing.T) {
556556
req.NoError(err)
557557

558558
dl := datalayer.NewDataLayer(ds)
559-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
559+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
560560
req.NoError(err)
561561

562562
_, err = RunSingleCaveatExpression(
@@ -594,7 +594,7 @@ func TestRunCaveatMultipleTimes(t *testing.T) {
594594
req.NoError(err)
595595

596596
dl := datalayer.NewDataLayer(ds)
597-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
597+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
598598
req.NoError(err)
599599

600600
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -662,7 +662,7 @@ func TestRunCaveatWithMissingDefinition(t *testing.T) {
662662
req.NoError(err)
663663

664664
dl := datalayer.NewDataLayer(ds)
665-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
665+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
666666
req.NoError(err)
667667

668668
// Try to run a caveat that doesn't exist
@@ -697,7 +697,7 @@ func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
697697
req.NoError(err)
698698

699699
dl := datalayer.NewDataLayer(ds)
700-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
700+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
701701
req.NoError(err)
702702

703703
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -742,7 +742,7 @@ func TestCaveatRunnerEmptyExpression(t *testing.T) {
742742
req.NoError(err)
743743

744744
dl := datalayer.NewDataLayer(ds)
745-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
745+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
746746
req.NoError(err)
747747

748748
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -823,7 +823,7 @@ func TestUnknownCaveatOperation(t *testing.T) {
823823
req.NoError(err)
824824

825825
dl := datalayer.NewDataLayer(ds)
826-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
826+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
827827
req.NoError(err)
828828

829829
runner := NewCaveatRunner(types.Default.TypeSet)

internal/datastore/common/chunkbytes.go

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,17 @@ import (
66
"fmt"
77

88
sq "github.com/Masterminds/squirrel"
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/attribute"
11+
12+
"github.com/authzed/spicedb/internal/telemetry/otelconv"
913
)
1014

15+
var tracer = otel.Tracer("spicedb/internal/datastore/common")
16+
17+
// ErrNoChunksFound is returned when no chunks are found for a given key.
18+
var ErrNoChunksFound = errors.New("no chunks found")
19+
1120
// ChunkedBytesTransaction defines the interface for executing SQL queries within a transaction.
1221
type ChunkedBytesTransaction interface {
1322
// ExecuteWrite executes an INSERT query.
@@ -81,48 +90,70 @@ type SQLByteChunkerConfig[T any] struct {
8190
AliveValue T
8291
}
8392

93+
// WithExecutor returns a copy of the config with the specified executor.
94+
func (c SQLByteChunkerConfig[T]) WithExecutor(executor ChunkedBytesExecutor) SQLByteChunkerConfig[T] {
95+
c.Executor = executor
96+
return c
97+
}
98+
99+
// WithTableName returns a copy of the config with the specified table name.
100+
func (c SQLByteChunkerConfig[T]) WithTableName(tableName string) SQLByteChunkerConfig[T] {
101+
c.TableName = tableName
102+
return c
103+
}
104+
84105
// SQLByteChunker provides methods for reading and writing byte data
85106
// that is chunked across multiple rows in a SQL table.
86107
type SQLByteChunker[T any] struct {
87108
config SQLByteChunkerConfig[T]
88109
}
89110

90-
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
91-
// Panics if the configuration is invalid.
92-
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
111+
// NewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
112+
// Returns an error if the configuration is invalid.
113+
func NewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) (*SQLByteChunker[T], error) {
93114
if config.MaxChunkSize <= 0 {
94-
panic("maxChunkSize must be greater than 0")
115+
return nil, errors.New("maxChunkSize must be greater than 0")
95116
}
96117
if config.TableName == "" {
97-
panic("tableName cannot be empty")
118+
return nil, errors.New("tableName cannot be empty")
98119
}
99120
if config.NameColumn == "" {
100-
panic("nameColumn cannot be empty")
121+
return nil, errors.New("nameColumn cannot be empty")
101122
}
102123
if config.ChunkIndexColumn == "" {
103-
panic("chunkIndexColumn cannot be empty")
124+
return nil, errors.New("chunkIndexColumn cannot be empty")
104125
}
105126
if config.ChunkDataColumn == "" {
106-
panic("chunkDataColumn cannot be empty")
127+
return nil, errors.New("chunkDataColumn cannot be empty")
107128
}
108129
if config.PlaceholderFormat == nil {
109-
panic("placeholderFormat cannot be nil")
130+
return nil, errors.New("placeholderFormat cannot be nil")
110131
}
111132
if config.Executor == nil {
112-
panic("executor cannot be nil")
133+
return nil, errors.New("executor cannot be nil")
113134
}
114135
if config.WriteMode == WriteModeInsertWithTombstones {
115136
if config.CreatedAtColumn == "" {
116-
panic("createdAtColumn is required when using WriteModeInsertWithTombstones")
137+
return nil, errors.New("createdAtColumn is required when using WriteModeInsertWithTombstones")
117138
}
118139
if config.DeletedAtColumn == "" {
119-
panic("deletedAtColumn is required when using WriteModeInsertWithTombstones")
140+
return nil, errors.New("deletedAtColumn is required when using WriteModeInsertWithTombstones")
120141
}
121142
}
122143

123144
return &SQLByteChunker[T]{
124145
config: config,
146+
}, nil
147+
}
148+
149+
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
150+
// Panics if the configuration is invalid.
151+
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
152+
chunker, err := NewSQLByteChunker(config)
153+
if err != nil {
154+
panic(err)
125155
}
156+
return chunker
126157
}
127158

128159
// WriteChunkedBytes writes chunked byte data to the database within a transaction.
@@ -143,6 +174,13 @@ func (c *SQLByteChunker[T]) WriteChunkedBytes(
143174
return errors.New("name cannot be empty")
144175
}
145176

177+
ctx, span := tracer.Start(ctx, "WriteChunkedBytes")
178+
defer span.End()
179+
span.SetAttributes(
180+
attribute.String(otelconv.AttrSchemaDefinitionName, name),
181+
attribute.Int(otelconv.AttrSchemaDataSizeBytes, len(data)),
182+
)
183+
146184
// Begin transaction
147185
txn, err := c.config.Executor.BeginTransaction(ctx)
148186
if err != nil {
@@ -186,6 +224,7 @@ func (c *SQLByteChunker[T]) WriteChunkedBytes(
186224
// Handle empty data case - insert a single empty chunk
187225
chunks = [][]byte{{}}
188226
}
227+
span.SetAttributes(attribute.Int(otelconv.AttrSchemaChunkCount, len(chunks)))
189228

190229
// Set up the columns - base columns plus created_at (if using tombstone mode)
191230
columns := []string{c.config.NameColumn, c.config.ChunkIndexColumn, c.config.ChunkDataColumn}
@@ -230,6 +269,10 @@ func (c *SQLByteChunker[T]) DeleteChunkedBytes(
230269
return errors.New("name cannot be empty")
231270
}
232271

272+
ctx, span := tracer.Start(ctx, "DeleteChunkedBytes")
273+
defer span.End()
274+
span.SetAttributes(attribute.String(otelconv.AttrSchemaDefinitionName, name))
275+
233276
// Begin transaction
234277
txn, err := c.config.Executor.BeginTransaction(ctx)
235278
if err != nil {
@@ -279,6 +322,10 @@ func (c *SQLByteChunker[T]) ReadChunkedBytes(
279322
return nil, errors.New("name cannot be empty")
280323
}
281324

325+
ctx, span := tracer.Start(ctx, "ReadChunkedBytes")
326+
defer span.End()
327+
span.SetAttributes(attribute.String(otelconv.AttrSchemaDefinitionName, name))
328+
282329
selectBuilder := sq.StatementBuilder.
283330
PlaceholderFormat(c.config.PlaceholderFormat).
284331
Select(c.config.ChunkIndexColumn, c.config.ChunkDataColumn).
@@ -292,20 +339,26 @@ func (c *SQLByteChunker[T]) ReadChunkedBytes(
292339
return nil, fmt.Errorf("failed to read chunks: %w", err)
293340
}
294341

342+
span.SetAttributes(
343+
attribute.Int(otelconv.AttrSchemaChunkCount, len(chunks)),
344+
)
345+
295346
// Reassemble the chunks
296347
data, err := c.reassembleChunks(chunks)
297348
if err != nil {
298349
return nil, fmt.Errorf("failed to reassemble chunks: %w", err)
299350
}
300351

352+
span.SetAttributes(attribute.Int(otelconv.AttrSchemaDataSizeBytes, len(data)))
353+
301354
return data, nil
302355
}
303356

304357
// reassembleChunks takes the chunks read from the database and reassembles them
305358
// into the original byte array. It validates that all chunks are present and in order.
306359
func (c *SQLByteChunker[T]) reassembleChunks(chunks map[int][]byte) ([]byte, error) {
307360
if len(chunks) == 0 {
308-
return nil, errors.New("no chunks found")
361+
return nil, ErrNoChunksFound
309362
}
310363

311364
// Validate that we have all chunks from 0 to N-1 and calculate total size
@@ -338,7 +391,10 @@ func (c *SQLByteChunker[T]) chunkData(data []byte) [][]byte {
338391
chunks := make([][]byte, 0, numChunks)
339392

340393
for i := 0; i < len(data); i += c.config.MaxChunkSize {
341-
end := min(i+c.config.MaxChunkSize, len(data))
394+
end := i + c.config.MaxChunkSize
395+
if end > len(data) {
396+
end = len(data)
397+
}
342398
chunks = append(chunks, data[i:end])
343399
}
344400

internal/datastore/common/chunkbytes_test.go

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,17 @@ type fakeExecutor struct {
5757
readResult map[int][]byte
5858
readErr error
5959
transaction *fakeTransaction
60+
onRead func() // Optional callback invoked on each read
6061
}
6162

6263
func (m *fakeExecutor) BeginTransaction(ctx context.Context) (ChunkedBytesTransaction, error) {
6364
return m.transaction, nil
6465
}
6566

6667
func (m *fakeExecutor) ExecuteRead(ctx context.Context, builder sq.SelectBuilder) (map[int][]byte, error) {
68+
if m.onRead != nil {
69+
m.onRead()
70+
}
6771
if m.readErr != nil {
6872
return nil, m.readErr
6973
}
@@ -392,6 +396,7 @@ func TestReadChunkedBytes(t *testing.T) {
392396
name string
393397
chunks map[int][]byte
394398
expectedData []byte
399+
expectedErr error
395400
expectedError string
396401
}{
397402
{
@@ -418,9 +423,9 @@ func TestReadChunkedBytes(t *testing.T) {
418423
expectedData: []byte{},
419424
},
420425
{
421-
name: "no chunks",
422-
chunks: map[int][]byte{},
423-
expectedError: "no chunks found",
426+
name: "no chunks",
427+
chunks: map[int][]byte{},
428+
expectedErr: ErrNoChunksFound,
424429
},
425430
{
426431
name: "missing chunk in sequence",
@@ -450,10 +455,13 @@ func TestReadChunkedBytes(t *testing.T) {
450455

451456
data, err := chunker.ReadChunkedBytes(context.Background(), "test-key")
452457

453-
if tt.expectedError != "" {
458+
switch {
459+
case tt.expectedErr != nil:
460+
require.ErrorIs(t, err, tt.expectedErr)
461+
case tt.expectedError != "":
454462
require.Error(t, err)
455463
require.Contains(t, err.Error(), tt.expectedError)
456-
} else {
464+
default:
457465
require.NoError(t, err)
458466
require.Equal(t, tt.expectedData, data)
459467
}
@@ -677,6 +685,49 @@ func TestWriteChunkedBytes_LargeData_DeleteAndInsert(t *testing.T) {
677685
require.Len(t, insertArgs, 33) // 11 chunks * 3 values per chunk
678686
}
679687

688+
func TestWithExecutor(t *testing.T) {
689+
executor1 := &fakeExecutor{}
690+
executor2 := &fakeExecutor{}
691+
692+
config := SQLByteChunkerConfig[uint64]{
693+
TableName: "test_table",
694+
NameColumn: "name",
695+
ChunkIndexColumn: "chunk_index",
696+
ChunkDataColumn: "chunk_data",
697+
MaxChunkSize: 1024,
698+
PlaceholderFormat: sq.Question,
699+
Executor: executor1,
700+
WriteMode: WriteModeDeleteAndInsert,
701+
}
702+
703+
// WithExecutor should return a copy with the new executor.
704+
newConfig := config.WithExecutor(executor2)
705+
require.Equal(t, executor2, newConfig.Executor)
706+
707+
// Original should be unchanged.
708+
require.Equal(t, executor1, config.Executor)
709+
}
710+
711+
func TestWithTableName(t *testing.T) {
712+
config := SQLByteChunkerConfig[uint64]{
713+
TableName: "original_table",
714+
NameColumn: "name",
715+
ChunkIndexColumn: "chunk_index",
716+
ChunkDataColumn: "chunk_data",
717+
MaxChunkSize: 1024,
718+
PlaceholderFormat: sq.Question,
719+
Executor: &fakeExecutor{},
720+
WriteMode: WriteModeDeleteAndInsert,
721+
}
722+
723+
// WithTableName should return a copy with the new table name.
724+
newConfig := config.WithTableName("new_table")
725+
require.Equal(t, "new_table", newConfig.TableName)
726+
727+
// Original should be unchanged.
728+
require.Equal(t, "original_table", config.TableName)
729+
}
730+
680731
func TestWriteChunkedBytes_LargeData_InsertWithTombstones(t *testing.T) {
681732
txn := &fakeTransaction{}
682733
executor := &fakeExecutor{transaction: txn}

0 commit comments

Comments (0)