Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- feat(query planner): add recursive direction strategies, and fix IS BFS (https://github.com/authzed/spicedb/pull/2891)
- feat(query planner): introduce query plan outlines and canonicalization (https://github.com/authzed/spicedb/pull/2901)
- Schema v2: introduces support for PostOrder traversal in walk.go (https://github.com/authzed/spicedb/pull/2761) and improves PostOrder walker cycle detection (https://github.com/authzed/spicedb/pull/2902)
- Experimental: Add unified schema storage with ReadStoredSchema/WriteStoredSchema for improved schema read performance (https://github.com/authzed/spicedb/pull/2924)

This feature stores the entire schema as a single serialized proto rather than reading individual namespace and caveat definitions separately, significantly improving schema read performance.

Migration to unified schema storage is controlled by the `--experimental-schema-mode` flag, which supports a 4-phase rolling migration:

1. `read-legacy-write-legacy` (default) - No change; reads and writes use legacy per-definition storage.
2. `read-legacy-write-both` - Reads from legacy storage, writes to both legacy and unified storage. This is the first migration step and backfills the unified schema table.
3. `read-new-write-both` - Reads from unified storage, writes to both. Validates the new read path while maintaining backward compatibility.
4. `read-new-write-new` - Reads and writes only unified storage. This is the final migration target.

**With the SpiceDB Operator:** Configure the operator to roll through stages 1 through 4 in sequence. The operator handles the rolling update of SpiceDB instances at each stage.

**Without the operator:** Progress through the stages manually by updating the `--experimental-schema-mode` flag and performing a rolling restart at each stage. You can also take the system down briefly and move directly from stage 1 to stage 4, which runs the full migration in one step.

### Changed
- Begin deprecation of library "github.com/dlmiddlecote/sqlstats" (https://github.com/authzed/spicedb/pull/2904).
Expand Down
5 changes: 5 additions & 0 deletions docs/spicedb.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions internal/caveats/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestRunCaveatExpressions(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

for _, debugOption := range []RunCaveatExpressionDebugOption{
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestRunCaveatWithMissingMap(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

result, err := RunSingleCaveatExpression(
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestRunCaveatWithEmptyMap(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

_, err = RunSingleCaveatExpression(
Expand Down Expand Up @@ -594,7 +594,7 @@ func TestRunCaveatMultipleTimes(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

runner := NewCaveatRunner(types.Default.TypeSet)
Expand Down Expand Up @@ -662,7 +662,7 @@ func TestRunCaveatWithMissingDefinition(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

// Try to run a caveat that doesn't exist
Expand Down Expand Up @@ -697,7 +697,7 @@ func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

runner := NewCaveatRunner(types.Default.TypeSet)
Expand Down Expand Up @@ -742,7 +742,7 @@ func TestCaveatRunnerEmptyExpression(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

runner := NewCaveatRunner(types.Default.TypeSet)
Expand Down Expand Up @@ -823,7 +823,7 @@ func TestUnknownCaveatOperation(t *testing.T) {
req.NoError(err)

dl := datalayer.NewDataLayer(ds)
sr, err := dl.SnapshotReader(headRevision).ReadSchema(t.Context())
sr, err := dl.SnapshotReader(headRevision, datalayer.NoSchemaHashForTesting).ReadSchema(t.Context())
req.NoError(err)

runner := NewCaveatRunner(types.Default.TypeSet)
Expand Down
84 changes: 70 additions & 14 deletions internal/datastore/common/chunkbytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@
"fmt"

sq "github.com/Masterminds/squirrel"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

"github.com/authzed/spicedb/internal/telemetry/otelconv"
)

var tracer = otel.Tracer("spicedb/internal/datastore/common")

// ErrNoChunksFound is returned when no chunks are found for a given key.
var ErrNoChunksFound = errors.New("no chunks found")

// ChunkedBytesTransaction defines the interface for executing SQL queries within a transaction.
type ChunkedBytesTransaction interface {
// ExecuteWrite executes an INSERT query.
Expand Down Expand Up @@ -81,48 +90,70 @@
AliveValue T
}

// WithExecutor returns a copy of the config whose Executor field is set to
// the provided ChunkedBytesExecutor. The receiver is passed by value, so the
// original configuration is left unmodified.
func (c SQLByteChunkerConfig[T]) WithExecutor(executor ChunkedBytesExecutor) SQLByteChunkerConfig[T] {
	updated := c
	updated.Executor = executor
	return updated
}

// WithTableName returns a copy of the config that targets the given table
// name. The receiver is passed by value, so the original configuration is
// left unmodified.
func (c SQLByteChunkerConfig[T]) WithTableName(tableName string) SQLByteChunkerConfig[T] {
	updated := c
	updated.TableName = tableName
	return updated
}

// SQLByteChunker provides methods for reading and writing byte data
// that is chunked across multiple rows in a SQL table.
// Instances should be constructed via NewSQLByteChunker (or
// MustNewSQLByteChunker), which validate the configuration.
type SQLByteChunker[T any] struct {
	// config holds the table/column layout, chunk sizing, and executor
	// used for all chunked reads and writes.
	config SQLByteChunkerConfig[T]
}

// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
// Panics if the configuration is invalid.
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
// NewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
// Returns an error if the configuration is invalid.
func NewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) (*SQLByteChunker[T], error) {
if config.MaxChunkSize <= 0 {
panic("maxChunkSize must be greater than 0")
return nil, errors.New("maxChunkSize must be greater than 0")
}
if config.TableName == "" {
panic("tableName cannot be empty")
return nil, errors.New("tableName cannot be empty")
}
if config.NameColumn == "" {
panic("nameColumn cannot be empty")
return nil, errors.New("nameColumn cannot be empty")

Check warning on line 121 in internal/datastore/common/chunkbytes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/chunkbytes.go#L121

Added line #L121 was not covered by tests
}
if config.ChunkIndexColumn == "" {
panic("chunkIndexColumn cannot be empty")
return nil, errors.New("chunkIndexColumn cannot be empty")

Check warning on line 124 in internal/datastore/common/chunkbytes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/chunkbytes.go#L124

Added line #L124 was not covered by tests
}
if config.ChunkDataColumn == "" {
panic("chunkDataColumn cannot be empty")
return nil, errors.New("chunkDataColumn cannot be empty")

Check warning on line 127 in internal/datastore/common/chunkbytes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/chunkbytes.go#L127

Added line #L127 was not covered by tests
}
if config.PlaceholderFormat == nil {
panic("placeholderFormat cannot be nil")
return nil, errors.New("placeholderFormat cannot be nil")
}
if config.Executor == nil {
panic("executor cannot be nil")
return nil, errors.New("executor cannot be nil")
}
if config.WriteMode == WriteModeInsertWithTombstones {
if config.CreatedAtColumn == "" {
panic("createdAtColumn is required when using WriteModeInsertWithTombstones")
return nil, errors.New("createdAtColumn is required when using WriteModeInsertWithTombstones")
}
if config.DeletedAtColumn == "" {
panic("deletedAtColumn is required when using WriteModeInsertWithTombstones")
return nil, errors.New("deletedAtColumn is required when using WriteModeInsertWithTombstones")
}
}

return &SQLByteChunker[T]{
config: config,
}, nil
}

// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
// Panics if the configuration is invalid.
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
	c, err := NewSQLByteChunker(config)
	if err != nil {
		// Invalid configuration is a programmer error; surface it loudly.
		panic(err)
	}
	return c
}

// WriteChunkedBytes writes chunked byte data to the database within a transaction.
Expand All @@ -143,6 +174,13 @@
return errors.New("name cannot be empty")
}

ctx, span := tracer.Start(ctx, "WriteChunkedBytes")
defer span.End()
span.SetAttributes(
attribute.String(otelconv.AttrSchemaDefinitionName, name),
attribute.Int(otelconv.AttrSchemaDataSizeBytes, len(data)),
)

// Begin transaction
txn, err := c.config.Executor.BeginTransaction(ctx)
if err != nil {
Expand Down Expand Up @@ -186,6 +224,7 @@
// Handle empty data case - insert a single empty chunk
chunks = [][]byte{{}}
}
span.SetAttributes(attribute.Int(otelconv.AttrSchemaChunkCount, len(chunks)))

// Set up the columns - base columns plus created_at (if using tombstone mode)
columns := []string{c.config.NameColumn, c.config.ChunkIndexColumn, c.config.ChunkDataColumn}
Expand Down Expand Up @@ -230,6 +269,10 @@
return errors.New("name cannot be empty")
}

ctx, span := tracer.Start(ctx, "DeleteChunkedBytes")
defer span.End()
span.SetAttributes(attribute.String(otelconv.AttrSchemaDefinitionName, name))

// Begin transaction
txn, err := c.config.Executor.BeginTransaction(ctx)
if err != nil {
Expand Down Expand Up @@ -279,6 +322,10 @@
return nil, errors.New("name cannot be empty")
}

ctx, span := tracer.Start(ctx, "ReadChunkedBytes")
defer span.End()
span.SetAttributes(attribute.String(otelconv.AttrSchemaDefinitionName, name))

selectBuilder := sq.StatementBuilder.
PlaceholderFormat(c.config.PlaceholderFormat).
Select(c.config.ChunkIndexColumn, c.config.ChunkDataColumn).
Expand All @@ -292,20 +339,26 @@
return nil, fmt.Errorf("failed to read chunks: %w", err)
}

span.SetAttributes(
attribute.Int(otelconv.AttrSchemaChunkCount, len(chunks)),
)

// Reassemble the chunks
data, err := c.reassembleChunks(chunks)
if err != nil {
return nil, fmt.Errorf("failed to reassemble chunks: %w", err)
}

span.SetAttributes(attribute.Int(otelconv.AttrSchemaDataSizeBytes, len(data)))

return data, nil
}

// reassembleChunks takes the chunks read from the database and reassembles them
// into the original byte array. It validates that all chunks are present and in order.
func (c *SQLByteChunker[T]) reassembleChunks(chunks map[int][]byte) ([]byte, error) {
if len(chunks) == 0 {
return nil, errors.New("no chunks found")
return nil, ErrNoChunksFound
}

// Validate that we have all chunks from 0 to N-1 and calculate total size
Expand Down Expand Up @@ -338,7 +391,10 @@
chunks := make([][]byte, 0, numChunks)

for i := 0; i < len(data); i += c.config.MaxChunkSize {
end := min(i+c.config.MaxChunkSize, len(data))
end := i + c.config.MaxChunkSize
if end > len(data) {
end = len(data)
}
chunks = append(chunks, data[i:end])
}

Expand Down
Loading
Loading