Skip to content

Commit 2454b78

Browse files
domdomegg and claude committed
Fix atomic latest version update to prevent missing isLatest flags
- Pass transaction via context from WithPublishLock to CreateServer
- Use pg_try_advisory_xact_lock with retry to avoid connection pool exhaustion
- Update test to check database state (source of truth) vs returned snapshots
- Increase publish timeout to 60s to handle high concurrency scenarios
- Test now passes with concurrency=100

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent f711903 commit 2454b78

File tree

3 files changed

+73
-46
lines changed

3 files changed

+73
-46
lines changed

internal/database/postgres.go

Lines changed: 58 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ import (
1515
apiv0 "github.com/modelcontextprotocol/registry/pkg/api/v0"
1616
)
1717

18+
type contextKey string
19+
20+
const publishTxKey contextKey = "publishTx"
21+
1822
// PostgreSQL is an implementation of the Database interface using PostgreSQL
1923
type PostgreSQL struct {
2024
pool *pgxpool.Pool
@@ -328,8 +332,9 @@ func (db *PostgreSQL) GetAllVersionsByServerID(ctx context.Context, serverID str
328332
return results, nil
329333
}
330334

331-
// CreateServer atomically publishes a new server version, optionally unmarking a previous latest version
332-
// Must be called within WithPublishLock to ensure proper serialization
335+
// CreateServer publishes a new server version, optionally unmarking a previous latest version
336+
// MUST be called within WithPublishLock to ensure proper serialization and prevent race conditions
337+
// Executes UPDATE and INSERT atomically using the transaction from the context
333338
func (db *PostgreSQL) CreateServer(ctx context.Context, server *apiv0.ServerJSON, oldLatestVersionID *string) (*apiv0.ServerJSON, error) {
334339
if ctx.Err() != nil {
335340
return nil, ctx.Err()
@@ -347,14 +352,11 @@ func (db *PostgreSQL) CreateServer(ctx context.Context, server *apiv0.ServerJSON
347352
return nil, fmt.Errorf("server must have both ServerID and VersionID in registry metadata")
348353
}
349354

350-
// Begin a transaction for atomicity of UPDATE + INSERT
351-
tx, err := db.pool.Begin(ctx)
352-
if err != nil {
353-
return nil, fmt.Errorf("failed to begin transaction: %w", err)
355+
// Get transaction from context (must be called within WithPublishLock)
356+
tx, ok := ctx.Value(publishTxKey).(pgx.Tx)
357+
if !ok {
358+
return nil, fmt.Errorf("CreateServer must be called within WithPublishLock")
354359
}
355-
defer func() {
356-
_ = tx.Rollback(ctx)
357-
}()
358360

359361
// If there's a previous latest version, unmark it
360362
if oldLatestVersionID != nil && *oldLatestVersionID != "" {
@@ -389,11 +391,6 @@ func (db *PostgreSQL) CreateServer(ctx context.Context, server *apiv0.ServerJSON
389391
return nil, fmt.Errorf("failed to insert server: %w", err)
390392
}
391393

392-
// Commit the transaction
393-
if err := tx.Commit(ctx); err != nil {
394-
return nil, fmt.Errorf("failed to commit transaction: %w", err)
395-
}
396-
397394
return server, nil
398395
}
399396

@@ -440,34 +437,57 @@ func (db *PostgreSQL) WithPublishLock(ctx context.Context, serverName string, fn
440437
return ctx.Err()
441438
}
442439

443-
// Begin a transaction
444-
tx, err := db.pool.Begin(ctx)
445-
if err != nil {
446-
return fmt.Errorf("failed to begin transaction: %w", err)
447-
}
448-
defer func() {
449-
_ = tx.Rollback(ctx)
450-
}()
451-
452-
// Acquire advisory lock based on server name hash
453-
// Using pg_advisory_xact_lock which auto-releases on transaction end
454440
lockID := hashServerName(serverName)
455-
_, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock($1)", lockID)
456-
if err != nil {
457-
return fmt.Errorf("failed to acquire publish lock: %w", err)
458-
}
441+
backoff := 10 * time.Millisecond
459442

460-
// Execute the function
461-
if err := fn(ctx); err != nil {
462-
return err
463-
}
443+
for {
444+
// Begin a transaction
445+
tx, err := db.pool.Begin(ctx)
446+
if err != nil {
447+
return fmt.Errorf("failed to begin transaction: %w", err)
448+
}
464449

465-
// Commit the transaction (which also releases the lock)
466-
if err := tx.Commit(ctx); err != nil {
467-
return fmt.Errorf("failed to commit transaction: %w", err)
468-
}
450+
// Try to acquire advisory lock (non-blocking)
451+
// Using pg_try_advisory_xact_lock which returns immediately
452+
var acquired bool
453+
err = tx.QueryRow(ctx, "SELECT pg_try_advisory_xact_lock($1)", lockID).Scan(&acquired)
454+
if err != nil {
455+
_ = tx.Rollback(ctx)
456+
return fmt.Errorf("failed to acquire publish lock: %w", err)
457+
}
469458

470-
return nil
459+
if !acquired {
460+
// Lock not available, rollback and retry
461+
_ = tx.Rollback(ctx)
462+
463+
// Check if context is still valid
464+
if ctx.Err() != nil {
465+
return fmt.Errorf("failed to acquire publish lock: timeout: %w", ctx.Err())
466+
}
467+
468+
// Wait before retrying
469+
time.Sleep(backoff)
470+
backoff = min(backoff*2, 500*time.Millisecond)
471+
continue
472+
}
473+
474+
// Lock acquired! Store transaction in context
475+
ctxWithTx := context.WithValue(ctx, publishTxKey, tx)
476+
477+
// Execute the function with the transaction-aware context
478+
fnErr := fn(ctxWithTx)
479+
if fnErr != nil {
480+
_ = tx.Rollback(ctx)
481+
return fnErr
482+
}
483+
484+
// Commit the transaction (which also releases the lock)
485+
if err := tx.Commit(ctx); err != nil {
486+
return fmt.Errorf("failed to commit transaction: %w", err)
487+
}
488+
489+
return nil
490+
}
471491
}
472492

473493
// hashServerName creates a consistent hash of the server name for advisory locking

internal/service/registry_service.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,8 @@ func (s *registryServiceImpl) GetAllVersionsByServerID(serverID string) ([]apiv0
123123
// Publish publishes a server with flattened _meta extensions
124124
func (s *registryServiceImpl) Publish(req apiv0.ServerJSON) (*apiv0.ServerJSON, error) {
125125
// Create a timeout context for the database operation
126-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
126+
// Longer timeout to handle high concurrency scenarios where operations are serialized by advisory locks
127+
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
127128
defer cancel()
128129

129130
// Validate the request

internal/service/registry_service_test.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ func TestPublishConcurrentVersionsNoRace(t *testing.T) {
384384
testDB := database.NewTestDB(t)
385385
service := NewRegistryService(testDB, &config.Config{EnableRegistryValidation: false})
386386

387-
const concurrency = 1 // @Maintainers: Fix this and increase to higher number, previously 100
387+
const concurrency = 100
388388
results := make([]*apiv0.ServerJSON, concurrency)
389389
errors := make([]error, concurrency)
390390

@@ -419,12 +419,18 @@ func TestPublishConcurrentVersionsNoRace(t *testing.T) {
419419
}
420420
}
421421

422-
latestCount := 0
423-
for _, result := range results {
424-
if result != nil && result.Meta != nil && result.Meta.Official != nil &&
425-
result.Meta.Official.IsLatest {
426-
latestCount++
422+
// Check what's actually in the database (the source of truth)
423+
// Note: returned results are snapshots from when each was published, so they may show
424+
// multiple versions as latest if they were latest at different points in time
425+
filter := &database.ServerFilter{Name: func() *string { s := "com.example/test-concurrent"; return &s }()}
426+
dbVersions, _, err := testDB.List(context.Background(), filter, "", 1000)
427+
assert.NoError(t, err)
428+
dbLatestCount := 0
429+
for _, v := range dbVersions {
430+
if v.Meta != nil && v.Meta.Official != nil && v.Meta.Official.IsLatest {
431+
dbLatestCount++
427432
}
428433
}
429-
assert.Equal(t, 1, latestCount, "should have exactly one latest version")
434+
435+
assert.Equal(t, 1, dbLatestCount, "should have exactly one latest version in database")
430436
}

0 commit comments

Comments
 (0)