Skip to content

Commit 61106a9

Browse files
committed
feat: add unified schema storage with ReadStoredSchema/WriteStoredSchema
Introduce the foundation for unified schema storage, where the entire compiled schema (definitions + schema text + hash) is stored as a single serialized proto blob, chunked across rows for SQL datastores. Datastore interface changes: - Add ReadStoredSchema(ctx, SchemaHash) to Reader interface - Add WriteStoredSchema(ctx, *StoredSchema) to ReadWriteTransaction - Add SchemaHash type with sentinel values for cache bypass DataLayer interface changes: - SnapshotReader now takes (Revision, SchemaHash) to thread cache keys - OptimizedRevision/HeadRevision return SchemaHash alongside Revision - Add SchemaMode configuration for migration path (legacy → dual → new) - Add storedSchemaReaderAdapter for reading from StoredSchema protos - Add writeSchemaViaStoredSchema for building and writing StoredSchema Storage implementation: - Add SQLByteChunker generic chunked blob storage for SQL datastores - Add SQLSingleStoreSchemaReaderWriter for read/write of StoredSchema - Add per-datastore ReadStoredSchema/WriteStoredSchema implementations for postgres, crdb, mysql, spanner, and memdb - Add schema table migrations for all SQL datastores - Add populate migrations to backfill from legacy namespace/caveat tables - Rename MySQL schema table to stored_schema (schema is a reserved word) Caching: - Add SchemaHashCache with LRU + singleflight for schema-by-hash lookups Proxy/middleware updates: - Add ReadStoredSchema/WriteStoredSchema pass-through to all datastore proxies (observable, counting, readonly, singleflight, indexcheck, strictreplicated, checkingreplicated, relationshipintegrity, hashcache) - Update consistency middleware for new HeadRevision/OptimizedRevision signatures Proto changes: - Add StoredSchema message to core.proto with V1StoredSchema containing schema_text, schema_hash, namespace_definitions, caveat_definitions
1 parent fd6ca35 commit 61106a9

File tree

127 files changed

+7047
-409
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

127 files changed

+7047
-409
lines changed

internal/caveats/run_test.go

Lines changed: 8 additions & 8 deletions
Original file line number · Diff line number · Diff line change
@@ -471,7 +471,7 @@ func TestRunCaveatExpressions(t *testing.T) {
471471
req.NoError(err)
472472

473473
dl := datalayer.NewDataLayer(ds)
474-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
474+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
475475
req.NoError(err)
476476

477477
for _, debugOption := range []RunCaveatExpressionDebugOption{
@@ -524,7 +524,7 @@ func TestRunCaveatWithMissingMap(t *testing.T) {
524524
req.NoError(err)
525525

526526
dl := datalayer.NewDataLayer(ds)
527-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
527+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
528528
req.NoError(err)
529529

530530
result, err := RunSingleCaveatExpression(
@@ -556,7 +556,7 @@ func TestRunCaveatWithEmptyMap(t *testing.T) {
556556
req.NoError(err)
557557

558558
dl := datalayer.NewDataLayer(ds)
559-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
559+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
560560
req.NoError(err)
561561

562562
_, err = RunSingleCaveatExpression(
@@ -594,7 +594,7 @@ func TestRunCaveatMultipleTimes(t *testing.T) {
594594
req.NoError(err)
595595

596596
dl := datalayer.NewDataLayer(ds)
597-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
597+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
598598
req.NoError(err)
599599

600600
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -662,7 +662,7 @@ func TestRunCaveatWithMissingDefinition(t *testing.T) {
662662
req.NoError(err)
663663

664664
dl := datalayer.NewDataLayer(ds)
665-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
665+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
666666
req.NoError(err)
667667

668668
// Try to run a caveat that doesn't exist
@@ -697,7 +697,7 @@ func TestCaveatRunnerPopulateCaveatDefinitionsForExpr(t *testing.T) {
697697
req.NoError(err)
698698

699699
dl := datalayer.NewDataLayer(ds)
700-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
700+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
701701
req.NoError(err)
702702

703703
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -742,7 +742,7 @@ func TestCaveatRunnerEmptyExpression(t *testing.T) {
742742
req.NoError(err)
743743

744744
dl := datalayer.NewDataLayer(ds)
745-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
745+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
746746
req.NoError(err)
747747

748748
runner := NewCaveatRunner(types.Default.TypeSet)
@@ -823,7 +823,7 @@ func TestUnknownCaveatOperation(t *testing.T) {
823823
req.NoError(err)
824824

825825
dl := datalayer.NewDataLayer(ds)
826-
sr, err := dl.SnapshotReader(headRevision).ReadSchema()
826+
sr, err := dl.SnapshotReader(headRevision, datastore.NoSchemaHashForTesting).ReadSchema()
827827
req.NoError(err)
828828

829829
runner := NewCaveatRunner(types.Default.TypeSet)

internal/datastore/common/chunkbytes.go

Lines changed: 42 additions & 14 deletions
Original file line number · Diff line number · Diff line change
@@ -8,6 +8,9 @@ import (
88
sq "github.com/Masterminds/squirrel"
99
)
1010

11+
// ErrNoChunksFound is returned when no chunks are found for a given key.
12+
var ErrNoChunksFound = errors.New("no chunks found")
13+
1114
// ChunkedBytesTransaction defines the interface for executing SQL queries within a transaction.
1215
type ChunkedBytesTransaction interface {
1316
// ExecuteWrite executes an INSERT query.
@@ -81,48 +84,70 @@ type SQLByteChunkerConfig[T any] struct {
8184
AliveValue T
8285
}
8386

87+
// WithExecutor returns a copy of the config with the specified executor.
88+
func (c SQLByteChunkerConfig[T]) WithExecutor(executor ChunkedBytesExecutor) SQLByteChunkerConfig[T] {
89+
c.Executor = executor
90+
return c
91+
}
92+
93+
// WithTableName returns a copy of the config with the specified table name.
94+
func (c SQLByteChunkerConfig[T]) WithTableName(tableName string) SQLByteChunkerConfig[T] {
95+
c.TableName = tableName
96+
return c
97+
}
98+
8499
// SQLByteChunker provides methods for reading and writing byte data
85100
// that is chunked across multiple rows in a SQL table.
86101
type SQLByteChunker[T any] struct {
87102
config SQLByteChunkerConfig[T]
88103
}
89104

90-
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
91-
// Panics if the configuration is invalid.
92-
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
105+
// NewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
106+
// Returns an error if the configuration is invalid.
107+
func NewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) (*SQLByteChunker[T], error) {
93108
if config.MaxChunkSize <= 0 {
94-
panic("maxChunkSize must be greater than 0")
109+
return nil, errors.New("maxChunkSize must be greater than 0")
95110
}
96111
if config.TableName == "" {
97-
panic("tableName cannot be empty")
112+
return nil, errors.New("tableName cannot be empty")
98113
}
99114
if config.NameColumn == "" {
100-
panic("nameColumn cannot be empty")
115+
return nil, errors.New("nameColumn cannot be empty")
101116
}
102117
if config.ChunkIndexColumn == "" {
103-
panic("chunkIndexColumn cannot be empty")
118+
return nil, errors.New("chunkIndexColumn cannot be empty")
104119
}
105120
if config.ChunkDataColumn == "" {
106-
panic("chunkDataColumn cannot be empty")
121+
return nil, errors.New("chunkDataColumn cannot be empty")
107122
}
108123
if config.PlaceholderFormat == nil {
109-
panic("placeholderFormat cannot be nil")
124+
return nil, errors.New("placeholderFormat cannot be nil")
110125
}
111126
if config.Executor == nil {
112-
panic("executor cannot be nil")
127+
return nil, errors.New("executor cannot be nil")
113128
}
114129
if config.WriteMode == WriteModeInsertWithTombstones {
115130
if config.CreatedAtColumn == "" {
116-
panic("createdAtColumn is required when using WriteModeInsertWithTombstones")
131+
return nil, errors.New("createdAtColumn is required when using WriteModeInsertWithTombstones")
117132
}
118133
if config.DeletedAtColumn == "" {
119-
panic("deletedAtColumn is required when using WriteModeInsertWithTombstones")
134+
return nil, errors.New("deletedAtColumn is required when using WriteModeInsertWithTombstones")
120135
}
121136
}
122137

123138
return &SQLByteChunker[T]{
124139
config: config,
140+
}, nil
141+
}
142+
143+
// MustNewSQLByteChunker creates a new SQLByteChunker with the specified configuration.
144+
// Panics if the configuration is invalid.
145+
func MustNewSQLByteChunker[T any](config SQLByteChunkerConfig[T]) *SQLByteChunker[T] {
146+
chunker, err := NewSQLByteChunker(config)
147+
if err != nil {
148+
panic(err)
125149
}
150+
return chunker
126151
}
127152

128153
// WriteChunkedBytes writes chunked byte data to the database within a transaction.
@@ -305,7 +330,7 @@ func (c *SQLByteChunker[T]) ReadChunkedBytes(
305330
// into the original byte array. It validates that all chunks are present and in order.
306331
func (c *SQLByteChunker[T]) reassembleChunks(chunks map[int][]byte) ([]byte, error) {
307332
if len(chunks) == 0 {
308-
return nil, errors.New("no chunks found")
333+
return nil, ErrNoChunksFound
309334
}
310335

311336
// Validate that we have all chunks from 0 to N-1 and calculate total size
@@ -338,7 +363,10 @@ func (c *SQLByteChunker[T]) chunkData(data []byte) [][]byte {
338363
chunks := make([][]byte, 0, numChunks)
339364

340365
for i := 0; i < len(data); i += c.config.MaxChunkSize {
341-
end := min(i+c.config.MaxChunkSize, len(data))
366+
end := i + c.config.MaxChunkSize
367+
if end > len(data) {
368+
end = len(data)
369+
}
342370
chunks = append(chunks, data[i:end])
343371
}
344372

internal/datastore/common/chunkbytes_test.go

Lines changed: 13 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -57,13 +57,17 @@ type fakeExecutor struct {
5757
readResult map[int][]byte
5858
readErr error
5959
transaction *fakeTransaction
60+
onRead func() // Optional callback invoked on each read
6061
}
6162

6263
func (m *fakeExecutor) BeginTransaction(ctx context.Context) (ChunkedBytesTransaction, error) {
6364
return m.transaction, nil
6465
}
6566

6667
func (m *fakeExecutor) ExecuteRead(ctx context.Context, builder sq.SelectBuilder) (map[int][]byte, error) {
68+
if m.onRead != nil {
69+
m.onRead()
70+
}
6771
if m.readErr != nil {
6872
return nil, m.readErr
6973
}
@@ -392,6 +396,7 @@ func TestReadChunkedBytes(t *testing.T) {
392396
name string
393397
chunks map[int][]byte
394398
expectedData []byte
399+
expectedErr error
395400
expectedError string
396401
}{
397402
{
@@ -418,9 +423,9 @@ func TestReadChunkedBytes(t *testing.T) {
418423
expectedData: []byte{},
419424
},
420425
{
421-
name: "no chunks",
422-
chunks: map[int][]byte{},
423-
expectedError: "no chunks found",
426+
name: "no chunks",
427+
chunks: map[int][]byte{},
428+
expectedErr: ErrNoChunksFound,
424429
},
425430
{
426431
name: "missing chunk in sequence",
@@ -450,10 +455,13 @@ func TestReadChunkedBytes(t *testing.T) {
450455

451456
data, err := chunker.ReadChunkedBytes(context.Background(), "test-key")
452457

453-
if tt.expectedError != "" {
458+
switch {
459+
case tt.expectedErr != nil:
460+
require.ErrorIs(t, err, tt.expectedErr)
461+
case tt.expectedError != "":
454462
require.Error(t, err)
455463
require.Contains(t, err.Error(), tt.expectedError)
456-
} else {
464+
default:
457465
require.NoError(t, err)
458466
require.Equal(t, tt.expectedData, data)
459467
}

0 commit comments

Comments (0)