Skip to content

Commit 0ccb2b1

Browse files
committed
feat: add unified schema storage with ReadStoredSchema/WriteStoredSchema
Introduce the foundation for unified schema storage, where the entire compiled schema (definitions + schema text + hash) is stored as a single serialized proto blob, chunked across rows for SQL datastores. Datastore interface changes: - Add ReadStoredSchema(ctx, SchemaHash) to Reader interface - Add WriteStoredSchema(ctx, *StoredSchema) to ReadWriteTransaction - Add SchemaHash type with sentinel values for cache bypass DataLayer interface changes: - SnapshotReader now takes (Revision, SchemaHash) to thread cache keys - OptimizedRevision/HeadRevision return SchemaHash alongside Revision - Add SchemaMode configuration for migration path (legacy → dual → new) - Add storedSchemaReaderAdapter for reading from StoredSchema protos - Add writeSchemaViaStoredSchema for building and writing StoredSchema Storage implementation: - Add SQLByteChunker generic chunked blob storage for SQL datastores - Add SQLSingleStoreSchemaReaderWriter for read/write of StoredSchema - Add per-datastore ReadStoredSchema/WriteStoredSchema implementations for postgres, crdb, mysql, spanner, and memdb - Add schema table migrations for all SQL datastores - Add populate migrations to backfill from legacy namespace/caveat tables - Rename MySQL schema table to stored_schema (schema is a reserved word) Caching: - Add SchemaHashCache with LRU + singleflight for schema-by-hash lookups Proxy/middleware updates: - Add ReadStoredSchema/WriteStoredSchema pass-through to all datastore proxies (observable, counting, readonly, singleflight, indexcheck, strictreplicated, checkingreplicated, relationshipintegrity, hashcache) - Update consistency middleware for new HeadRevision/OptimizedRevision signatures Proto changes: - Add StoredSchema message to core.proto with V1StoredSchema containing schema_text, schema_hash, namespace_definitions, caveat_definitions
1 parent b554261 commit 0ccb2b1

File tree

148 files changed

+9240
-704
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

148 files changed

+9240
-704
lines changed

CHANGELOG.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
1717
- feat(query planner): add recursive direction strategies, and fix IS BFS (https://github.com/authzed/spicedb/pull/2891)
1818
- feat(query planner): introduce query plan outlines and canonicalization (https://github.com/authzed/spicedb/pull/2901)
1919
- Schema v2: introduces support for PostOrder traversal in walk.go (https://github.com/authzed/spicedb/pull/2761) and improve PostOrder walker cycle detection (https://github.com/authzed/spicedb/pull/2902)
20+
- Experimental: Add unified schema storage with ReadStoredSchema/WriteStoredSchema for improved schema read performance (https://github.com/authzed/spicedb/pull/2924)
21+
22+
This feature stores the entire schema as a single serialized proto rather than reading individual namespace and caveat definitions separately, significantly improving schema read performance.
23+
24+
Migration to unified schema storage is controlled by the `--experimental-schema-mode` flag, which supports a 4-phase rolling migration:
25+
26+
1. `read-legacy-write-legacy` (default) - No change; reads and writes use legacy per-definition storage.
27+
2. `read-legacy-write-both` - Reads from legacy storage, writes to both legacy and unified storage. This is the first migration step and backfills the unified schema table.
28+
3. `read-new-write-both` - Reads from unified storage, writes to both. Validates the new read path while maintaining backward compatibility.
29+
4. `read-new-write-new` - Reads and writes only unified storage. This is the final migration target.
30+
31+
**With the SpiceDB Operator:** Configure the operator to roll through stages 1 through 4 in sequence. The operator handles the rolling update of SpiceDB instances at each stage.
32+
33+
**Without the operator:** Progress through the stages manually by updating the `--experimental-schema-mode` flag and performing a rolling restart at each stage. You can also take the system down briefly and move directly from stage 1 to stage 4, which runs the full migration in one step.
2034

2135
### Changed
2236
- Begin deprecation of library "github.com/dlmiddlecote/sqlstats" (https://github.com/authzed/spicedb/pull/2904).

docs/spicedb.md

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/caveats/run_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ func TestRunCaveatExpressions(t *testing.T) {
471471
req.NoError(err)
472472

473473
dl := datalayer.NewDataLayer(ds)
474-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
474+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
475475
req.NoError(err)
476476

477477
for _, debugOption := range []RunCaveatExpressionDebugOption{
@@ -524,7 +524,7 @@ func TestRunCaveatWithMissingMap(t *testing.T) {
524524
req.NoError(err)
525525

526526
dl := datalayer.NewDataLayer(ds)
527-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
527+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
528528
req.NoError(err)
529529

530530
result, err := RunSingleCaveatExpression(
@@ -556,7 +556,7 @@ func TestRunCaveatWithEmptyMap(t *testing.T) {
556556
req.NoError(err)
557557

558558
dl := datalayer.NewDataLayer(ds)
559-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
559+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
560560
req.NoError(err)
561561

562562
_, err = RunSingleCaveatExpression(
@@ -594,7 +594,7 @@ func TestRunCaveatMultipleTimes(t *testing.T) {
594594
req.NoError(err)
595595

596596
dl := datalayer.NewDataLayer(ds)
597-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
597+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
598598
req.NoError(err)
599599

600600
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -662,7 +662,7 @@ func TestRunCaveatWithMissingDefinition(t *testing.T) {
662662
req.NoError(err)
663663

664664
dl := datalayer.NewDataLayer(ds)
665-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
665+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
666666
req.NoError(err)
667667

668668
// Try to run a caveat that doesn't exist
@@ -697,7 +697,7 @@ func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
697697
req.NoError(err)
698698

699699
dl := datalayer.NewDataLayer(ds)
700-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
700+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
701701
req.NoError(err)
702702

703703
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -742,7 +742,7 @@ func TestCaveatRunnerEmptyExpression(t *testing.T) {
742742
req.NoError(err)
743743

744744
dl := datalayer.NewDataLayer(ds)
745-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
745+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
746746
req.NoError(err)
747747

748748
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -823,7 +823,7 @@ func TestUnknownCaveatOperation(t *testing.T) {
823823
req.NoError(err)
824824

825825
dl := datalayer.NewDataLayer(ds)
826-
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
826+
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
827827
req.NoError(err)
828828

829829
runner := NewCaveatRunner(types.Default.TypeSet)

internal/datastore/common/chunkbytes.go

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,17 @@ import (
66
"fmt"
77

88
sq "github.com/Masterminds/squirrel"
9+
"go.opentelemetry.io/otel"
10+
"go.opentelemetry.io/otel/attribute"
11+
12+
"github.com/authzed/spicedb/internal/telemetry/otelconv"
913
)
1014

15+
var tracer = otel.Tracer("spicedb/internal/datastore/common")
16+
17+
// ErrNoChunksFound is returned when no chunks are found for a given key.
18+
var ErrNoChunksFound = errors.New("no chunks found")
19+
1120
// ChunkedBytesTransaction defines the interface for executing SQL queries within a transaction.
1221
type ChunkedBytesTransaction interface {
1322
// ExecuteWrite executes an INSERT query.
@@ -81,48 +90,70 @@ type SQLByteChunkerConfig[T any] struct {
8190
AliveValue T
8291
}
8392

93+
// WithExecutor returns a copy of the config with the specified executor.
94+
func (c SQLByteChunkerConfig[T]) WithExecutor(executor ChunkedBytesExecutor) SQLByteChunkerConfig[T] {
95+
c.Executor = executor
96+
return c
97+
}
98+
99+
// WithTableName returns a copy of the config with the specified table name.
100+
func (c SQLByteChunkerConfig[T]) WithTableName(tableName string) SQLByteChunkerConfig[T] {
101+
c.TableName = tableName
102+
return c
103+
}
104+
84105
// SQLByteChunker provides methods for reading and writing byte data
85106
// that is chunked across multiple rows in a SQL table.
86107
type SQLByteChunker[T any] struct {
87108
config SQLByteChunkerConfig[T]
88109
}
89110

90-
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
91-
// Panics if the configuration is invalid.
92-
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
111+
// NewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
112+
// Returns an error if the configuration is invalid.
113+
func NewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) (*SQLByteChunker[T], error) {
93114
if config.MaxChunkSize <= 0 {
94-
panic("maxChunkSize must be greater than 0")
115+
return nil, errors.New("maxChunkSize must be greater than 0")
95116
}
96117
if config.TableName == "" {
97-
panic("tableName cannot be empty")
118+
return nil, errors.New("tableName cannot be empty")
98119
}
99120
if config.NameColumn == "" {
100-
panic("nameColumn cannot be empty")
121+
return nil, errors.New("nameColumn cannot be empty")
101122
}
102123
if config.ChunkIndexColumn == "" {
103-
panic("chunkIndexColumn cannot be empty")
124+
return nil, errors.New("chunkIndexColumn cannot be empty")
104125
}
105126
if config.ChunkDataColumn == "" {
106-
panic("chunkDataColumn cannot be empty")
127+
return nil, errors.New("chunkDataColumn cannot be empty")
107128
}
108129
if config.PlaceholderFormat == nil {
109-
panic("placeholderFormat cannot be nil")
130+
return nil, errors.New("placeholderFormat cannot be nil")
110131
}
111132
if config.Executor == nil {
112-
panic("executor cannot be nil")
133+
return nil, errors.New("executor cannot be nil")
113134
}
114135
if config.WriteMode == WriteModeInsertWithTombstones {
115136
if config.CreatedAtColumn == "" {
116-
panic("createdAtColumn is required when using WriteModeInsertWithTombstones")
137+
return nil, errors.New("createdAtColumn is required when using WriteModeInsertWithTombstones")
117138
}
118139
if config.DeletedAtColumn == "" {
119-
panic("deletedAtColumn is required when using WriteModeInsertWithTombstones")
140+
return nil, errors.New("deletedAtColumn is required when using WriteModeInsertWithTombstones")
120141
}
121142
}
122143

123144
return &SQLByteChunker[T]{
124145
config: config,
146+
}, nil
147+
}
148+
149+
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
150+
// Panics if the configuration is invalid.
151+
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
152+
chunker, err := NewSQLByteChunker(config)
153+
if err != nil {
154+
panic(err)
125155
}
156+
return chunker
126157
}
127158

128159
// WriteChunkedBytes writes chunked byte data to the database within a transaction.
@@ -143,6 +174,13 @@ func (c *SQLByteChunker[T]) WriteChunkedBytes(
143174
return errors.New("name cannot be empty")
144175
}
145176

177+
ctx, span := tracer.Start(ctx, "WriteChunkedBytes")
178+
defer span.End()
179+
span.SetAttributes(
180+
attribute.String(otelconv.AttrSchemaDefinitionName, name),
181+
attribute.Int(otelconv.AttrSchemaDataSizeBytes, len(data)),
182+
)
183+
146184
// Begin transaction
147185
txn, err := c.config.Executor.BeginTransaction(ctx)
148186
if err != nil {
@@ -186,6 +224,7 @@ func (c *SQLByteChunker[T]) WriteChunkedBytes(
186224
// Handle empty data case - insert a single empty chunk
187225
chunks = [][]byte{{}}
188226
}
227+
span.SetAttributes(attribute.Int(otelconv.AttrSchemaChunkCount, len(chunks)))
189228

190229
// Set up the columns - base columns plus created_at (if using tombstone mode)
191230
columns := []string{c.config.NameColumn, c.config.ChunkIndexColumn, c.config.ChunkDataColumn}
@@ -230,6 +269,10 @@ func (c *SQLByteChunker[T]) DeleteChunkedBytes(
230269
return errors.New("name cannot be empty")
231270
}
232271

272+
ctx, span := tracer.Start(ctx, "DeleteChunkedBytes")
273+
defer span.End()
274+
span.SetAttributes(attribute.String(otelconv.AttrSchemaDefinitionName, name))
275+
233276
// Begin transaction
234277
txn, err := c.config.Executor.BeginTransaction(ctx)
235278
if err != nil {
@@ -279,6 +322,10 @@ func (c *SQLByteChunker[T]) ReadChunkedBytes(
279322
return nil, errors.New("name cannot be empty")
280323
}
281324

325+
ctx, span := tracer.Start(ctx, "ReadChunkedBytes")
326+
defer span.End()
327+
span.SetAttributes(attribute.String(otelconv.AttrSchemaDefinitionName, name))
328+
282329
selectBuilder := sq.StatementBuilder.
283330
PlaceholderFormat(c.config.PlaceholderFormat).
284331
Select(c.config.ChunkIndexColumn, c.config.ChunkDataColumn).
@@ -292,20 +339,26 @@ func (c *SQLByteChunker[T]) ReadChunkedBytes(
292339
return nil, fmt.Errorf("failed to read chunks: %w", err)
293340
}
294341

342+
span.SetAttributes(
343+
attribute.Int(otelconv.AttrSchemaChunkCount, len(chunks)),
344+
)
345+
295346
// Reassemble the chunks
296347
data, err := c.reassembleChunks(chunks)
297348
if err != nil {
298349
return nil, fmt.Errorf("failed to reassemble chunks: %w", err)
299350
}
300351

352+
span.SetAttributes(attribute.Int(otelconv.AttrSchemaDataSizeBytes, len(data)))
353+
301354
return data, nil
302355
}
303356

304357
// reassembleChunks takes the chunks read from the database and reassembles them
305358
// into the original byte array. It validates that all chunks are present and in order.
306359
func (c *SQLByteChunker[T]) reassembleChunks(chunks map[int][]byte) ([]byte, error) {
307360
if len(chunks) == 0 {
308-
return nil, errors.New("no chunks found")
361+
return nil, ErrNoChunksFound
309362
}
310363

311364
// Validate that we have all chunks from 0 to N-1 and calculate total size
@@ -338,7 +391,10 @@ func (c *SQLByteChunker[T]) chunkData(data []byte) [][]byte {
338391
chunks := make([][]byte, 0, numChunks)
339392

340393
for i := 0; i < len(data); i += c.config.MaxChunkSize {
341-
end := min(i+c.config.MaxChunkSize, len(data))
394+
end := i + c.config.MaxChunkSize
395+
if end > len(data) {
396+
end = len(data)
397+
}
342398
chunks = append(chunks, data[i:end])
343399
}
344400

0 commit comments

Comments
 (0)