Skip to content

Commit f44c5c6

Browse files
committed
Trace initializations with spans
1 parent 65cc8e1 commit f44c5c6

File tree

4 files changed

+168
-4
lines changed

4 files changed

+168
-4
lines changed

contrib/mark3labs/mcp-go/README.md

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
This integration provides Datadog tracing for the [mark3labs/mcp-go](https://github.com/mark3labs/mcp-go) library.
44

5+
Both hooks and middleware are used.
6+
57
## Usage
68

79
```go
@@ -12,19 +14,24 @@ import (
1214
)
1315

1416
func main() {
15-
tracer.Start()
1617
if err := tracer.Start(); err != nil {
1718
log.Fatal(err)
1819
}
1920
defer tracer.Stop()
2021

21-
srv := server.NewMCPServer("my-server", "1.0.0",
22-
server.WithToolHandlerMiddleware(mcpgotrace.NewToolHandlerMiddleware()))
23-
_ = srv
22+
// Add tracing to your server hooks
23+
hooks := &server.Hooks{}
24+
cleanup := mcpgotrace.AddServerHooks(hooks)
25+
defer cleanup()
26+
27+
srv := server.NewMCPServer("my-server", "1.0.0",
28+
server.WithHooks(hooks),
29+
server.WithToolHandlerMiddleware(mcpgotrace.NewToolHandlerMiddleware()))
2430
}
2531
```
2632

2733
## Features
2834

2935
The integration automatically traces:
3036
- **Tool calls**: Creates LLMObs tool spans with input/output annotation for all tool invocations
37+
- **Session initialization**: Create LLMObs task spans for session initialization, including client information.

contrib/mark3labs/mcp-go/example_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,13 @@ func Example() {
1515
tracer.Start()
1616
defer tracer.Stop()
1717

18+
// Create server hooks and add Datadog tracing
19+
hooks := &server.Hooks{}
20+
cleanup := mcpgotrace.AddServerHooks(hooks)
21+
defer cleanup()
22+
1823
srv := server.NewMCPServer("my-server", "1.0.0",
24+
server.WithHooks(hooks),
1925
server.WithToolHandlerMiddleware(mcpgotrace.NewToolHandlerMiddleware()))
2026
_ = srv
2127
}

contrib/mark3labs/mcp-go/mcpgo.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ package mcpgo // import "github.com/DataDog/dd-trace-go/contrib/mark3labs/mcp-go
88
import (
99
"context"
1010
"encoding/json"
11+
"time"
1112

1213
"github.com/DataDog/dd-trace-go/v2/instrumentation"
1314
"github.com/DataDog/dd-trace-go/v2/llmobs"
15+
"github.com/jellydator/ttlcache/v3"
1416

1517
"github.com/mark3labs/mcp-go/mcp"
1618
"github.com/mark3labs/mcp-go/server"
@@ -22,6 +24,27 @@ func init() {
2224
instr = instrumentation.Load(instrumentation.PackageMark3LabsMcpGo)
2325
}
2426

27+
type hooks struct {
28+
spanCache *ttlcache.Cache[any, llmobs.Span]
29+
}
30+
31+
type textIOAnnotator interface {
32+
AnnotateTextIO(input, output string, opts ...llmobs.AnnotateOption)
33+
}
34+
35+
// AddServerHooks appends Datadog tracing hooks to an existing server.Hooks object.
36+
// Returns a cleanup function that should be called upon server shutdown.
37+
func AddServerHooks(hooks *server.Hooks) func() {
38+
ddHooks := newHooks()
39+
hooks.AddBeforeInitialize(ddHooks.onBeforeInitialize)
40+
hooks.AddAfterInitialize(ddHooks.onAfterInitialize)
41+
hooks.AddOnError(ddHooks.onError)
42+
43+
return func() {
44+
ddHooks.stop()
45+
}
46+
}
47+
2548
func NewToolHandlerMiddleware() server.ToolHandlerMiddleware {
2649
return func(next server.ToolHandlerFunc) server.ToolHandlerFunc {
2750
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
@@ -48,3 +71,63 @@ func NewToolHandlerMiddleware() server.ToolHandlerMiddleware {
4871
}
4972
}
5073
}
74+
75+
func newHooks() *hooks {
76+
spanCache := ttlcache.New[any, llmobs.Span](
77+
ttlcache.WithTTL[any, llmobs.Span](5 * time.Minute),
78+
)
79+
spanCache.OnEviction(func(ctx context.Context, reason ttlcache.EvictionReason, item *ttlcache.Item[any, llmobs.Span]) {
80+
if span := item.Value(); span != nil {
81+
if reason == ttlcache.EvictionReasonExpired {
82+
span.Finish()
83+
}
84+
}
85+
})
86+
go spanCache.Start()
87+
88+
return &hooks{
89+
spanCache: spanCache,
90+
}
91+
}
92+
93+
func (h *hooks) onBeforeInitialize(ctx context.Context, id any, request *mcp.InitializeRequest) {
94+
taskSpan, _ := llmobs.StartTaskSpan(ctx, "mcp.initialize")
95+
h.spanCache.Set(id, taskSpan, ttlcache.DefaultTTL)
96+
}
97+
98+
func (h *hooks) onAfterInitialize(ctx context.Context, id any, request *mcp.InitializeRequest, result *mcp.InitializeResult) {
99+
finishSpanWithIO(h, id, request, result)
100+
}
101+
102+
func (h *hooks) onError(ctx context.Context, id any, method mcp.MCPMethod, message any, err error) {
103+
if method == mcp.MethodInitialize {
104+
if item := h.spanCache.Get(id); item != nil {
105+
span := item.Value()
106+
if annotator, ok := span.(textIOAnnotator); ok {
107+
inputJSON, _ := json.Marshal(message)
108+
annotator.AnnotateTextIO(string(inputJSON), err.Error())
109+
span.Finish(llmobs.WithError(err))
110+
}
111+
h.spanCache.Delete(id)
112+
}
113+
}
114+
}
115+
116+
func (h *hooks) stop() {
117+
h.spanCache.Stop()
118+
}
119+
120+
func finishSpanWithIO[Req any, Res any](h *hooks, id any, request Req, result Res) {
121+
if item := h.spanCache.Get(id); item != nil {
122+
span := item.Value()
123+
if annotator, ok := span.(textIOAnnotator); ok {
124+
inputJSON, _ := json.Marshal(request)
125+
resultJSON, _ := json.Marshal(result)
126+
outputText := string(resultJSON)
127+
128+
annotator.AnnotateTextIO(string(inputJSON), outputText)
129+
span.Finish()
130+
}
131+
h.spanCache.Delete(id)
132+
}
133+
}

contrib/mark3labs/mcp-go/mcpgo_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,74 @@ func TestNewToolHandlerMiddleware(t *testing.T) {
2929
assert.NotNil(t, middleware)
3030
}
3131

32+
func TestAddServerHooks(t *testing.T) {
33+
mt := mocktracer.Start()
34+
defer mt.Stop()
35+
36+
serverHooks := &server.Hooks{}
37+
cleanup := AddServerHooks(serverHooks)
38+
defer cleanup()
39+
40+
assert.Len(t, serverHooks.OnBeforeInitialize, 1)
41+
assert.Len(t, serverHooks.OnAfterInitialize, 1)
42+
assert.Len(t, serverHooks.OnError, 1)
43+
}
44+
45+
// Integration Tests
46+
47+
func TestIntegrationSessionInitialize(t *testing.T) {
48+
tt := testTracer(t)
49+
defer tt.Stop()
50+
51+
hooks := &server.Hooks{}
52+
cleanup := AddServerHooks(hooks)
53+
defer cleanup()
54+
55+
srv := server.NewMCPServer("test-server", "1.0.0",
56+
server.WithHooks(hooks))
57+
58+
ctx := context.Background()
59+
initRequest := `{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test-client","version":"1.0.0"}}}`
60+
61+
response := srv.HandleMessage(ctx, []byte(initRequest))
62+
assert.NotNil(t, response)
63+
64+
responseBytes, err := json.Marshal(response)
65+
require.NoError(t, err)
66+
67+
var resp map[string]interface{}
68+
err = json.Unmarshal(responseBytes, &resp)
69+
require.NoError(t, err)
70+
assert.Equal(t, "2.0", resp["jsonrpc"])
71+
assert.Equal(t, float64(1), resp["id"])
72+
assert.NotNil(t, resp["result"])
73+
74+
spans := tt.WaitForLLMObsSpans(t, 1)
75+
require.Len(t, spans, 1)
76+
77+
taskSpan := spans[0]
78+
assert.Equal(t, "mcp.initialize", taskSpan.Name)
79+
assert.Equal(t, "task", taskSpan.Meta["span.kind"])
80+
81+
assert.Contains(t, taskSpan.Meta, "input")
82+
assert.Contains(t, taskSpan.Meta, "output")
83+
84+
inputMeta := taskSpan.Meta["input"]
85+
assert.NotNil(t, inputMeta)
86+
inputJSON, err := json.Marshal(inputMeta)
87+
require.NoError(t, err)
88+
inputStr := string(inputJSON)
89+
assert.Contains(t, inputStr, "2024-11-05")
90+
assert.Contains(t, inputStr, "test-client")
91+
92+
outputMeta := taskSpan.Meta["output"]
93+
assert.NotNil(t, outputMeta)
94+
outputJSON, err := json.Marshal(outputMeta)
95+
require.NoError(t, err)
96+
outputStr := string(outputJSON)
97+
assert.Contains(t, outputStr, "serverInfo")
98+
}
99+
32100
func TestIntegrationToolCallSuccess(t *testing.T) {
33101
tt := testTracer(t)
34102
defer tt.Stop()

0 commit comments

Comments
 (0)