Skip to content

Commit 7fe3a22

Browse files
josephschorrclaude
andcommitted
fix: only write legacy schema hash in 'write to both' modes
This change ensures that the legacy schema hash is ONLY written when in "write to both" modes (SchemaModeReadLegacyWriteBoth or SchemaModeReadNewWriteBoth), not in "write to legacy only" mode (SchemaModeReadLegacyWriteLegacy). Key changes: 1. Removed writeLegacySchemaHash() from individual legacy operations (LegacyWriteNamespaces, LegacyWriteCaveats, etc.) across all datastores to prevent write-write conflicts on schema_revision table in CRDB's serializable isolation. 2. Added WriteLegacySchemaHashFromDefinitions implementation to all datastores (CRDB, Postgres, MySQL, Spanner, MemDB) to support writing the hash from buffered definitions without requiring reads. 3. Moved hash-writing logic from LegacySchemaWriterAdapter.WriteSchema() to dualSchemaWriter.WriteSchema() to ensure the hash is only written when using the dual writer (i.e., in "write to both" modes). This fixes the CRDB test failure in TestCaveatSnapshotReads where write conflicts were occurring due to concurrent schema_revision updates, while ensuring the hash is still written correctly for unified schema testing. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 502bd6c commit 7fe3a22

File tree

11 files changed

+248
-374
lines changed

11 files changed

+248
-374
lines changed

internal/datastore/crdb/caveat.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,6 @@ func (rwt *crdbReadWriteTXN) LegacyWriteCaveats(ctx context.Context, caveats []*
156156
return fmt.Errorf(errWriteCaveat, err)
157157
}
158158

159-
// Write the schema hash to the schema_revision table for fast lookups
160-
if err := rwt.writeLegacySchemaHash(ctx); err != nil {
161-
return fmt.Errorf("failed to write schema hash: %w", err)
162-
}
163-
164159
return nil
165160
}
166161

@@ -177,10 +172,5 @@ func (rwt *crdbReadWriteTXN) LegacyDeleteCaveats(ctx context.Context, names []st
177172
return fmt.Errorf(errDeleteCaveats, err)
178173
}
179174

180-
// Write the schema hash to the schema_revision table for fast lookups
181-
if err := rwt.writeLegacySchemaHash(ctx); err != nil {
182-
return fmt.Errorf("failed to write schema hash: %w", err)
183-
}
184-
185175
return nil
186176
}

internal/datastore/crdb/readwrite.go

Lines changed: 44 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,6 @@ func (rwt *crdbReadWriteTXN) LegacyWriteNamespaces(ctx context.Context, newConfi
511511
return fmt.Errorf(errUnableToWriteConfig, err)
512512
}
513513

514-
// Write the schema hash to the schema_revision table for fast lookups
515-
if err := rwt.writeLegacySchemaHash(ctx); err != nil {
516-
return fmt.Errorf("failed to write schema hash: %w", err)
517-
}
518-
519514
return nil
520515
}
521516

@@ -571,11 +566,6 @@ func (rwt *crdbReadWriteTXN) LegacyDeleteNamespaces(ctx context.Context, nsNames
571566
rwt.relCountChange -= numRowsDeleted
572567
}
573568

574-
// Write the schema hash to the schema_revision table for fast lookups
575-
if err := rwt.writeLegacySchemaHash(ctx); err != nil {
576-
return fmt.Errorf("failed to write schema hash: %w", err)
577-
}
578-
579569
return nil
580570
}
581571

@@ -634,20 +624,53 @@ func (w *crdbSchemaWriter) writeSchemaHash(ctx context.Context, schema *core.Sto
634624
return nil
635625
}
636626

637-
// writeLegacySchemaHash writes the schema hash to the schema_revision table by generating
638-
// the schema from current legacy namespaces and caveats
639-
func (rwt *crdbReadWriteTXN) writeLegacySchemaHash(ctx context.Context) error {
640-
// Read all namespaces and caveats
641-
namespaces, err := rwt.LegacyListAllNamespaces(ctx)
642-
if err != nil {
643-
return fmt.Errorf("failed to list namespaces: %w", err)
627+
// ReadStoredSchema implements datastore.SingleStoreSchemaReader to satisfy DualSchemaReader interface requirements
628+
func (w *crdbSchemaWriter) ReadStoredSchema(ctx context.Context) (*core.StoredSchema, error) {
629+
// Create a revision-aware executor that applies AS OF SYSTEM TIME
630+
// Within a transaction, we don't use AS OF SYSTEM TIME, so pass empty atSpecificRevision
631+
executor := &revisionAwareExecutor{
632+
query: w.rwt.query,
633+
addFromToQuery: w.rwt.addFromToQuery,
634+
assertAsOfSysTime: w.rwt.assertHasExpectedAsOfSystemTime,
644635
}
645636

646-
caveats, err := rwt.LegacyListAllCaveats(ctx)
647-
if err != nil {
648-
return fmt.Errorf("failed to list caveats: %w", err)
649-
}
637+
// Use the shared schema reader/writer to read the schema
638+
// Pass empty string to bypass cache (transaction read)
639+
return w.rwt.schemaReaderWriter.ReadSchema(ctx, executor, nil, datastore.NoSchemaHashInTransaction)
640+
}
641+
642+
// LegacyWriteNamespaces delegates to the underlying transaction
643+
func (w *crdbSchemaWriter) LegacyWriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error {
644+
return w.rwt.LegacyWriteNamespaces(ctx, newConfigs...)
645+
}
646+
647+
// LegacyDeleteNamespaces delegates to the underlying transaction
648+
func (w *crdbSchemaWriter) LegacyDeleteNamespaces(ctx context.Context, nsNames []string, delOption datastore.DeleteNamespacesRelationshipsOption) error {
649+
return w.rwt.LegacyDeleteNamespaces(ctx, nsNames, delOption)
650+
}
651+
652+
// LegacyLookupNamespacesWithNames delegates to the underlying transaction
653+
func (w *crdbSchemaWriter) LegacyLookupNamespacesWithNames(ctx context.Context, nsNames []string) ([]datastore.RevisionedDefinition[*core.NamespaceDefinition], error) {
654+
return w.rwt.LegacyLookupNamespacesWithNames(ctx, nsNames)
655+
}
656+
657+
// LegacyWriteCaveats delegates to the underlying transaction
658+
func (w *crdbSchemaWriter) LegacyWriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error {
659+
return w.rwt.LegacyWriteCaveats(ctx, caveats)
660+
}
661+
662+
// LegacyDeleteCaveats delegates to the underlying transaction
663+
func (w *crdbSchemaWriter) LegacyDeleteCaveats(ctx context.Context, names []string) error {
664+
return w.rwt.LegacyDeleteCaveats(ctx, names)
665+
}
650666

667+
// WriteLegacySchemaHashFromDefinitions implements datastore.LegacySchemaHashWriter
668+
func (w *crdbSchemaWriter) WriteLegacySchemaHashFromDefinitions(ctx context.Context, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
669+
return w.rwt.writeLegacySchemaHashFromDefinitions(ctx, namespaces, caveats)
670+
}
671+
672+
// writeLegacySchemaHashFromDefinitions writes the schema hash computed from the given definitions
673+
func (rwt *crdbReadWriteTXN) writeLegacySchemaHashFromDefinitions(ctx context.Context, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
651674
// Build schema definitions list
652675
definitions := make([]compiler.SchemaDefinition, 0, len(namespaces)+len(caveats))
653676
for _, ns := range namespaces {
@@ -689,46 +712,6 @@ func (rwt *crdbReadWriteTXN) writeLegacySchemaHash(ctx context.Context) error {
689712
return nil
690713
}
691714

692-
// ReadStoredSchema implements datastore.SingleStoreSchemaReader to satisfy DualSchemaReader interface requirements
693-
func (w *crdbSchemaWriter) ReadStoredSchema(ctx context.Context) (*core.StoredSchema, error) {
694-
// Create a revision-aware executor that applies AS OF SYSTEM TIME
695-
// Within a transaction, we don't use AS OF SYSTEM TIME, so pass empty atSpecificRevision
696-
executor := &revisionAwareExecutor{
697-
query: w.rwt.query,
698-
addFromToQuery: w.rwt.addFromToQuery,
699-
assertAsOfSysTime: w.rwt.assertHasExpectedAsOfSystemTime,
700-
}
701-
702-
// Use the shared schema reader/writer to read the schema
703-
// Pass empty string to bypass cache (transaction read)
704-
return w.rwt.schemaReaderWriter.ReadSchema(ctx, executor, nil, datastore.NoSchemaHashInTransaction)
705-
}
706-
707-
// LegacyWriteNamespaces delegates to the underlying transaction
708-
func (w *crdbSchemaWriter) LegacyWriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error {
709-
return w.rwt.LegacyWriteNamespaces(ctx, newConfigs...)
710-
}
711-
712-
// LegacyDeleteNamespaces delegates to the underlying transaction
713-
func (w *crdbSchemaWriter) LegacyDeleteNamespaces(ctx context.Context, nsNames []string, delOption datastore.DeleteNamespacesRelationshipsOption) error {
714-
return w.rwt.LegacyDeleteNamespaces(ctx, nsNames, delOption)
715-
}
716-
717-
// LegacyLookupNamespacesWithNames delegates to the underlying transaction
718-
func (w *crdbSchemaWriter) LegacyLookupNamespacesWithNames(ctx context.Context, nsNames []string) ([]datastore.RevisionedDefinition[*core.NamespaceDefinition], error) {
719-
return w.rwt.LegacyLookupNamespacesWithNames(ctx, nsNames)
720-
}
721-
722-
// LegacyWriteCaveats delegates to the underlying transaction
723-
func (w *crdbSchemaWriter) LegacyWriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error {
724-
return w.rwt.LegacyWriteCaveats(ctx, caveats)
725-
}
726-
727-
// LegacyDeleteCaveats delegates to the underlying transaction
728-
func (w *crdbSchemaWriter) LegacyDeleteCaveats(ctx context.Context, names []string) error {
729-
return w.rwt.LegacyDeleteCaveats(ctx, names)
730-
}
731-
732715
// LegacyReadCaveatByName delegates to the underlying transaction
733716
func (w *crdbSchemaWriter) LegacyReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) {
734717
return w.rwt.LegacyReadCaveatByName(ctx, name)

internal/datastore/memdb/caveat.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,6 @@ func (rwt *memdbReadWriteTx) LegacyWriteCaveats(ctx context.Context, caveats []*
119119
return err
120120
}
121121

122-
// Write the schema hash to the schema_revision table for fast lookups
123-
if err := rwt.datastore.writeLegacySchemaHashInternalWithTx(tx); err != nil {
124-
return fmt.Errorf("failed to write schema hash: %w", err)
125-
}
126-
127122
return nil
128123
}
129124

@@ -162,10 +157,5 @@ func (rwt *memdbReadWriteTx) LegacyDeleteCaveats(ctx context.Context, names []st
162157
}
163158
}
164159

165-
// Write the schema hash to the schema_revision table for fast lookups
166-
if err := rwt.datastore.writeLegacySchemaHashInternalWithTx(tx); err != nil {
167-
return fmt.Errorf("failed to write schema hash: %w", err)
168-
}
169-
170160
return nil
171161
}

internal/datastore/memdb/memdb.go

Lines changed: 28 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -521,76 +521,42 @@ func (mdb *memdbDatastore) writeSchemaHashInternal(schema *corev1.StoredSchema)
521521
return nil
522522
}
523523

524-
// writeLegacySchemaHashInternalWithTx writes the schema hash to the schema_revision table by generating
525-
// the schema from current legacy namespaces and caveats using the provided transaction
526-
func (mdb *memdbDatastore) writeLegacySchemaHashInternalWithTx(tx *memdb.Txn) error {
527-
var schemaText string
528-
529-
// Check if we have stored schema (unified schema mode)
530-
if mdb.storedSchema != nil {
531-
v1 := mdb.storedSchema.GetV1()
532-
if v1 != nil && v1.SchemaText != "" {
533-
// Use the schema text from stored schema
534-
schemaText = v1.SchemaText
535-
}
524+
// writeLegacySchemaHashFromDefinitionsInternal writes the schema hash computed from the given definitions
525+
func (mdb *memdbDatastore) writeLegacySchemaHashFromDefinitionsInternal(ctx context.Context, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
526+
// Build schema definitions list
527+
definitions := make([]compiler.SchemaDefinition, 0, len(namespaces)+len(caveats))
528+
for _, ns := range namespaces {
529+
definitions = append(definitions, ns.Definition)
530+
}
531+
for _, caveat := range caveats {
532+
definitions = append(definitions, caveat.Definition)
536533
}
537534

538-
// If no stored schema, generate from legacy definitions with canonical ordering
539-
if schemaText == "" {
540-
// Read all namespaces
541-
iter, err := tx.Get(tableNamespace, indexID)
542-
if err != nil {
543-
return fmt.Errorf("failed to list namespaces: %w", err)
544-
}
545-
546-
var definitions []compiler.SchemaDefinition
547-
for row := iter.Next(); row != nil; row = iter.Next() {
548-
ns := row.(*namespace)
549-
definition := &corev1.NamespaceDefinition{}
550-
if err := definition.UnmarshalVT(ns.configBytes); err != nil {
551-
// If we can't unmarshal a definition, skip hash generation
552-
// This can happen if definitions are in an intermediate/invalid state
553-
return nil
554-
}
555-
definitions = append(definitions, definition)
556-
}
557-
558-
// Read all caveats
559-
caveatIter, err2 := tx.Get(tableCaveats, indexID)
560-
if err2 != nil {
561-
return fmt.Errorf("failed to list caveats: %w", err2)
562-
}
563-
564-
for row := caveatIter.Next(); row != nil; row = caveatIter.Next() {
565-
caveat := row.(*caveat)
566-
definition := &corev1.CaveatDefinition{}
567-
if err := definition.UnmarshalVT(caveat.definition); err != nil {
568-
// If we can't unmarshal a definition, skip hash generation
569-
// This can happen if definitions are in an intermediate/invalid state
570-
return nil
571-
}
572-
definitions = append(definitions, definition)
573-
}
535+
// Sort definitions by name for consistent ordering
536+
sort.Slice(definitions, func(i, j int) bool {
537+
return definitions[i].GetName() < definitions[j].GetName()
538+
})
574539

575-
// Sort definitions by name for consistent ordering
576-
sort.Slice(definitions, func(i, j int) bool {
577-
return definitions[i].GetName() < definitions[j].GetName()
578-
})
579-
580-
// Generate schema text from definitions
581-
var genErr error
582-
schemaText, _, genErr = generator.GenerateSchema(definitions)
583-
if genErr != nil {
584-
// If we can't generate schema, skip hash generation
585-
// This can happen if definitions are incomplete or invalid
586-
return nil
587-
}
540+
// Generate schema text from definitions
541+
schemaText, _, err := generator.GenerateSchema(definitions)
542+
if err != nil {
543+
return fmt.Errorf("failed to generate schema: %w", err)
588544
}
589545

590546
// Compute schema hash (SHA256)
591547
hash := sha256.Sum256([]byte(schemaText))
592548
schemaHash := hex.EncodeToString(hash[:])
593549

550+
mdb.Lock()
551+
defer mdb.Unlock()
552+
553+
if err := mdb.checkNotClosed(); err != nil {
554+
return err
555+
}
556+
557+
tx := mdb.db.Txn(true)
558+
defer tx.Abort()
559+
594560
// Delete existing hash (if any)
595561
if existing, err := tx.First(tableSchemaRevision, indexID, "current"); err == nil && existing != nil {
596562
if err := tx.Delete(tableSchemaRevision, existing); err != nil {
@@ -608,6 +574,7 @@ func (mdb *memdbDatastore) writeLegacySchemaHashInternalWithTx(tx *memdb.Txn) er
608574
return fmt.Errorf("failed to insert hash: %w", err)
609575
}
610576

577+
tx.Commit()
611578
return nil
612579
}
613580

internal/datastore/memdb/readwrite.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,6 @@ func (rwt *memdbReadWriteTx) LegacyWriteNamespaces(ctx context.Context, newConfi
304304
}
305305
}
306306

307-
// Write the schema hash to the schema_revision table for fast lookups
308-
if err := rwt.datastore.writeLegacySchemaHashInternalWithTx(tx); err != nil {
309-
return fmt.Errorf("failed to write schema hash: %w", err)
310-
}
311-
312307
return nil
313308
}
314309

@@ -349,11 +344,6 @@ func (rwt *memdbReadWriteTx) LegacyDeleteNamespaces(ctx context.Context, nsNames
349344
}
350345
}
351346

352-
// Write the schema hash to the schema_revision table for fast lookups
353-
if err := rwt.datastore.writeLegacySchemaHashInternalWithTx(tx); err != nil {
354-
return fmt.Errorf("failed to write schema hash: %w", err)
355-
}
356-
357347
return nil
358348
}
359349

@@ -366,6 +356,11 @@ func (rwt *memdbReadWriteTx) WriteStoredSchema(ctx context.Context, schema *core
366356
return rwt.datastore.writeStoredSchemaInternal(schema)
367357
}
368358

359+
// WriteLegacySchemaHashFromDefinitions implements datastore.LegacySchemaHashWriter
360+
func (rwt *memdbReadWriteTx) WriteLegacySchemaHashFromDefinitions(ctx context.Context, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
361+
return rwt.datastore.writeLegacySchemaHashFromDefinitionsInternal(ctx, namespaces, caveats)
362+
}
363+
369364
func (rwt *memdbReadWriteTx) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) {
370365
var numCopied uint64
371366
var next *tuple.Relationship

internal/datastore/mysql/caveat.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,6 @@ func (rwt *mysqlReadWriteTXN) LegacyWriteCaveats(ctx context.Context, caveats []
144144
return fmt.Errorf(errWriteCaveats, err)
145145
}
146146

147-
// Write the schema hash to the schema_revision table for fast lookups
148-
if err := rwt.writeLegacySchemaHash(ctx); err != nil {
149-
return fmt.Errorf("failed to write schema hash: %w", err)
150-
}
151-
152147
return nil
153148
}
154149

@@ -157,11 +152,6 @@ func (rwt *mysqlReadWriteTXN) LegacyDeleteCaveats(ctx context.Context, names []s
157152
return err
158153
}
159154

160-
// Write the schema hash to the schema_revision table for fast lookups
161-
if err := rwt.writeLegacySchemaHash(ctx); err != nil {
162-
return fmt.Errorf("failed to write schema hash: %w", err)
163-
}
164-
165155
return nil
166156
}
167157

0 commit comments

Comments
 (0)