Skip to content

Commit 07b65d7

Browse files
authored
examples/server/distributed: a stateless distributed server
Add an example of a server that proxies HTTP requests to multiple stateless streamable MCP backends. For #148
1 parent 24fc13e commit 07b65d7

File tree

3 files changed

+198
-8
lines changed

3 files changed

+198
-8
lines changed

examples/client/loadtest/main.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ var (
3333
workers = flag.Int("workers", 10, "number of concurrent workers")
3434
timeout = flag.Duration("timeout", 1*time.Second, "request timeout")
3535
qps = flag.Int("qps", 100, "tool calls per second, per worker")
36-
v = flag.Bool("v", false, "if set, enable verbose logging of results")
36+
verbose = flag.Bool("v", false, "if set, enable verbose logging")
3737
)
3838

3939
func main() {
@@ -56,8 +56,8 @@ func main() {
5656

5757
parentCtx, cancel := context.WithTimeout(context.Background(), *duration)
5858
defer cancel()
59-
parentCtx, restoreSignal := signal.NotifyContext(parentCtx, os.Interrupt)
60-
defer restoreSignal()
59+
parentCtx, stop := signal.NotifyContext(parentCtx, os.Interrupt)
60+
defer stop()
6161

6262
var (
6363
start = time.Now()
@@ -91,12 +91,12 @@ func main() {
9191
return // test ended
9292
}
9393
failure.Add(1)
94-
if *v {
94+
if *verbose {
9595
log.Printf("FAILURE: %v", err)
9696
}
9797
} else {
9898
success.Add(1)
99-
if *v {
99+
if *verbose {
100100
data, err := json.Marshal(res)
101101
if err != nil {
102102
log.Fatalf("marshalling result: %v", err)
@@ -108,7 +108,7 @@ func main() {
108108
}()
109109
}
110110
wg.Wait()
111-
restoreSignal() // call restore signal (redundantly) here to allow ctrl-c to work again
111+
stop() // restore the interrupt signal
112112

113113
// Print stats.
114114
var (
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
// Copyright 2025 The Go MCP SDK Authors. All rights reserved.
2+
// Use of this source code is governed by an MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
// The distributed command is an example of a distributed MCP server.
6+
//
7+
// It forks multiple child processes (according to the -child_ports flag), each
8+
// of which is a streamable HTTP MCP server with the 'inc' tool, and proxies
9+
// incoming http requests to them.
10+
//
11+
// Distributed MCP servers must be stateless, because there's no guarantee that
12+
// subsequent requests for a session land on the same backend. However, they
13+
// may still have logical session IDs, as can be seen with verbose logging
14+
// (-v).
15+
//
16+
// Example:
17+
//
18+
// ./distributed -http=localhost:8080 -child_ports=8081,8082
19+
package main
20+
21+
import (
22+
"context"
23+
"flag"
24+
"fmt"
25+
"log"
26+
"net/http"
27+
"net/http/httputil"
28+
"net/url"
29+
"os"
30+
"os/exec"
31+
"os/signal"
32+
"strings"
33+
"sync"
34+
"sync/atomic"
35+
"time"
36+
37+
"github.com/modelcontextprotocol/go-sdk/mcp"
38+
)
39+
40+
const childPortVar = "MCP_CHILD_PORT"
41+
42+
var (
43+
httpAddr = flag.String("http", "", "if set, use streamable HTTP at this address, instead of stdin/stdout")
44+
childPorts = flag.String("child_ports", "", "comma-separated child ports to distribute to")
45+
verbose = flag.Bool("v", false, "if set, enable verbose logging")
46+
)
47+
48+
func main() {
49+
// This command runs as either a parent or a child, depending on whether
50+
// childPortVar is set (a.k.a. the fork-and-exec trick).
51+
//
52+
// Each child is a streamable HTTP server, and the parent is a reverse proxy.
53+
flag.Parse()
54+
if v := os.Getenv(childPortVar); v != "" {
55+
child(v)
56+
} else {
57+
parent()
58+
}
59+
}
60+
61+
func parent() {
62+
exe, err := os.Executable()
63+
if err != nil {
64+
log.Fatal(err)
65+
}
66+
67+
if *httpAddr == "" {
68+
log.Fatal("must provide -http")
69+
}
70+
if *childPorts == "" {
71+
log.Fatal("must provide -child_ports")
72+
}
73+
74+
// Ensure that children are cleaned up on CTRL-C
75+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
76+
defer stop()
77+
78+
// Start the child processes.
79+
ports := strings.Split(*childPorts, ",")
80+
var wg sync.WaitGroup
81+
childURLs := make([]*url.URL, len(ports))
82+
for i, port := range ports {
83+
wg.Add(1)
84+
childURL := fmt.Sprintf("http://localhost:%s", port)
85+
childURLs[i], err = url.Parse(childURL)
86+
if err != nil {
87+
log.Fatal(err)
88+
}
89+
cmd := exec.CommandContext(ctx, exe, os.Args[1:]...)
90+
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", childPortVar, port))
91+
cmd.Stderr = os.Stderr
92+
go func() {
93+
defer wg.Done()
94+
log.Printf("starting child %d at %s", i, childURL)
95+
if err := cmd.Run(); err != nil && ctx.Err() == nil {
96+
log.Printf("child %d failed: %v", i, err)
97+
} else {
98+
log.Printf("child %d exited gracefully", i)
99+
}
100+
}()
101+
}
102+
103+
// Start a reverse proxy that round-robin's requests to each backend.
104+
var nextBackend atomic.Int64
105+
server := http.Server{
106+
Addr: *httpAddr,
107+
Handler: &httputil.ReverseProxy{
108+
Rewrite: func(r *httputil.ProxyRequest) {
109+
child := int(nextBackend.Add(1)) % len(childURLs)
110+
if *verbose {
111+
log.Printf("dispatching to localhost:%s", ports[child])
112+
}
113+
r.SetURL(childURLs[child])
114+
},
115+
},
116+
}
117+
118+
wg.Add(1)
119+
go func() {
120+
defer wg.Done()
121+
if err := server.ListenAndServe(); err != nil && ctx.Err() == nil {
122+
log.Printf("Server failed: %v", err)
123+
}
124+
}()
125+
126+
log.Printf("Serving at %s (CTRL-C to cancel)", *httpAddr)
127+
128+
<-ctx.Done()
129+
stop() // restore the interrupt signal
130+
131+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
132+
defer cancel()
133+
134+
// Attempt the graceful shutdown.
135+
if err := server.Shutdown(shutdownCtx); err != nil {
136+
log.Fatalf("Server shutdown failed: %v", err)
137+
}
138+
139+
// Wait for the subprocesses and http server to stop.
140+
wg.Wait()
141+
142+
log.Println("Server shutdown gracefully.")
143+
}
144+
145+
func child(port string) {
146+
// Create a server with a single tool that increments a counter.
147+
server := mcp.NewServer(&mcp.Implementation{Name: "counter"}, nil)
148+
149+
var count atomic.Int64
150+
inc := func(ctx context.Context, req *mcp.CallToolRequest, args struct{}) (*mcp.CallToolResult, struct{ Count int64 }, error) {
151+
n := count.Add(1)
152+
if *verbose {
153+
log.Printf("request %d (session %s)", n, req.Session.ID())
154+
}
155+
return nil, struct{ Count int64 }{n}, nil
156+
}
157+
mcp.AddTool(server, &mcp.Tool{Name: "inc"}, inc)
158+
159+
handler := mcp.NewStreamableHTTPHandler(func(*http.Request) *mcp.Server {
160+
return server
161+
}, &mcp.StreamableHTTPOptions{
162+
Stateless: true,
163+
})
164+
log.Printf("child listening on localhost:%s", port)
165+
log.Fatal(http.ListenAndServe(fmt.Sprintf("localhost:%s", port), handler))
166+
}

examples/server/everything/main.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,40 @@ import (
1111
"fmt"
1212
"log"
1313
"net/http"
14+
_ "net/http/pprof"
1415
"net/url"
1516
"os"
17+
"runtime"
1618
"strings"
1719

1820
"github.com/google/jsonschema-go/jsonschema"
1921
"github.com/modelcontextprotocol/go-sdk/mcp"
2022
)
2123

22-
var httpAddr = flag.String("http", "", "if set, use streamable HTTP at this address, instead of stdin/stdout")
24+
var (
25+
httpAddr = flag.String("http", "", "if set, use streamable HTTP at this address, instead of stdin/stdout")
26+
pprofAddr = flag.String("pprof", "", "if set, host the pprof debugging server at this address")
27+
)
2328

2429
func main() {
2530
flag.Parse()
2631

32+
if *pprofAddr != "" {
33+
// For debugging memory leaks, add an endpoint to trigger a few garbage
34+
// collection cycles and ensure the /debug/pprof/heap endpoint only reports
35+
// reachable memory.
36+
http.DefaultServeMux.Handle("/gc", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
37+
for range 3 {
38+
runtime.GC()
39+
}
40+
fmt.Fprintln(w, "GC'ed")
41+
}))
42+
go func() {
43+
// DefaultServeMux was mutated by the /debug/pprof import.
44+
http.ListenAndServe(*pprofAddr, http.DefaultServeMux)
45+
}()
46+
}
47+
2748
opts := &mcp.ServerOptions{
2849
Instructions: "Use this server!",
2950
CompletionHandler: complete, // support completions by setting this handler
@@ -56,7 +77,10 @@ func main() {
5677
return server
5778
}, nil)
5879
log.Printf("MCP handler listening at %s", *httpAddr)
59-
http.ListenAndServe(*httpAddr, handler)
80+
if *pprofAddr != "" {
81+
log.Printf("pprof listening at http://%s/debug/pprof", *pprofAddr)
82+
}
83+
log.Fatal(http.ListenAndServe(*httpAddr, handler))
6084
} else {
6185
t := &mcp.LoggingTransport{Transport: &mcp.StdioTransport{}, Writer: os.Stderr}
6286
if err := server.Run(context.Background(), t); err != nil {

0 commit comments

Comments
 (0)