Skip to content

Commit f711903

Browse files
domdomeggclaude
andcommitted
Fix atomic latest version update to prevent missing isLatest flags
Previously, updating the old latest version and creating the new version were separate operations that could fail independently, potentially leaving no version marked as isLatest. This change makes the operation atomic: - CreateServer now accepts oldLatestVersionID parameter - PostgreSQL implementation uses a transaction for UPDATE + INSERT atomicity - Service layer maintains WithPublishLock for concurrent operation serialization - Both mechanisms work together: lock prevents races, transaction ensures atomicity 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent ddcc586 commit f711903

File tree

4 files changed

+50
-28
lines changed

4 files changed

+50
-28
lines changed

internal/database/database.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,10 @@ type Database interface {
4040
GetByServerIDAndVersion(ctx context.Context, serverID string, version string) (*apiv0.ServerJSON, error)
4141
// Retrieve all versions of a server by server ID
4242
GetAllVersionsByServerID(ctx context.Context, serverID string) ([]*apiv0.ServerJSON, error)
43-
// CreateServer adds a new server to the database
44-
CreateServer(ctx context.Context, server *apiv0.ServerJSON) (*apiv0.ServerJSON, error)
43+
// CreateServer atomically publishes a new server version, optionally unmarking a previous latest version
44+
// If oldLatestVersionID is provided, it will be updated to set IsLatest=false before creating the new version
45+
// This operation happens within a transaction with an advisory lock to prevent race conditions
46+
CreateServer(ctx context.Context, newServer *apiv0.ServerJSON, oldLatestVersionID *string) (*apiv0.ServerJSON, error)
4547
// UpdateServer updates an existing server record
4648
UpdateServer(ctx context.Context, id string, server *apiv0.ServerJSON) (*apiv0.ServerJSON, error)
4749
// WithPublishLock executes a function with an exclusive lock for publishing a server

internal/database/postgres.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -328,8 +328,9 @@ func (db *PostgreSQL) GetAllVersionsByServerID(ctx context.Context, serverID str
328328
return results, nil
329329
}
330330

331-
// CreateServer adds a new server to the database
332-
func (db *PostgreSQL) CreateServer(ctx context.Context, server *apiv0.ServerJSON) (*apiv0.ServerJSON, error) {
331+
// CreateServer atomically publishes a new server version, optionally unmarking a previous latest version
332+
// Must be called within WithPublishLock to ensure proper serialization
333+
func (db *PostgreSQL) CreateServer(ctx context.Context, server *apiv0.ServerJSON, oldLatestVersionID *string) (*apiv0.ServerJSON, error) {
333334
if ctx.Err() != nil {
334335
return nil, ctx.Err()
335336
}
@@ -346,23 +347,53 @@ func (db *PostgreSQL) CreateServer(ctx context.Context, server *apiv0.ServerJSON
346347
return nil, fmt.Errorf("server must have both ServerID and VersionID in registry metadata")
347348
}
348349

350+
// Begin a transaction for atomicity of UPDATE + INSERT
351+
tx, err := db.pool.Begin(ctx)
352+
if err != nil {
353+
return nil, fmt.Errorf("failed to begin transaction: %w", err)
354+
}
355+
defer func() {
356+
_ = tx.Rollback(ctx)
357+
}()
358+
359+
// If there's a previous latest version, unmark it
360+
if oldLatestVersionID != nil && *oldLatestVersionID != "" {
361+
updateQuery := `
362+
UPDATE servers
363+
SET value = jsonb_set(
364+
value,
365+
'{_meta,io.modelcontextprotocol.registry/official,isLatest}',
366+
'false'::jsonb
367+
)
368+
WHERE version_id = $1
369+
`
370+
_, err := tx.Exec(ctx, updateQuery, *oldLatestVersionID)
371+
if err != nil {
372+
return nil, fmt.Errorf("failed to unmark previous latest version: %w", err)
373+
}
374+
}
375+
349376
// Marshal the complete server to JSONB
350377
valueJSON, err := json.Marshal(server)
351378
if err != nil {
352379
return nil, fmt.Errorf("failed to marshal server JSON: %w", err)
353380
}
354381

355-
// Insert into servers table with new schema (only version_id column, serverId is in JSON)
356-
query := `
382+
// Insert the new version
383+
insertQuery := `
357384
INSERT INTO servers (version_id, value)
358385
VALUES ($1, $2)
359386
`
360-
361-
_, err = db.pool.Exec(ctx, query, versionID, valueJSON)
387+
_, err = tx.Exec(ctx, insertQuery, versionID, valueJSON)
362388
if err != nil {
363389
return nil, fmt.Errorf("failed to insert server: %w", err)
364390
}
365391

392+
// Commit the transaction
393+
if err := tx.Commit(ctx); err != nil {
394+
return nil, fmt.Errorf("failed to commit transaction: %w", err)
395+
}
396+
366397
return server, nil
367398
}
368399

internal/importer/importer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (s *Service) ImportFromPath(ctx context.Context, path string) error {
3737

3838
// Import each server using CreateServer
3939
for _, server := range servers {
40-
_, err := s.db.CreateServer(ctx, server)
40+
_, err := s.db.CreateServer(ctx, server, nil)
4141
if err != nil {
4242
return fmt.Errorf("failed to import server %s: %w", server.Name, err)
4343
}
@@ -181,4 +181,4 @@ func convertServerResponseToRecord(response apiv0.ServerJSON) *apiv0.ServerJSON
181181
// The response is already in the correct flattened format
182182
// Just return a pointer to it
183183
return &response
184-
}
184+
}

internal/service/registry_service.go

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -176,33 +176,22 @@ func (s *registryServiceImpl) Publish(req apiv0.ServerJSON) (*apiv0.ServerJSON,
176176
) > 0
177177
}
178178

179-
// Mark previous latest as no longer latest BEFORE creating new version
180-
// This prevents violating the unique constraint on isLatest
179+
// Prepare the old latest version ID if we need to unmark it
180+
var oldLatestVersionID *string
181181
if isNewLatest && existingLatest != nil {
182-
var existingLatestVersionID string
183182
if existingLatest.Meta != nil && existingLatest.Meta.Official != nil {
184-
existingLatestVersionID = existingLatest.Meta.Official.VersionID
185-
}
186-
if existingLatestVersionID != "" {
187-
// Update the existing server to set isLatest = false
188-
existingLatest.Meta.Official.IsLatest = false
189-
existingLatest.Meta.Official.UpdatedAt = time.Now()
190-
if _, err := s.db.UpdateServer(lockCtx, existingLatestVersionID, existingLatest); err != nil {
191-
return nil, err
183+
versionID := existingLatest.Meta.Official.VersionID
184+
if versionID != "" {
185+
oldLatestVersionID = &versionID
192186
}
193187
}
194188
}
195189

196190
// Create complete server with metadata
197191
server := s.createServerWithMetadata(serverJSON, existingServerVersions, publishTime, isNewLatest)
198192

199-
// Create server in database
200-
serverRecord, err := s.db.CreateServer(lockCtx, &server)
201-
if err != nil {
202-
return nil, err
203-
}
204-
205-
return serverRecord, nil
193+
// Publish server version atomically (unmarks old latest and creates new version in one transaction)
194+
return s.db.CreateServer(lockCtx, &server, oldLatestVersionID)
206195
})
207196

208197
if err != nil {

0 commit comments

Comments
 (0)