Commit 4d76890
fix(controller): resolve mutex deadlock between agent and remotemcpserver reconcilers (#1214)
## Summary

Fixes controller reconciliation deadlock by removing the application-level `upsertLock` mutex. Database-level concurrency control (atomic upserts and transactions) now handles synchronization.

## Problem

The shared `upsertLock` mutex blocked all Agent reconciliations when RemoteMCPServer reconciliation performed slow network I/O while holding the lock.

## Solution

**Phase 1**: Reduced lock scope to exclude network I/O

**Phase 2**: Fixed Agent predicate to ensure Create events are always processed

**Phase 3**: Removed mutex entirely - database handles concurrency via:
- Atomic upserts: `INSERT ... ON CONFLICT DO UPDATE`
- Transactions: Atomic tool replacement in `RefreshToolsForServer()`

**Phase 4**: Documentation and tests
- Added architecture doc explaining concurrency model
- Added idempotence tests for `StoreAgent` and `StoreToolServer`
- Added IMPORTANT comment to `RefreshToolsForServer` about transaction scope

---------

Signed-off-by: Josh Bell <[email protected]>
1 parent 70214df commit 4d76890

File tree

5 files changed: +382 -59 lines changed
Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
# Controller Reconciliation Architecture

This document explains how kagent's Kubernetes controllers reconcile resources and share state.

## Overview

The kagent controller manager runs multiple controllers that share a single `kagentReconciler` instance:

```text
┌─────────────────────────────────────────────────────────────────┐
│                       Controller Manager                        │
│                                                                 │
│  ┌──────────────────┐  ┌──────────────────┐  ┌───────────────┐  │
│  │ AgentController  │  │ RemoteMCPServer  │  │   MCPServer   │  │
│  │                  │  │    Controller    │  │  Controller   │  │
│  └────────┬─────────┘  └────────┬─────────┘  └───────┬───────┘  │
│           │                     │                    │          │
│           └─────────────────────┼────────────────────┘          │
│                                 │                               │
│                                 ▼                               │
│                     ┌────────────────────────┐                  │
│                     │    kagentReconciler    │                  │
│                     │   (shared instance)    │                  │
│                     │                        │                  │
│                     │  - adkTranslator       │                  │
│                     │  - kube client         │                  │
│                     │  - dbClient            │                  │
│                     └────────────────────────┘                  │
│                                 │                               │
│                                 ▼                               │
│                     ┌────────────────────────┐                  │
│                     │       SQLite DB        │                  │
│                     └────────────────────────┘                  │
└─────────────────────────────────────────────────────────────────┘
```
## Concurrency Model

The reconciler uses database-level concurrency control instead of application-level locks:

**Atomic Upserts**: Database operations for storing agents and tool servers use SQL `INSERT ... ON CONFLICT DO UPDATE` semantics. This makes the operations idempotent and safe for concurrent execution.
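The upsert semantics can be sketched with a minimal example. This uses Python's stdlib `sqlite3` purely to illustrate the SQL behavior; kagent's real implementation is Go/GORM, and the `agents` table and `config` column here are hypothetical stand-ins, not the actual schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE agents (id TEXT PRIMARY KEY, config TEXT)")

def store_agent(agent_id, config):
    # Idempotent upsert: repeated or concurrent reconciles of the same
    # agent converge on a single row instead of failing on the PK.
    with conn:  # implicit transaction, committed on success
        conn.execute(
            "INSERT INTO agents (id, config) VALUES (?, ?) "
            "ON CONFLICT(id) DO UPDATE SET config = excluded.config",
            (agent_id, config),
        )

store_agent("default/my-agent", "v1")
store_agent("default/my-agent", "v2")  # second reconcile updates, no error
rows = conn.execute("SELECT id, config FROM agents").fetchall()
print(rows)  # [('default/my-agent', 'v2')]
```

Because the operation is a single atomic statement, no application-level lock is needed around it.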
**Transactions**: Tool refresh operations wrap multiple statements (delete all existing tools, insert new tools) in a database transaction to ensure atomicity.

**No Application Locks**: The reconciler does not use mutexes or other Go synchronization primitives. SQLite handles write serialization internally.
## Reconciliation Flows

### Agent Reconciliation

When an Agent CR is created or updated:

1. The `AgentController` receives the event
2. Delegates to the shared `kagentReconciler`
3. The reconciler translates the Agent spec into Kubernetes manifests (Deployment, ConfigMap, etc.)
4. Reconciles the desired state with the cluster (create/update/delete owned resources)
5. Stores the agent configuration in the SQLite database (atomic upsert)
6. Updates the Agent status

### RemoteMCPServer Reconciliation

When a RemoteMCPServer CR is created or updated:

1. The RemoteMCPServer controller receives the event
2. Stores the tool server metadata in the database (atomic upsert)
3. Connects to the remote MCP server over the network
4. Lists available tools from the server
5. Replaces all tools for this server in the database (transaction)
6. Updates the RemoteMCPServer status with discovered tools

### Key Design Point

Network I/O (connecting to remote MCP servers, listing tools) happens **outside** of database transactions. This prevents long-running network operations from holding database locks and blocking other reconciliations.
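The fetch-first, transact-second ordering can be sketched as follows. This is again a hypothetical Python/`sqlite3` illustration, not the real Go code: `fetch_tools_from_server` stands in for MCP tool discovery, and the `tools` table is a made-up schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tools (id TEXT, server_name TEXT, group_kind TEXT, "
    "PRIMARY KEY (id, server_name, group_kind))"
)

def fetch_tools_from_server():
    # Stand-in for the slow network call; in the controller this discovery
    # happens BEFORE any database transaction is opened.
    return ["get_pods", "get_logs"]

def refresh_tools_for_server(server, group_kind, tools):
    # Delete-then-insert inside one transaction: readers never observe
    # a partially replaced tool list.
    with conn:
        conn.execute(
            "DELETE FROM tools WHERE server_name = ? AND group_kind = ?",
            (server, group_kind),
        )
        conn.executemany(
            "INSERT INTO tools (id, server_name, group_kind) VALUES (?, ?, ?)",
            [(t, server, group_kind) for t in tools],
        )

tools = fetch_tools_from_server()                       # network I/O first...
refresh_tools_for_server("k8s", "RemoteMCPServer", tools)  # ...then a fast transaction
refresh_tools_for_server("k8s", "RemoteMCPServer", ["get_pods"])  # shrinking list also works
names = [r[0] for r in conn.execute("SELECT id FROM tools ORDER BY id")]
print(names)  # ['get_pods']
```

Keeping the fetch outside the transaction means lock hold times stay short, and a failed fetch leaves the previous tool list intact.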
## Event Filtering

The `AgentController` uses a custom event predicate to control which Kubernetes events trigger reconciliation:

- **Create events**: Always processed (ensures all agents reconcile on controller startup)
- **Delete events**: Always processed
- **Update events**: Only processed if the agent's generation or labels changed

This filtering prevents unnecessary reconciliations when only the agent's status changes.
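The update decision amounts to a generation check plus a label-map comparison. A hypothetical Python port of that logic (the real implementation is Go, in `agent_controller.go`):

```python
def should_reconcile_update(old, new):
    # Hypothetical port of agentPredicate.Update: reconcile only when the
    # generation (i.e. the spec) or the label map changed.
    if old is None or new is None:
        return False
    if new["generation"] != old["generation"]:
        return True  # spec was modified
    # Dict equality covers the length check and per-key comparison that
    # the Go version spells out explicitly.
    return old.get("labels", {}) != new.get("labels", {})

old = {"generation": 3, "labels": {"team": "a"}}
status_only = {"generation": 3, "labels": {"team": "a"}}  # status-only update
relabeled = {"generation": 3, "labels": {"team": "b"}}

print(should_reconcile_update(old, status_only))  # False
print(should_reconcile_update(old, relabeled))    # True
```

Status updates do not bump `metadata.generation`, which is why this predicate filters them out while still catching spec and label changes.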
## Related Files

- [reconciler.go](../../go/internal/controller/reconciler/reconciler.go) - Shared reconciler implementation
- [agent_controller.go](../../go/internal/controller/agent_controller.go) - Agent controller setup
- [service.go](../../go/internal/database/service.go) - Database helpers with atomic upserts
- [client.go](../../go/internal/database/client.go) - Database client implementation
go/internal/controller/agent_controller.go

Lines changed: 49 additions & 1 deletion

@@ -70,7 +70,10 @@ func (r *AgentController) SetupWithManager(mgr ctrl.Manager) error {
         WithOptions(controller.Options{
             NeedLeaderElection: ptr.To(true),
         }).
-        For(&v1alpha2.Agent{}, builder.WithPredicates(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.LabelChangedPredicate{})))
+        // Use custom predicate that always processes Create/Delete events,
+        // but filters Update events to only process when generation or labels change.
+        // This fixes a bug where some agents were not reconciled on startup.
+        For(&v1alpha2.Agent{}, builder.WithPredicates(agentPredicate{}))

     // Setup owns relationships for resources created by the Agent controller -
     // for now ownership of agent resources is handled by the ADK translator
@@ -328,3 +331,48 @@ type typedOwnedObjectPredicate[object metav1.Object] struct {
 func (typedOwnedObjectPredicate[object]) Create(e event.TypedCreateEvent[object]) bool {
     return false
 }
+
+// agentPredicate is a custom predicate for Agent resources that:
+// - Always processes Create events (ensures all agents are reconciled on startup)
+// - Always processes Delete events
+// - For Update events, only processes if generation or labels changed
+// This fixes a bug where GenerationChangedPredicate combined with LabelChangedPredicate
+// could inconsistently filter out Create events for some agents.
+type agentPredicate struct{}
+
+func (agentPredicate) Create(e event.CreateEvent) bool {
+    // Always process Create events - this ensures all agents are reconciled on startup
+    return true
+}
+
+func (agentPredicate) Delete(e event.DeleteEvent) bool {
+    // Always process Delete events
+    return true
+}
+
+func (agentPredicate) Update(e event.UpdateEvent) bool {
+    if e.ObjectOld == nil || e.ObjectNew == nil {
+        return false
+    }
+    // Process if generation changed (spec was modified)
+    if e.ObjectNew.GetGeneration() != e.ObjectOld.GetGeneration() {
+        return true
+    }
+    // Process if labels changed
+    oldLabels := e.ObjectOld.GetLabels()
+    newLabels := e.ObjectNew.GetLabels()
+    if len(oldLabels) != len(newLabels) {
+        return true
+    }
+    for k, v := range oldLabels {
+        if newLabels[k] != v {
+            return true
+        }
+    }
+    return false
+}
+
+func (agentPredicate) Generic(e event.GenericEvent) bool {
+    // Always process Generic events
+    return true
+}

go/internal/controller/reconciler/reconciler.go

Lines changed: 3 additions & 12 deletions

@@ -10,7 +10,6 @@ import (
     "reflect"
     "slices"
     "strings"
-    "sync"

     "github.com/hashicorp/go-multierror"
     reconcilerutils "github.com/kagent-dev/kagent/go/internal/controller/reconciler/utils"
@@ -56,9 +55,6 @@ type kagentReconciler struct {
     dbClient database.Client

     defaultModelConfig types.NamespacedName
-
-    // TODO: Remove this lock since we have a DB which we can batch anyway
-    upsertLock sync.Mutex
 }

 func NewKagentReconciler(
@@ -620,10 +616,6 @@ func (a *kagentReconciler) deleteObjects(ctx context.Context, objects map[types.
 }

 func (a *kagentReconciler) upsertAgent(ctx context.Context, agent *v1alpha2.Agent, agentOutputs *agent_translator.AgentOutputs) error {
-    // lock to prevent races
-    a.upsertLock.Lock()
-    defer a.upsertLock.Unlock()
-
     id := utils.ConvertToPythonIdentifier(utils.GetObjectRef(agent))
     dbAgent := &database.Agent{
         ID: id,
@@ -639,14 +631,12 @@
 }

 func (a *kagentReconciler) upsertToolServerForRemoteMCPServer(ctx context.Context, toolServer *database.ToolServer, remoteMcpServer *v1alpha2.RemoteMCPServerSpec, namespace string) ([]*v1alpha2.MCPTool, error) {
-    // lock to prevent races
-    a.upsertLock.Lock()
-    defer a.upsertLock.Unlock()
-
+    // Store tool server - database handles concurrency via atomic upsert
     if _, err := a.dbClient.StoreToolServer(toolServer); err != nil {
         return nil, fmt.Errorf("failed to store toolServer %s: %v", toolServer.Name, err)
     }

+    // Create transport and list tools from remote MCP server
     tsp, err := a.createMcpTransport(ctx, remoteMcpServer, namespace)
     if err != nil {
         return nil, fmt.Errorf("failed to create client for toolServer %s: %v", toolServer.Name, err)
@@ -657,6 +647,7 @@
         return nil, fmt.Errorf("failed to fetch tools for toolServer %s: %v", toolServer.Name, err)
     }

+    // Refresh tools in database - uses transaction for atomicity
     if err := a.dbClient.RefreshToolsForServer(toolServer.Name, toolServer.GroupKind, tools...); err != nil {
         return nil, fmt.Errorf("failed to refresh tools for toolServer %s: %v", toolServer.Name, err)
     }

go/internal/database/client.go

Lines changed: 22 additions & 46 deletions

@@ -4,7 +4,6 @@ import (
     "encoding/json"
     "errors"
     "fmt"
-    "slices"
     "time"

     "github.com/kagent-dev/kagent/go/api/v1alpha2"
@@ -261,59 +260,36 @@ func (c *clientImpl) ListToolsForServer(serverName string, groupKind string) ([]
         Clause{Key: "group_kind", Value: groupKind})
 }

-// RefreshToolsForServer refreshes a tool server
-// TODO: Use a transaction to ensure atomicity
+// RefreshToolsForServer atomically replaces all tools for a server.
+// Uses a database transaction to ensure consistency under concurrent access.
+//
+// IMPORTANT: This function should only contain fast database operations.
+// Network I/O (e.g., fetching tools from remote MCP servers) must happen
+// BEFORE calling this function, not inside it. Holding a database transaction
+// during slow operations can cause contention and degrade performance.
 func (c *clientImpl) RefreshToolsForServer(serverName string, groupKind string, tools ...*v1alpha2.MCPTool) error {
-    existingTools, err := c.ListToolsForServer(serverName, groupKind)
-    if err != nil {
-        return err
-    }
-
-    // Check if the tool exists in the existing tools
-    // If it does, update it
-    // If it doesn't, create it
-    // If it's in the existing tools but not in the new tools, delete it
-    for _, tool := range tools {
-        existingToolIndex := slices.IndexFunc(existingTools, func(t Tool) bool {
-            return t.ID == tool.Name
-        })
-        if existingToolIndex != -1 {
-            existingTool := existingTools[existingToolIndex]
-            existingTool.ServerName = serverName
-            existingTool.GroupKind = groupKind
-            existingTool.Description = tool.Description
-            err = save(c.db, &existingTool)
-            if err != nil {
-                return err
-            }
-        } else {
-            err = save(c.db, &Tool{
+    return c.db.Transaction(func(tx *gorm.DB) error {
+        // Delete all existing tools for this server in the transaction
+        if err := delete[Tool](tx,
+            Clause{Key: "server_name", Value: serverName},
+            Clause{Key: "group_kind", Value: groupKind}); err != nil {
+            return fmt.Errorf("failed to delete existing tools: %w", err)
+        }
+
+        // Insert all new tools
+        for _, tool := range tools {
+            if err := save(tx, &Tool{
                 ID:          tool.Name,
                 ServerName:  serverName,
                 GroupKind:   groupKind,
                 Description: tool.Description,
-            })
-            if err != nil {
-                return fmt.Errorf("failed to create tool %s: %v", tool.Name, err)
+            }); err != nil {
+                return fmt.Errorf("failed to create tool %s: %w", tool.Name, err)
             }
         }
-    }

-    // Delete any tools that are in the existing tools but not in the new tools
-    for _, existingTool := range existingTools {
-        if !slices.ContainsFunc(tools, func(t *v1alpha2.MCPTool) bool {
-            return t.Name == existingTool.ID
-        }) {
-            err = delete[Tool](c.db,
-                Clause{Key: "id", Value: existingTool.ID},
-                Clause{Key: "server_name", Value: serverName},
-                Clause{Key: "group_kind", Value: groupKind})
-            if err != nil {
-                return fmt.Errorf("failed to delete tool %s: %v", existingTool.ID, err)
-            }
-        }
-    }
-    return nil
+        return nil
+    })
 }

 // ListMessagesForRun retrieves messages for a specific run (helper method)
