Skip to content

Commit a629b08

Browse files
authored
Merge pull request #73 from dgageot/deprecate-stdio-over-tcp
Deprecate stdio over TCP
2 parents c25b743 + 8299175 commit a629b08

File tree

3 files changed

+36
-95
lines changed

3 files changed

+36
-95
lines changed

cmd/docker-mcp/commands/gateway.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package commands
22

33
import (
4+
"errors"
45
"os"
56

67
"github.com/spf13/cobra"
@@ -56,7 +57,11 @@ func gatewayCommand(docker docker.Client) *cobra.Command {
5657
Short: "Run the gateway",
5758
Args: cobra.NoArgs,
5859
RunE: func(cmd *cobra.Command, _ []string) error {
59-
if options.Port == 0 && options.Transport != "stdio" {
60+
if options.Transport == "stdio" {
61+
if options.Port != 0 {
62+
return errors.New("cannot use --port with --transport=stdio")
63+
}
64+
} else if options.Port == 0 {
6065
options.Port = 8811
6166
}
6267

cmd/docker-mcp/internal/gateway/run.go

Lines changed: 24 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"net"
77
"os"
88
"strings"
9-
"sync"
109
"time"
1110

1211
"github.com/mark3labs/mcp-go/server"
@@ -90,34 +89,14 @@ func (g *Gateway) Run(ctx context.Context) error {
9089
}
9190
toolCallbacks := interceptors.Callbacks(g.LogCalls, g.BlockSecrets, customInterceptors)
9291

93-
// TODO: cleanup stopped servers. That happens in stdio over TCP mode.
94-
var (
95-
lock sync.Mutex
96-
changeListeners []func(*Capabilities)
92+
mcpServer := server.NewMCPServer(
93+
"Docker AI MCP Gateway",
94+
"2.0.1",
95+
server.WithToolHandlerMiddleware(toolCallbacks),
9796
)
9897

99-
capabilities, err := g.listServersAndCapabilities(ctx, configuration)
100-
if err != nil {
101-
return fmt.Errorf("listing capabilities: %w", err)
102-
}
103-
104-
newMCPServer := func() *server.MCPServer {
105-
mcpServer := server.NewMCPServer(
106-
"Docker AI MCP Gateway",
107-
"2.0.1",
108-
server.WithToolHandlerMiddleware(toolCallbacks),
109-
)
110-
111-
// TODO: This will create a new server instance with an outdated vision of the capabilities.
112-
refreshCapabilities(mcpServer, capabilities)
113-
114-
lock.Lock()
115-
changeListeners = append(changeListeners, func(newCapabilities *Capabilities) {
116-
refreshCapabilities(mcpServer, newCapabilities)
117-
})
118-
lock.Unlock()
119-
120-
return mcpServer
98+
if err := g.reloadConfiguration(ctx, mcpServer, configuration); err != nil {
99+
return fmt.Errorf("loading configuration: %w", err)
121100
}
122101

123102
// Optionally watch for configuration updates.
@@ -137,19 +116,10 @@ func (g *Gateway) Run(ctx context.Context) error {
137116
continue
138117
}
139118

140-
capabilities, err := g.listServersAndCapabilities(ctx, configuration)
141-
if err != nil {
119+
if err := g.reloadConfiguration(ctx, mcpServer, configuration); err != nil {
142120
logf("> Unable to list capabilities: %s", err)
143121
continue
144122
}
145-
146-
g.health.SetUnhealthy()
147-
lock.Lock()
148-
for _, listener := range changeListeners {
149-
listener(capabilities)
150-
}
151-
lock.Unlock()
152-
g.health.SetHealthy()
153123
}
154124
}
155125
}()
@@ -162,31 +132,25 @@ func (g *Gateway) Run(ctx context.Context) error {
162132
}
163133

164134
// Start the server
165-
g.health.SetHealthy()
166135
switch strings.ToLower(g.Transport) {
167136
case "stdio":
168-
if g.Port == 0 {
169-
log("> Start stdio server")
170-
return g.startStdioServer(ctx, newMCPServer, os.Stdin, os.Stdout)
171-
}
172-
173-
log("> Start stdio over TCP server on port", g.Port)
174-
return g.startStdioOverTCPServer(ctx, newMCPServer, ln)
137+
log("> Start stdio server")
138+
return g.startStdioServer(ctx, mcpServer, os.Stdin, os.Stdout)
175139

176140
case "sse":
177141
log("> Start sse server on port", g.Port)
178-
return g.startSseServer(ctx, newMCPServer, ln)
142+
return g.startSseServer(ctx, mcpServer, ln)
179143

180144
case "streaming":
181145
log("> Start streaming server on port", g.Port)
182-
return g.startStreamingServer(ctx, newMCPServer, ln)
146+
return g.startStreamingServer(ctx, mcpServer, ln)
183147

184148
default:
185149
return fmt.Errorf("unknown transport %q, expected 'stdio', 'sse' or 'streaming", g.Transport)
186150
}
187151
}
188152

189-
func (g *Gateway) listServersAndCapabilities(ctx context.Context, configuration Configuration) (*Capabilities, error) {
153+
func (g *Gateway) reloadConfiguration(ctx context.Context, mcpServer *server.MCPServer, configuration Configuration) error {
190154
// Which servers are enabled in the registry.yaml?
191155
serverNames := configuration.ServerNames()
192156
if len(serverNames) == 0 {
@@ -200,19 +164,20 @@ func (g *Gateway) listServersAndCapabilities(ctx context.Context, configuration
200164
log("- Listing MCP tools...")
201165
capabilities, err := g.listCapabilities(ctx, configuration, serverNames)
202166
if err != nil {
203-
return nil, fmt.Errorf("listing resources: %w", err)
167+
return fmt.Errorf("listing resources: %w", err)
204168
}
205169
log(">", len(capabilities.Tools), "tools listed in", time.Since(startList))
206170

207-
return capabilities, nil
208-
}
209-
210-
func refreshCapabilities(s *server.MCPServer, c *Capabilities) {
211-
s.SetTools(c.Tools...)
212-
s.SetPrompts(c.Prompts...)
213-
s.SetResources(c.Resources...)
214-
s.RemoveAllResourceTemplates()
215-
for _, v := range c.ResourceTemplates {
216-
s.AddResourceTemplate(v.ResourceTemplate, v.Handler)
171+
// Update the server's capabilities.
172+
g.health.SetUnhealthy()
173+
mcpServer.SetTools(capabilities.Tools...)
174+
mcpServer.SetPrompts(capabilities.Prompts...)
175+
mcpServer.SetResources(capabilities.Resources...)
176+
mcpServer.RemoveAllResourceTemplates()
177+
for _, v := range capabilities.ResourceTemplates {
178+
mcpServer.AddResourceTemplate(v.ResourceTemplate, v.Handler)
217179
}
180+
g.health.SetHealthy()
181+
182+
return nil
218183
}

cmd/docker-mcp/internal/gateway/transport.go

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,15 @@ import (
77
"net/http"
88

99
"github.com/mark3labs/mcp-go/server"
10-
11-
"github.com/docker/mcp-gateway/cmd/docker-mcp/internal/sockets"
1210
)
1311

14-
func (g *Gateway) startStdioServer(ctx context.Context, newMCPServer func() *server.MCPServer, stdin io.Reader, stdout io.Writer) error {
15-
return server.NewStdioServer(newMCPServer()).Listen(ctx, stdin, stdout)
12+
func (g *Gateway) startStdioServer(ctx context.Context, mcpServer *server.MCPServer, stdin io.Reader, stdout io.Writer) error {
13+
return server.NewStdioServer(mcpServer).Listen(ctx, stdin, stdout)
1614
}
1715

18-
func (g *Gateway) startSseServer(ctx context.Context, newMCPServer func() *server.MCPServer, ln net.Listener) error {
16+
func (g *Gateway) startSseServer(ctx context.Context, mcpServer *server.MCPServer, ln net.Listener) error {
1917
mux := http.NewServeMux()
20-
sseServer := server.NewSSEServer(newMCPServer())
18+
sseServer := server.NewSSEServer(mcpServer)
2119
mux.Handle("/sse", sseServer.SSEHandler())
2220
mux.Handle("/message", sseServer.MessageHandler())
2321
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -40,9 +38,9 @@ func (g *Gateway) startSseServer(ctx context.Context, newMCPServer func() *serve
4038
return httpServer.Serve(ln)
4139
}
4240

43-
func (g *Gateway) startStreamingServer(ctx context.Context, newMCPServer func() *server.MCPServer, ln net.Listener) error {
41+
func (g *Gateway) startStreamingServer(ctx context.Context, mcpServer *server.MCPServer, ln net.Listener) error {
4442
mux := http.NewServeMux()
45-
mux.Handle("/mcp", server.NewStreamableHTTPServer(newMCPServer()))
43+
mux.Handle("/mcp", server.NewStreamableHTTPServer(mcpServer))
4644
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
4745
http.Redirect(w, r, "/mcp", http.StatusTemporaryRedirect)
4846
})
@@ -63,30 +61,3 @@ func (g *Gateway) startStreamingServer(ctx context.Context, newMCPServer func()
6361
}()
6462
return httpServer.Serve(ln)
6563
}
66-
67-
func (g *Gateway) startStdioOverTCPServer(ctx context.Context, newMCPServer func() *server.MCPServer, ln net.Listener) error {
68-
for {
69-
select {
70-
case <-ctx.Done():
71-
return ctx.Err()
72-
default:
73-
conn, err := sockets.AcceptWithContext(ctx, ln)
74-
if err != nil {
75-
if ctx.Err() != nil {
76-
return ctx.Err()
77-
}
78-
logf("Error accepting the connection: %v", err)
79-
continue
80-
}
81-
82-
newServer := server.NewStdioServer(newMCPServer())
83-
go func() {
84-
defer conn.Close()
85-
86-
if err := newServer.Listen(ctx, conn, conn); err != nil {
87-
logf("Error listening: %v", err)
88-
}
89-
}()
90-
}
91-
}
92-
}

0 commit comments

Comments
 (0)