Skip to content

Commit 586202b

Browse files
authored
Keep grouped servers in sync with client (#1344)
1 parent b5c580c commit 586202b

File tree

3 files changed

+166
-105
lines changed

3 files changed

+166
-105
lines changed

pkg/client/manager.go

Lines changed: 137 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
ct "github.com/stacklok/toolhive/pkg/container"
1010
rt "github.com/stacklok/toolhive/pkg/container/runtime"
1111
"github.com/stacklok/toolhive/pkg/core"
12+
"github.com/stacklok/toolhive/pkg/groups"
1213
"github.com/stacklok/toolhive/pkg/labels"
1314
"github.com/stacklok/toolhive/pkg/logger"
1415
)
@@ -30,10 +31,15 @@ type Manager interface {
3031
RegisterClients(clients []Client, workloads []core.Workload) error
3132
// UnregisterClients unregisters multiple clients from ToolHive.
3233
UnregisterClients(ctx context.Context, clients []Client) error
34+
// AddServerToClients adds an MCP server to the appropriate client configurations.
35+
AddServerToClients(ctx context.Context, serverName, serverURL, transportType, group string) error
36+
// RemoveServerFromClients removes an MCP server from the appropriate client configurations.
37+
RemoveServerFromClients(ctx context.Context, serverName, group string) error
3338
}
3439

3540
type defaultManager struct {
36-
runtime rt.Runtime
41+
runtime rt.Runtime
42+
groupManager groups.Manager
3743
}
3844

3945
// NewManager creates a new client manager instance.
@@ -43,8 +49,14 @@ func NewManager(ctx context.Context) (Manager, error) {
4349
return nil, err
4450
}
4551

52+
groupManager, err := groups.NewManager()
53+
if err != nil {
54+
return nil, err
55+
}
56+
4657
return &defaultManager{
47-
runtime: runtime,
58+
runtime: runtime,
59+
groupManager: groupManager,
4860
}, nil
4961
}
5062

@@ -96,6 +108,49 @@ func (m *defaultManager) UnregisterClients(ctx context.Context, clients []Client
96108
return nil
97109
}
98110

111+
// AddServerToClients adds an MCP server to the appropriate client configurations.
112+
// If the workload belongs to a group, only clients registered with that group are updated.
113+
// If the workload has no group, all registered clients are updated (backward compatibility).
114+
func (m *defaultManager) AddServerToClients(
115+
ctx context.Context, serverName, serverURL, transportType, group string,
116+
) error {
117+
targetClients := m.getTargetClients(ctx, serverName, group)
118+
119+
if len(targetClients) == 0 {
120+
logger.Infof("No target clients found for server %s", serverName)
121+
return nil
122+
}
123+
124+
// Add the server to each target client
125+
for _, clientName := range targetClients {
126+
if err := m.updateClientWithServer(clientName, serverName, serverURL, transportType); err != nil {
127+
logger.Warnf("Warning: Failed to update client %s: %v", clientName, err)
128+
}
129+
}
130+
return nil
131+
}
132+
133+
// RemoveServerFromClients removes an MCP server from the appropriate client configurations.
134+
// If the server belongs to a group, only clients registered with that group are updated.
135+
// If the server has no group, all registered clients are updated (backward compatibility).
136+
func (m *defaultManager) RemoveServerFromClients(ctx context.Context, serverName, group string) error {
137+
targetClients := m.getTargetClients(ctx, serverName, group)
138+
139+
if len(targetClients) == 0 {
140+
logger.Infof("No target clients found for server %s", serverName)
141+
return nil
142+
}
143+
144+
// Remove the server from each target client
145+
for _, clientName := range targetClients {
146+
if err := m.removeServerFromClient(MCPClient(clientName), serverName); err != nil {
147+
logger.Warnf("Warning: Failed to remove server from client %s: %v", clientName, err)
148+
}
149+
}
150+
151+
return nil
152+
}
153+
99154
// removeMCPsFromClient removes currently running MCP servers from the specified client's configuration
100155
func (m *defaultManager) removeMCPsFromClient(ctx context.Context, clientType MCPClient) error {
101156
// List workloads
@@ -117,12 +172,6 @@ func (m *defaultManager) removeMCPsFromClient(ctx context.Context, clientType MC
117172
return nil
118173
}
119174

120-
// Find the client configuration for the specified client
121-
clientConfig, err := FindClientConfig(clientType)
122-
if err != nil {
123-
return fmt.Errorf("failed to find client configurations: %w", err)
124-
}
125-
126175
// For each running container, remove it from the client configuration
127176
for _, c := range runningContainers {
128177
// Get container name from labels
@@ -139,34 +188,17 @@ func (m *defaultManager) removeMCPsFromClient(ctx context.Context, clientType MC
139188
continue
140189
}
141190

142-
// Remove the MCP server configuration with locking
143-
if err := clientConfig.ConfigUpdater.Remove(name); err != nil {
144-
logger.Warnf("Warning: Failed to remove MCP server configuration from %s: %v", clientConfig.Path, err)
191+
if err := m.removeServerFromClient(clientType, name); err != nil {
192+
logger.Warnf("Warning: %v", err)
145193
continue
146194
}
147-
148-
logger.Infof("Removed MCP server %s from client %s\n", name, clientType)
149195
}
150196

151197
return nil
152198
}
153199

154200
// addWorkloadsToClient adds the specified workloads to the client's configuration
155-
func (*defaultManager) addWorkloadsToClient(clientType MCPClient, workloads []core.Workload) error {
156-
// Find the client configuration for the specified client
157-
clientConfig, err := FindClientConfig(clientType)
158-
if err != nil {
159-
if errors.Is(err, ErrConfigFileNotFound) {
160-
// Create a new client configuration if it doesn't exist
161-
clientConfig, err = CreateClientConfig(clientType)
162-
if err != nil {
163-
return fmt.Errorf("failed to create client configuration for %s: %w", clientType, err)
164-
}
165-
} else {
166-
return fmt.Errorf("failed to find client configuration: %w", err)
167-
}
168-
}
169-
201+
func (m *defaultManager) addWorkloadsToClient(clientType MCPClient, workloads []core.Workload) error {
170202
if len(workloads) == 0 {
171203
// No workloads to add, nothing more to do
172204
return nil
@@ -178,14 +210,87 @@ func (*defaultManager) addWorkloadsToClient(clientType MCPClient, workloads []co
178210
continue
179211
}
180212

181-
// Update the MCP server configuration with locking
182-
if err := Upsert(*clientConfig, workload.Name, workload.URL, string(workload.TransportType)); err != nil {
183-
logger.Warnf("Warning: Failed to update MCP server configuration in %s: %v", clientConfig.Path, err)
184-
continue
213+
// Use the common update function (creates config if needed)
214+
err := m.updateClientWithServer(
215+
string(clientType), workload.Name, workload.URL, string(workload.TransportType),
216+
)
217+
if err != nil {
218+
return fmt.Errorf("failed to add workload %s to client %s: %v", workload.Name, clientType, err)
185219
}
186220

187221
logger.Infof("Added MCP server %s to client %s\n", workload.Name, clientType)
188222
}
189223

190224
return nil
191225
}
226+
227+
// removeServerFromClient removes an MCP server from a single client configuration
228+
func (*defaultManager) removeServerFromClient(clientName MCPClient, serverName string) error {
229+
clientConfig, err := FindClientConfig(clientName)
230+
if err != nil {
231+
return fmt.Errorf("failed to find client configurations: %w", err)
232+
}
233+
234+
// Remove the MCP server configuration with locking
235+
if err := clientConfig.ConfigUpdater.Remove(serverName); err != nil {
236+
return fmt.Errorf("failed to remove MCP server configuration from %s: %v", clientConfig.Path, err)
237+
}
238+
239+
logger.Infof("Removed MCP server %s from client %s\n", serverName, clientName)
240+
return nil
241+
}
242+
243+
// updateClientWithServer updates a single client with an MCP server configuration, creating config if needed
244+
func (*defaultManager) updateClientWithServer(clientName, serverName, serverURL, transportType string) error {
245+
clientConfig, err := FindClientConfig(MCPClient(clientName))
246+
if err != nil {
247+
if errors.Is(err, ErrConfigFileNotFound) {
248+
// Create a new client configuration if it doesn't exist
249+
clientConfig, err = CreateClientConfig(MCPClient(clientName))
250+
if err != nil {
251+
return fmt.Errorf("failed to create client configuration for %s: %w", clientName, err)
252+
}
253+
} else {
254+
return fmt.Errorf("failed to find client configuration: %w", err)
255+
}
256+
}
257+
258+
logger.Infof("Updating client configuration: %s", clientConfig.Path)
259+
260+
if err := Upsert(*clientConfig, serverName, serverURL, transportType); err != nil {
261+
return fmt.Errorf("failed to update MCP server configuration in %s: %v", clientConfig.Path, err)
262+
}
263+
264+
logger.Infof("Successfully updated client configuration: %s", clientConfig.Path)
265+
return nil
266+
}
267+
268+
// getTargetClients determines which clients should be updated based on workload group
269+
func (m *defaultManager) getTargetClients(ctx context.Context, serverName, groupName string) []string {
270+
// Server belongs to a group - only update clients registered with that group
271+
if groupName != "" {
272+
group, err := m.groupManager.Get(ctx, groupName)
273+
if err != nil {
274+
logger.Warnf(
275+
"Warning: Failed to get group %s for server %s, skipping client config updates: %v",
276+
group, serverName, err,
277+
)
278+
return nil
279+
}
280+
281+
logger.Infof(
282+
"Server %s belongs to group %s, updating %d registered client(s)",
283+
serverName, group, len(group.RegisteredClients),
284+
)
285+
return group.RegisteredClients
286+
}
287+
288+
// Server has no group - use backward compatible behavior (update all registered clients)
289+
appConfig := config.GetConfig()
290+
targetClients := appConfig.Clients.RegisteredClients
291+
logger.Infof(
292+
"Server %s has no group, updating %d globally registered client(s) for backward compatibility",
293+
serverName, len(targetClients),
294+
)
295+
return targetClients
296+
}

pkg/runner/runner.go

Lines changed: 10 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,16 @@ func (r *Runner) Run(ctx context.Context) error {
210210
// Update client configurations with the MCP server URL.
211211
// Note that this function checks the configuration to determine which
212212
// clients should be updated, if any.
213-
if err := updateClientConfigurations(r.Config.ContainerName, r.Config.ContainerLabels, "localhost", r.Config.Port); err != nil {
214-
logger.Warnf("Warning: Failed to update client configurations: %v", err)
213+
clientManager, err := client.NewManager(ctx)
214+
if err != nil {
215+
logger.Warnf("Warning: Failed to create client manager: %v", err)
216+
} else {
217+
transportType := labels.GetTransportType(r.Config.ContainerLabels)
218+
serverURL := transport.GenerateMCPServerURL(transportType, "localhost", r.Config.Port, r.Config.ContainerName)
219+
220+
if err := clientManager.AddServerToClients(ctx, r.Config.ContainerName, serverURL, transportType, r.Config.Group); err != nil {
221+
logger.Warnf("Warning: Failed to add server to client configurations: %v", err)
222+
}
215223
}
216224

217225
// Define a function to stop the MCP server
@@ -314,40 +322,3 @@ func (r *Runner) Cleanup(ctx context.Context) error {
314322
}
315323
return nil
316324
}
317-
318-
// updateClientConfigurations updates client configuration files with the MCP server URL
319-
func updateClientConfigurations(
320-
containerName string,
321-
containerLabels map[string]string,
322-
host string,
323-
proxyPort int,
324-
) error {
325-
// Find client configuration files
326-
clientConfigs, err := client.FindRegisteredClientConfigs()
327-
if err != nil {
328-
return fmt.Errorf("failed to find client configurations: %w", err)
329-
}
330-
331-
if len(clientConfigs) == 0 {
332-
logger.Infof("No client configuration files found")
333-
return nil
334-
}
335-
336-
// Generate the URL for the MCP server
337-
transportType := labels.GetTransportType(containerLabels)
338-
url := transport.GenerateMCPServerURL(transportType, host, proxyPort, containerName)
339-
340-
// Update each configuration file
341-
for _, clientConfig := range clientConfigs {
342-
logger.Infof("Updating client configuration: %s", clientConfig.Path)
343-
344-
if err := client.Upsert(clientConfig, containerName, url, transportType); err != nil {
345-
fmt.Printf("Warning: Failed to update MCP server configuration in %s: %v\n", clientConfig.Path, err)
346-
continue
347-
}
348-
349-
logger.Infof("Successfully updated client configuration: %s", clientConfig.Path)
350-
}
351-
352-
return nil
353-
}

pkg/workloads/manager.go

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -364,12 +364,10 @@ func (d *defaultManager) DeleteWorkloads(ctx context.Context, names []string) (*
364364

365365
logger.Infof("Container %s removed", name)
366366

367-
if shouldRemoveClientConfig() {
368-
if err := removeClientConfigurations(name); err != nil {
369-
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
370-
} else {
371-
logger.Infof("Client configurations for %s removed", name)
372-
}
367+
if err := removeClientConfigurations(name); err != nil {
368+
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
369+
} else {
370+
logger.Infof("Client configurations for %s removed", name)
373371
}
374372
}
375373

@@ -470,37 +468,26 @@ func (d *defaultManager) RestartWorkloads(ctx context.Context, names []string, f
470468
return group, nil
471469
}
472470

473-
func shouldRemoveClientConfig() bool {
474-
c := config.GetConfig()
475-
return len(c.Clients.RegisteredClients) > 0
476-
}
477-
478471
// TODO: Move to dedicated config management interface.
479472
// updateClientConfigurations updates client configuration files with the MCP server URL
480473
func removeClientConfigurations(containerName string) error {
481-
// Find client configuration files
482-
configs, err := client.FindRegisteredClientConfigs()
474+
// Get the workload's group by loading its run config
475+
runConfig, err := runner.LoadState(context.Background(), containerName)
476+
var group string
483477
if err != nil {
484-
return fmt.Errorf("failed to find client configurations: %w", err)
478+
logger.Warnf("Warning: Failed to load run config for %s, will use backward compatible behavior: %v", containerName, err)
479+
// Continue with empty group (backward compatibility)
480+
} else {
481+
group = runConfig.Group
485482
}
486483

487-
if len(configs) == 0 {
488-
logger.Info("No client configuration files found")
484+
clientManager, err := client.NewManager(context.Background())
485+
if err != nil {
486+
logger.Warnf("Warning: Failed to create client manager for %s, skipping client config removal: %v", containerName, err)
489487
return nil
490488
}
491489

492-
for _, c := range configs {
493-
logger.Infof("Removing MCP server from client configuration: %s", c.Path)
494-
495-
if err := c.ConfigUpdater.Remove(containerName); err != nil {
496-
logger.Warnf("Warning: Failed to remove MCP server from client configuration %s: %v", c.Path, err)
497-
continue
498-
}
499-
500-
logger.Infof("Successfully removed MCP server from client configuration: %s", c.Path)
501-
}
502-
503-
return nil
490+
return clientManager.RemoveServerFromClients(context.Background(), containerName, group)
504491
}
505492

506493
// loadRunnerFromState attempts to load a Runner from the state store
@@ -569,12 +556,10 @@ func (d *defaultManager) stopWorkloads(ctx context.Context, workloads []*rt.Cont
569556
return fmt.Errorf("failed to stop container: %w", err)
570557
}
571558

572-
if shouldRemoveClientConfig() {
573-
if err := removeClientConfigurations(name); err != nil {
574-
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
575-
} else {
576-
logger.Infof("Client configurations for %s removed", name)
577-
}
559+
if err := removeClientConfigurations(name); err != nil {
560+
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
561+
} else {
562+
logger.Infof("Client configurations for %s removed", name)
578563
}
579564

580565
d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusStopped, "")

0 commit comments

Comments
 (0)