Skip to content

Commit 0768bbc

Browse files
committed
Refactor Pulsar Admin Broker Stats Tools with Builder Pattern
- Introduced `PulsarAdminBrokerStatsToolBuilder` to encapsulate broker statistics tools, enhancing modularity and maintainability. - Migrated tool definitions and handler logic to the new builder, improving code organization and readability. - Updated tool registration logic to utilize the builder, ensuring a unified approach for managing Pulsar broker statistics. - Enhanced error handling and added comments for better clarity and understanding of the code structure. - Maintained backward compatibility while integrating the new builder pattern.
1 parent b0443f4 commit 0768bbc

File tree

2 files changed

+243
-158
lines changed

2 files changed

+243
-158
lines changed
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package pulsar
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
25+
"github.com/mark3labs/mcp-go/mcp"
26+
"github.com/mark3labs/mcp-go/server"
27+
"github.com/streamnative/pulsarctl/pkg/cmdutils"
28+
"github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
29+
"github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
30+
)
31+
32+
// PulsarAdminBrokerStatsToolBuilder implements the ToolBuilder interface for Pulsar Broker Statistics
33+
type PulsarAdminBrokerStatsToolBuilder struct {
34+
*builders.BaseToolBuilder
35+
}
36+
37+
// NewPulsarAdminBrokerStatsToolBuilder creates a new Pulsar Admin Broker Stats tool builder instance
38+
func NewPulsarAdminBrokerStatsToolBuilder() *PulsarAdminBrokerStatsToolBuilder {
39+
metadata := builders.ToolMetadata{
40+
Name: "pulsar_admin_broker_stats",
41+
Version: "1.0.0",
42+
Description: "Pulsar Broker Statistics administration tools",
43+
Category: "pulsar_admin",
44+
Tags: []string{"pulsar", "broker", "stats", "admin", "monitoring"},
45+
}
46+
47+
features := []string{
48+
"pulsar-admin-brokers-status",
49+
"all",
50+
"all-pulsar",
51+
"pulsar-admin",
52+
}
53+
54+
return &PulsarAdminBrokerStatsToolBuilder{
55+
BaseToolBuilder: builders.NewBaseToolBuilder(metadata, features),
56+
}
57+
}
58+
59+
// BuildTools builds the Pulsar Admin Broker Stats tool list
60+
func (b *PulsarAdminBrokerStatsToolBuilder) BuildTools(_ context.Context, config builders.ToolBuildConfig) ([]server.ServerTool, error) {
61+
// Check features - return empty list if no required features are present
62+
if !b.HasAnyRequiredFeature(config.Features) {
63+
return nil, nil
64+
}
65+
66+
// Validate configuration
67+
if err := b.Validate(config); err != nil {
68+
return nil, err
69+
}
70+
71+
// Build tools
72+
tool := b.buildBrokerStatsTool()
73+
handler := b.buildBrokerStatsHandler(config.ReadOnly)
74+
75+
return []server.ServerTool{
76+
{
77+
Tool: tool,
78+
Handler: handler,
79+
},
80+
}, nil
81+
}
82+
83+
// buildBrokerStatsTool builds the Pulsar Admin Broker Stats MCP tool definition
84+
func (b *PulsarAdminBrokerStatsToolBuilder) buildBrokerStatsTool() mcp.Tool {
85+
resourceDesc := "Type of broker stats resource to access, available options:\n" +
86+
"- monitoring_metrics: Metrics for the broker's monitoring system\n" +
87+
"- mbeans: JVM MBeans statistics\n" +
88+
"- topics: Statistics about all topics managed by the broker\n" +
89+
"- allocator_stats: Memory allocator statistics (requires allocator_name parameter)\n" +
90+
"- load_report: Broker load information"
91+
92+
toolDesc := "Unified tool for retrieving Apache Pulsar broker statistics.\n" +
93+
"This tool provides access to various broker stats resources, including:\n" +
94+
"1. Monitoring metrics (resource=monitoring_metrics): Metrics for the broker's monitoring system\n" +
95+
"2. MBean stats (resource=mbeans): JVM MBeans statistics\n" +
96+
"3. Topics stats (resource=topics): Statistics about all topics managed by the broker\n" +
97+
"4. Allocator stats (resource=allocator_stats): Memory allocator statistics for specific allocator\n" +
98+
"5. Load report (resource=load_report): Broker load information, sometimes the load report is not available, so suggest to use other resources to get the broker metrics\n\n" +
99+
"Example: {\"resource\": \"monitoring_metrics\"} retrieves all monitoring metrics\n" +
100+
"Example: {\"resource\": \"allocator_stats\", \"allocator_name\": \"default\"} retrieves stats for the default allocator\n" +
101+
"This tool requires Pulsar super-user permissions."
102+
103+
return mcp.NewTool("pulsar_admin_broker_stats",
104+
mcp.WithDescription(toolDesc),
105+
mcp.WithString("resource", mcp.Required(),
106+
mcp.Description(resourceDesc),
107+
),
108+
mcp.WithString("allocator_name",
109+
mcp.Description("The name of the allocator to get statistics for. Required only when resource=allocator_stats"),
110+
),
111+
)
112+
}
113+
114+
// buildBrokerStatsHandler builds the Pulsar Admin Broker Stats handler function
115+
func (b *PulsarAdminBrokerStatsToolBuilder) buildBrokerStatsHandler(readOnly bool) func(context.Context, mcp.CallToolRequest) (*mcp.CallToolResult, error) {
116+
return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
117+
// Get Pulsar admin client
118+
client, err := b.getPulsarAdminClient(ctx)
119+
if err != nil {
120+
return b.handleError("get admin client", err), nil
121+
}
122+
123+
// Get required resource parameter
124+
resource, err := request.RequireString("resource")
125+
if err != nil {
126+
return mcp.NewToolResultError("Missing required parameter 'resource'. " +
127+
"Please specify one of: monitoring_metrics, mbeans, topics, allocator_stats, load_report."), nil
128+
}
129+
130+
// Process request based on resource type
131+
switch resource {
132+
case "monitoring_metrics":
133+
return b.handleMonitoringMetrics(client)
134+
case "mbeans":
135+
return b.handleMBeans(client)
136+
case "topics":
137+
return b.handleTopics(client)
138+
case "allocator_stats":
139+
allocatorName, err := request.RequireString("allocator_name")
140+
if err != nil {
141+
return mcp.NewToolResultError("Missing required parameter 'allocator_name' for allocator_stats resource. " +
142+
"Please provide the name of the allocator to get statistics for."), nil
143+
}
144+
return b.handleAllocatorStats(client, allocatorName)
145+
case "load_report":
146+
return b.handleLoadReport(client)
147+
default:
148+
return mcp.NewToolResultError(fmt.Sprintf("Unsupported resource: %s. "+
149+
"Please use one of: monitoring_metrics, mbeans, topics, allocator_stats, load_report.", resource)), nil
150+
}
151+
}
152+
}
153+
154+
// Utility functions
155+
156+
// handleError provides unified error handling
157+
func (b *PulsarAdminBrokerStatsToolBuilder) handleError(operation string, err error) *mcp.CallToolResult {
158+
return mcp.NewToolResultError(fmt.Sprintf("Failed to %s: %v", operation, err))
159+
}
160+
161+
// marshalResponse provides unified JSON serialization for responses
162+
func (b *PulsarAdminBrokerStatsToolBuilder) marshalResponse(data interface{}) (*mcp.CallToolResult, error) {
163+
jsonBytes, err := json.Marshal(data)
164+
if err != nil {
165+
return b.handleError("marshal response", err), nil
166+
}
167+
return mcp.NewToolResultText(string(jsonBytes)), nil
168+
}
169+
170+
// getPulsarAdminClient retrieves the Pulsar admin client from context
171+
func (b *PulsarAdminBrokerStatsToolBuilder) getPulsarAdminClient(ctx context.Context) (cmdutils.Client, error) {
172+
// Use the same context key as in the original implementation (pkg/mcp/ctx.go)
173+
// This maintains consistency with the original approach
174+
type contextKey string
175+
const pulsarSessionContextKey contextKey = "pulsar_session"
176+
177+
session, ok := ctx.Value(pulsarSessionContextKey).(*pulsar.Session)
178+
if !ok || session == nil {
179+
return nil, fmt.Errorf("Pulsar session not found in context")
180+
}
181+
return session.GetAdminClient()
182+
}
183+
184+
// Specific operation handler functions
185+
186+
// handleMonitoringMetrics handles retrieving monitoring metrics
187+
func (b *PulsarAdminBrokerStatsToolBuilder) handleMonitoringMetrics(client cmdutils.Client) (*mcp.CallToolResult, error) {
188+
stats, err := client.BrokerStats().GetMetrics()
189+
if err != nil {
190+
return b.handleError("get monitoring metrics", err), nil
191+
}
192+
return b.marshalResponse(stats)
193+
}
194+
195+
// handleMBeans handles retrieving MBeans statistics
196+
func (b *PulsarAdminBrokerStatsToolBuilder) handleMBeans(client cmdutils.Client) (*mcp.CallToolResult, error) {
197+
stats, err := client.BrokerStats().GetMBeans()
198+
if err != nil {
199+
return b.handleError("get MBeans", err), nil
200+
}
201+
return b.marshalResponse(stats)
202+
}
203+
204+
// handleTopics handles retrieving topics statistics
205+
func (b *PulsarAdminBrokerStatsToolBuilder) handleTopics(client cmdutils.Client) (*mcp.CallToolResult, error) {
206+
stats, err := client.BrokerStats().GetTopics()
207+
if err != nil {
208+
return b.handleError("get topics stats", err), nil
209+
}
210+
return b.marshalResponse(stats)
211+
}
212+
213+
// handleAllocatorStats handles retrieving allocator statistics
214+
func (b *PulsarAdminBrokerStatsToolBuilder) handleAllocatorStats(client cmdutils.Client, allocatorName string) (*mcp.CallToolResult, error) {
215+
stats, err := client.BrokerStats().GetAllocatorStats(allocatorName)
216+
if err != nil {
217+
return b.handleError("get allocator stats", err), nil
218+
}
219+
return b.marshalResponse(stats)
220+
}
221+
222+
// handleLoadReport handles retrieving load report
223+
func (b *PulsarAdminBrokerStatsToolBuilder) handleLoadReport(client cmdutils.Client) (*mcp.CallToolResult, error) {
224+
stats, err := client.BrokerStats().GetLoadReport()
225+
if err != nil {
226+
return b.handleError("get load report", err), nil
227+
}
228+
return b.marshalResponse(stats)
229+
}

0 commit comments

Comments
 (0)