Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b93d999
feat: add StoredSchema proto definition for unified schema storage
josephschorr Feb 5, 2026
10cf32b
feat: add schema mode configuration options
josephschorr Feb 5, 2026
1386ad4
feat: add schema reader/writer interfaces and adapters
josephschorr Feb 5, 2026
f68ff0e
feat: add schema storage tables to all SQL datastores
josephschorr Feb 5, 2026
057c678
feat: add schema chunking and SQL storage implementation
josephschorr Feb 5, 2026
ef7282b
feat: add schema hash watching for real-time change detection
josephschorr Feb 5, 2026
7048102
feat: integrate unified schema support into all datastores
josephschorr Feb 5, 2026
cba8cd0
perf: add revision-aware schema caching with checkpoint-based safety
josephschorr Feb 5, 2026
b203e78
feat: update datastore proxies for unified schema support
josephschorr Feb 5, 2026
08f2c42
test: add comprehensive unified schema test suite
josephschorr Feb 5, 2026
594a729
refactor: migrate from revision-based to hash-based schema cache
josephschorr Feb 9, 2026
79e36b9
feat: add schema mode variants to steelthread tests with benchmarking
josephschorr Feb 9, 2026
46795ee
fix: proxy layers now respect schema mode configuration
josephschorr Feb 9, 2026
163fe1f
perf: change to a map for the sentinels in the hashcache
josephschorr Feb 10, 2026
d955beb
chore: fix linting issues
josephschorr Feb 10, 2026
b726c95
fix: address lint and test issues for schema hash implementation
josephschorr Feb 10, 2026
502bd6c
fix: convert Spanner migrations from mutations to DML
josephschorr Feb 10, 2026
7fe3a22
fix: only write legacy schema hash in 'write to both' modes
josephschorr Feb 10, 2026
6eabefe
fix: use dual schema writer in MemDB to write both legacy and unified…
josephschorr Feb 10, 2026
918008e
refactor: remove unused SingleStoreSchemaHashWatcher interface
josephschorr Feb 12, 2026
2f16006
fix: buffered transactional writes in Spanner cannot be read
josephschorr Feb 12, 2026
bbbf8b0
fix: spanner fixes for the migrations and the unified schema test
josephschorr Feb 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on: # yamllint disable-line rule:truthy
push:
branches:
- "main"
# NOTE! Do NOT add any other "on", because this workflow has permission to write to the repo!
# NOTE! Do NOT add any other "on", because this workflow has permission to write to the repo!

permissions:
# permission to update benchmark contents in gh-pages branch
Expand Down
3 changes: 3 additions & 0 deletions docs/spicedb.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions internal/caveats/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,10 @@ func TestRunCaveatExpressions(t *testing.T) {
third
}
`, nil, req)
headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)

for _, debugOption := range []RunCaveatExpressionDebugOption{
RunCaveatExpressionNoDebugging,
Expand Down Expand Up @@ -519,10 +519,10 @@ func TestRunCaveatWithMissingMap(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)

result, err := RunSingleCaveatExpression(
t.Context(),
Expand All @@ -549,10 +549,10 @@ func TestRunCaveatWithEmptyMap(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)

_, err = RunSingleCaveatExpression(
t.Context(),
Expand Down Expand Up @@ -585,10 +585,10 @@ func TestRunCaveatMultipleTimes(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)
runner := NewCaveatRunner(types.Default.TypeSet)

// Run the first caveat.
Expand Down Expand Up @@ -646,10 +646,10 @@ func TestRunCaveatWithMissingDefinition(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)

// Try to run a caveat that doesn't exist
_, err = RunSingleCaveatExpression(
Expand Down Expand Up @@ -679,10 +679,10 @@ func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)
runner := NewCaveatRunner(types.Default.TypeSet)

// Test populating definitions for complex expression
Expand Down Expand Up @@ -721,10 +721,10 @@ func TestCaveatRunnerEmptyExpression(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)
runner := NewCaveatRunner(types.Default.TypeSet)

// Test with an expression that has no caveats (empty operation)
Expand Down Expand Up @@ -799,10 +799,10 @@ func TestUnknownCaveatOperation(t *testing.T) {
}
`, nil, req)

headRevision, err := ds.HeadRevision(t.Context())
headRevision, _, err := ds.HeadRevision(t.Context())
req.NoError(err)

reader := ds.SnapshotReader(headRevision)
reader := ds.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting)
runner := NewCaveatRunner(types.Default.TypeSet)

// Create an expression with an unknown operation
Expand Down
14 changes: 7 additions & 7 deletions internal/datastore/benchmark/driver_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
// Sleep to give the datastore time to stabilize after all the writes
time.Sleep(1 * time.Second)

headRev, err := ds.HeadRevision(ctx)
headRev, _, err := ds.HeadRevision(ctx)
require.NoError(b, err)

b.StartTimer()
Expand All @@ -104,7 +104,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
b.Run("SnapshotRead", func(b *testing.B) {
for n := 0; n < b.N; n++ {
randDocNum := rand.Intn(numDocuments) //nolint:gosec
iter, err := ds.SnapshotReader(headRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
iter, err := ds.SnapshotReader(headRev, datastore.NoSchemaHashForTesting).QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: testfixtures.DocumentNS.Name,
OptionalResourceIds: []string{strconv.Itoa(randDocNum)},
OptionalResourceRelation: "viewer",
Expand All @@ -120,7 +120,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
})
b.Run("SnapshotReadOnlyNamespace", func(b *testing.B) {
for n := 0; n < b.N; n++ {
iter, err := ds.SnapshotReader(headRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
iter, err := ds.SnapshotReader(headRev, datastore.NoSchemaHashForTesting).QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: testfixtures.DocumentNS.Name,
}, options.WithQueryShape(queryshape.FindResourceOfType))
require.NoError(b, err)
Expand All @@ -136,7 +136,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
order := order
b.Run(orderName, func(b *testing.B) {
for n := 0; n < b.N; n++ {
iter, err := ds.SnapshotReader(headRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
iter, err := ds.SnapshotReader(headRev, datastore.NoSchemaHashForTesting).QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: testfixtures.DocumentNS.Name,
}, options.WithSort(order), options.WithQueryShape(queryshape.FindResourceOfType))
require.NoError(b, err)
Expand All @@ -154,7 +154,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
order := order
b.Run(orderName, func(b *testing.B) {
for n := 0; n < b.N; n++ {
iter, err := ds.SnapshotReader(headRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
iter, err := ds.SnapshotReader(headRev, datastore.NoSchemaHashForTesting).QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: testfixtures.DocumentNS.Name,
OptionalResourceRelation: "viewer",
}, options.WithSort(order), options.WithQueryShape(queryshape.Varying))
Expand All @@ -174,7 +174,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
b.Run(orderName, func(b *testing.B) {
for n := 0; n < b.N; n++ {
randDocNum := rand.Intn(numDocuments) //nolint:gosec
iter, err := ds.SnapshotReader(headRev).QueryRelationships(ctx, datastore.RelationshipsFilter{
iter, err := ds.SnapshotReader(headRev, datastore.NoSchemaHashForTesting).QueryRelationships(ctx, datastore.RelationshipsFilter{
OptionalResourceType: testfixtures.DocumentNS.Name,
OptionalResourceIds: []string{strconv.Itoa(randDocNum)},
OptionalResourceRelation: "viewer",
Expand All @@ -191,7 +191,7 @@ func BenchmarkDatastoreDriver(b *testing.B) {
})
b.Run("SnapshotReverseRead", func(b *testing.B) {
for n := 0; n < b.N; n++ {
iter, err := ds.SnapshotReader(headRev).ReverseQueryRelationships(ctx, datastore.SubjectsFilter{
iter, err := ds.SnapshotReader(headRev, datastore.NoSchemaHashForTesting).ReverseQueryRelationships(ctx, datastore.SubjectsFilter{
SubjectType: testfixtures.UserNS.Name,
}, options.WithSortForReverse(options.ByResource), options.WithQueryShapeForReverse(queryshape.Varying))
require.NoError(b, err)
Expand Down
46 changes: 34 additions & 12 deletions internal/datastore/common/chunkbytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,48 +81,70 @@
AliveValue T
}

// WithExecutor returns a copy of the config with the specified executor.
func (c SQLByteChunkerConfig[T]) WithExecutor(executor ChunkedBytesExecutor) SQLByteChunkerConfig[T] {
c.Executor = executor
return c
}

// WithTableName returns a copy of the config with the specified table name.
func (c SQLByteChunkerConfig[T]) WithTableName(tableName string) SQLByteChunkerConfig[T] {
c.TableName = tableName
return c
}

// SQLByteChunker provides methods for reading and writing byte data
// that is chunked across multiple rows in a SQL table.
type SQLByteChunker[T any] struct {
config SQLByteChunkerConfig[T]
}

// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
// Panics if the configuration is invalid.
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
// NewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
// Returns an error if the configuration is invalid.
func NewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) (*SQLByteChunker[T], error) {
if config.MaxChunkSize <= 0 {
panic("maxChunkSize must be greater than 0")
return nil, errors.New("maxChunkSize must be greater than 0")
}
if config.TableName == "" {
panic("tableName cannot be empty")
return nil, errors.New("tableName cannot be empty")
}
if config.NameColumn == "" {
panic("nameColumn cannot be empty")
return nil, errors.New("nameColumn cannot be empty")

Check warning on line 112 in internal/datastore/common/chunkbytes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/chunkbytes.go#L112

Added line #L112 was not covered by tests
}
if config.ChunkIndexColumn == "" {
panic("chunkIndexColumn cannot be empty")
return nil, errors.New("chunkIndexColumn cannot be empty")

Check warning on line 115 in internal/datastore/common/chunkbytes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/chunkbytes.go#L115

Added line #L115 was not covered by tests
}
if config.ChunkDataColumn == "" {
panic("chunkDataColumn cannot be empty")
return nil, errors.New("chunkDataColumn cannot be empty")

Check warning on line 118 in internal/datastore/common/chunkbytes.go

View check run for this annotation

Codecov / codecov/patch

internal/datastore/common/chunkbytes.go#L118

Added line #L118 was not covered by tests
}
if config.PlaceholderFormat == nil {
panic("placeholderFormat cannot be nil")
return nil, errors.New("placeholderFormat cannot be nil")
}
if config.Executor == nil {
panic("executor cannot be nil")
return nil, errors.New("executor cannot be nil")
}
if config.WriteMode == WriteModeInsertWithTombstones {
if config.CreatedAtColumn == "" {
panic("createdAtColumn is required when using WriteModeInsertWithTombstones")
return nil, errors.New("createdAtColumn is required when using WriteModeInsertWithTombstones")
}
if config.DeletedAtColumn == "" {
panic("deletedAtColumn is required when using WriteModeInsertWithTombstones")
return nil, errors.New("deletedAtColumn is required when using WriteModeInsertWithTombstones")
}
}

return &SQLByteChunker[T]{
config: config,
}, nil
}

// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
// Panics if the configuration is invalid.
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
chunker, err := NewSQLByteChunker(config)
if err != nil {
panic(err)
}
return chunker
}

// WriteChunkedBytes writes chunked byte data to the database within a transaction.
Expand Down
4 changes: 4 additions & 0 deletions internal/datastore/common/chunkbytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,17 @@ type fakeExecutor struct {
readResult map[int][]byte
readErr error
transaction *fakeTransaction
onRead func() // Optional callback invoked on each read
}

func (m *fakeExecutor) BeginTransaction(ctx context.Context) (ChunkedBytesTransaction, error) {
return m.transaction, nil
}

func (m *fakeExecutor) ExecuteRead(ctx context.Context, builder sq.SelectBuilder) (map[int][]byte, error) {
if m.onRead != nil {
m.onRead()
}
if m.readErr != nil {
return nil, m.readErr
}
Expand Down
Loading
Loading