Skip to content

Commit 0cff0d2

Browse files
josephschorrclaude
andcommitted
fix: use dual schema writer in MemDB to write both legacy and unified schemas
MemDB supports both legacy and unified schema storage, but was only using LegacySchemaWriterAdapter which after our changes no longer writes the legacy schema hash. This caused TestUnifiedSchemaHashWatch to fail with a timeout waiting for hash updates. Changes: 1. Updated SchemaWriter() to return a dualSchemaWriter that writes to both legacy and unified storage (SchemaModeReadNewWriteBoth) 2. Refactored schema writing methods to avoid deadlocks: - Added writeStoredSchemaNoLock() that uses the transaction's db handle without acquiring additional locks - Added writeSchemaHashNoLock() for transaction-scoped hash writes - Added writeLegacySchemaHashFromDefinitionsNoLock() for legacy hash 3. Updated WriteStoredSchema() and WriteLegacySchemaHashFromDefinitions() to use the transaction's db handle via txSource() This ensures MemDB writes to both storages and properly updates the schema hash for watchers, fixing the unified schema hash watch test. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 1c280b6 commit 0cff0d2

File tree

2 files changed

+27
-40
lines changed

2 files changed

+27
-40
lines changed

internal/datastore/memdb/memdb.go

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -461,45 +461,27 @@ func (mdb *memdbDatastore) readStoredSchemaInternal() (*corev1.StoredSchema, err
461461
return mdb.storedSchema.CloneVT(), nil
462462
}
463463

464-
// writeStoredSchemaInternal is an internal method for transactions to write the stored schema.
465-
// This should NOT be called directly - use transactions instead.
466-
func (mdb *memdbDatastore) writeStoredSchemaInternal(schema *corev1.StoredSchema) error {
467-
if schema == nil {
468-
return errors.New("stored schema cannot be nil")
469-
}
470-
471-
if schema.Version == 0 {
472-
return errors.New("stored schema version cannot be 0")
473-
}
474-
475-
mdb.Lock()
476-
defer mdb.Unlock()
477-
478-
if err := mdb.checkNotClosed(); err != nil {
479-
return err
480-
}
481-
464+
// writeStoredSchemaNoLock writes the stored schema using the provided transaction.
465+
// This is called from within an existing transaction, so it doesn't acquire locks or commit.
466+
func (mdb *memdbDatastore) writeStoredSchemaNoLock(tx *memdb.Txn, schema *corev1.StoredSchema) error {
482467
// Store a copy to prevent external mutations
483468
mdb.storedSchema = schema.CloneVT()
484469

485470
// Write the schema hash to the schema revision table for fast lookups
486-
if err := mdb.writeSchemaHashInternal(schema); err != nil {
471+
if err := mdb.writeSchemaHashNoLock(tx, schema); err != nil {
487472
return fmt.Errorf("failed to write schema hash: %w", err)
488473
}
489474

490475
return nil
491476
}
492477

493-
// writeSchemaHashInternal writes the schema hash to the in-memory schema revision table
494-
func (mdb *memdbDatastore) writeSchemaHashInternal(schema *corev1.StoredSchema) error {
478+
// writeSchemaHashNoLock writes the schema hash to the in-memory schema revision table using the provided transaction.
479+
func (mdb *memdbDatastore) writeSchemaHashNoLock(tx *memdb.Txn, schema *corev1.StoredSchema) error {
495480
v1 := schema.GetV1()
496481
if v1 == nil {
497482
return fmt.Errorf("unsupported schema version: %d", schema.Version)
498483
}
499484

500-
tx := mdb.db.Txn(true)
501-
defer tx.Abort()
502-
503485
// Delete existing hash (if any)
504486
if existing, err := tx.First(tableSchemaRevision, indexID, "current"); err == nil && existing != nil {
505487
if err := tx.Delete(tableSchemaRevision, existing); err != nil {
@@ -517,12 +499,14 @@ func (mdb *memdbDatastore) writeSchemaHashInternal(schema *corev1.StoredSchema)
517499
return fmt.Errorf("failed to insert hash: %w", err)
518500
}
519501

520-
tx.Commit()
502+
// Note: Don't commit here - the caller will commit the transaction
521503
return nil
522504
}
523505

524506
// writeLegacySchemaHashFromDefinitionsInternal writes the schema hash computed from the given definitions
525-
func (mdb *memdbDatastore) writeLegacySchemaHashFromDefinitionsInternal(ctx context.Context, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
507+
// writeLegacySchemaHashFromDefinitionsNoLock writes the schema hash using the provided transaction.
508+
// This is called from within an existing transaction, so it doesn't acquire locks.
509+
func (mdb *memdbDatastore) writeLegacySchemaHashFromDefinitionsNoLock(ctx context.Context, tx *memdb.Txn, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
526510
// Build schema definitions list
527511
definitions := make([]compiler.SchemaDefinition, 0, len(namespaces)+len(caveats))
528512
for _, ns := range namespaces {
@@ -547,16 +531,6 @@ func (mdb *memdbDatastore) writeLegacySchemaHashFromDefinitionsInternal(ctx cont
547531
hash := sha256.Sum256([]byte(schemaText))
548532
schemaHash := hex.EncodeToString(hash[:])
549533

550-
mdb.Lock()
551-
defer mdb.Unlock()
552-
553-
if err := mdb.checkNotClosed(); err != nil {
554-
return err
555-
}
556-
557-
tx := mdb.db.Txn(true)
558-
defer tx.Abort()
559-
560534
// Delete existing hash (if any)
561535
if existing, err := tx.First(tableSchemaRevision, indexID, "current"); err == nil && existing != nil {
562536
if err := tx.Delete(tableSchemaRevision, existing); err != nil {
@@ -574,7 +548,7 @@ func (mdb *memdbDatastore) writeLegacySchemaHashFromDefinitionsInternal(ctx cont
574548
return fmt.Errorf("failed to insert hash: %w", err)
575549
}
576550

577-
tx.Commit()
551+
// Note: Don't commit here - the caller will commit the transaction
578552
return nil
579553
}
580554

internal/datastore/memdb/readwrite.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,17 +348,30 @@ func (rwt *memdbReadWriteTx) LegacyDeleteNamespaces(ctx context.Context, nsNames
348348
}
349349

350350
func (rwt *memdbReadWriteTx) SchemaWriter() (datastore.SchemaWriter, error) {
351-
return schemautil.NewLegacySchemaWriterAdapter(rwt, rwt), nil
351+
// MemDB supports both legacy and unified schema storage.
352+
// Use write-to-both mode to ensure both are updated.
353+
return schemautil.NewSchemaWriter(rwt, rwt, options.SchemaModeReadNewWriteBoth), nil
352354
}
353355

354356
// WriteStoredSchema implements datastore.SingleStoreSchemaWriter
355357
func (rwt *memdbReadWriteTx) WriteStoredSchema(ctx context.Context, schema *core.StoredSchema) error {
356-
return rwt.datastore.writeStoredSchemaInternal(schema)
358+
// Called from within a transaction - use the transaction's db handle directly
359+
tx, err := rwt.txSource()
360+
if err != nil {
361+
return err
362+
}
363+
return rwt.datastore.writeStoredSchemaNoLock(tx, schema)
357364
}
358365

359366
// WriteLegacySchemaHashFromDefinitions implements datastore.LegacySchemaHashWriter
360367
func (rwt *memdbReadWriteTx) WriteLegacySchemaHashFromDefinitions(ctx context.Context, namespaces []datastore.RevisionedNamespace, caveats []datastore.RevisionedCaveat) error {
361-
return rwt.datastore.writeLegacySchemaHashFromDefinitionsInternal(ctx, namespaces, caveats)
368+
// Called from within a transaction - use the transaction's db handle directly
369+
// without trying to acquire additional locks
370+
tx, err := rwt.txSource()
371+
if err != nil {
372+
return err
373+
}
374+
return rwt.datastore.writeLegacySchemaHashFromDefinitionsNoLock(ctx, tx, namespaces, caveats)
362375
}
363376

364377
func (rwt *memdbReadWriteTx) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) {

0 commit comments

Comments
 (0)