Skip to content

Commit 918008e

Browse files
committed
refactor: remove unused SingleStoreSchemaHashWatcher interface
The SingleStoreSchemaHashWatcher interface and all its implementations were only used for testing and not in production code. This commit removes: - SingleStoreSchemaHashWatcher interface and SchemaWatchCallback type - All datastore schemawatch.go implementation files (postgres, mysql, crdb, spanner, memdb) - SchemaHashWatcherForTesting methods from all datastores and proxies - UnifiedSchemaHashWatchTest test function Changes made: - Inlined readSchemaHash logic into SchemaHashReaderForTesting methods - Added missing SchemaModeForTesting to observable proxy and validating datastore wrapper - Fixed test files to pass schema hash parameter to SnapshotReader - Fixed linter issues (perfsprint, testifylint) All datastore tests pass (postgres, mysql, crdb).
1 parent 6eabefe commit 918008e

File tree

20 files changed

+194
-833
lines changed

20 files changed

+194
-833
lines changed

internal/datastore/crdb/crdb.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -537,26 +537,29 @@ func (cds *crdbDatastore) SchemaHashReaderForTesting() interface {
537537
return &crdbSchemaHashReaderForTesting{query: cds.readPool}
538538
}
539539

540-
// SchemaHashWatcherForTesting returns a test-only interface for watching schema hash changes.
541-
func (cds *crdbDatastore) SchemaHashWatcherForTesting() datastore.SingleStoreSchemaHashWatcher {
542-
// Determine the schema changefeed query format based on the relationship changefeed query
543-
schemaChangefeedFormat := querySchemaChangefeed
544-
switch cds.beginChangefeedQuery {
545-
case queryChangefeedPreV22:
546-
schemaChangefeedFormat = querySchemaChangefeedPreV22
547-
case queryChangefeedPreV25:
548-
schemaChangefeedFormat = querySchemaChangefeedPreV25
549-
}
550-
return newCRDBSchemaHashWatcher(cds.readPool, schemaChangefeedFormat)
540+
// SchemaModeForTesting returns the current schema mode for testing purposes.
541+
func (cds *crdbDatastore) SchemaModeForTesting() (options.SchemaMode, error) {
542+
return cds.schemaMode, nil
551543
}
552544

553545
type crdbSchemaHashReaderForTesting struct {
554546
query *pool.RetryPool
555547
}
556548

557549
func (r *crdbSchemaHashReaderForTesting) ReadSchemaHash(ctx context.Context) (string, error) {
558-
watcher := &crdbSchemaHashWatcher{query: r.query}
559-
return watcher.readSchemaHash(ctx)
550+
var hashBytes []byte
551+
552+
err := r.query.QueryRowFunc(ctx, func(ctx context.Context, row pgx.Row) error {
553+
return row.Scan(&hashBytes)
554+
}, "SELECT hash FROM schema_revision WHERE name = 'current'")
555+
if err != nil {
556+
if errors.Is(err, pgx.ErrNoRows) {
557+
return "", datastore.ErrSchemaNotFound
558+
}
559+
return "", fmt.Errorf("failed to query schema hash: %w", err)
560+
}
561+
562+
return string(hashBytes), nil
560563
}
561564

562565
func (cds *crdbDatastore) HeadRevision(ctx context.Context) (datastore.Revision, datastore.SchemaHash, error) {

internal/datastore/crdb/schemawatch.go

Lines changed: 0 additions & 161 deletions
This file was deleted.

internal/datastore/dsfortesting/dsfortesting.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ func (vds validatingDatastore) SchemaHashReaderForTesting() interface {
6464
return nil
6565
}
6666

67-
// SchemaHashWatcherForTesting delegates to the underlying datastore if it implements the test interface
68-
func (vds validatingDatastore) SchemaHashWatcherForTesting() datastore.SingleStoreSchemaHashWatcher {
69-
type schemaHashWatcherProvider interface {
70-
SchemaHashWatcherForTesting() datastore.SingleStoreSchemaHashWatcher
67+
// SchemaModeForTesting delegates to the underlying datastore if it implements the test interface
68+
func (vds validatingDatastore) SchemaModeForTesting() (options.SchemaMode, error) {
69+
type schemaModeProvider interface {
70+
SchemaModeForTesting() (options.SchemaMode, error)
7171
}
7272

73-
if hashWatcher, ok := vds.Datastore.(schemaHashWatcherProvider); ok {
74-
return hashWatcher.SchemaHashWatcherForTesting()
73+
if provider, ok := vds.Datastore.(schemaModeProvider); ok {
74+
return provider.SchemaModeForTesting()
7575
}
76-
return nil
76+
return options.SchemaModeReadLegacyWriteLegacy, errors.New("delegate datastore does not implement SchemaModeForTesting()")
7777
}
7878

7979
type validatingReader struct {

internal/datastore/memdb/memdb.go

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -421,18 +421,38 @@ func (mdb *memdbDatastore) SchemaHashReaderForTesting() interface {
421421
return &memdbSchemaHashReaderForTesting{db: mdb}
422422
}
423423

424-
// SchemaHashWatcherForTesting returns a test-only interface for watching schema hash changes.
425-
func (mdb *memdbDatastore) SchemaHashWatcherForTesting() datastore.SingleStoreSchemaHashWatcher {
426-
return newMemdbSchemaHashWatcher(mdb)
424+
// SchemaModeForTesting returns the current schema mode for testing purposes.
425+
// MemDB always operates in a mode equivalent to ReadNewWriteNew.
426+
func (mdb *memdbDatastore) SchemaModeForTesting() (options.SchemaMode, error) {
427+
return options.SchemaModeReadNewWriteNew, nil
427428
}
428429

429430
type memdbSchemaHashReaderForTesting struct {
430431
db *memdbDatastore
431432
}
432433

433434
func (r *memdbSchemaHashReaderForTesting) ReadSchemaHash(ctx context.Context) (string, error) {
434-
watcher := &memdbSchemaHashWatcher{db: r.db}
435-
return watcher.readSchemaHash()
435+
r.db.RLock()
436+
defer r.db.RUnlock()
437+
438+
tx := r.db.db.Txn(false)
439+
defer tx.Abort()
440+
441+
raw, err := tx.First(tableSchemaRevision, indexID, "current")
442+
if err != nil {
443+
return "", fmt.Errorf("failed to query schema hash: %w", err)
444+
}
445+
446+
if raw == nil {
447+
return "", datastore.ErrSchemaNotFound
448+
}
449+
450+
revisionData, ok := raw.(*schemaRevisionData)
451+
if !ok {
452+
return "", errors.New("invalid schema revision data type")
453+
}
454+
455+
return string(revisionData.hash), nil
436456
}
437457

438458
// This code assumes that the RWMutex has been acquired.

internal/datastore/memdb/schemawatch.go

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)