1+ // Licensed to the Apache Software Foundation (ASF) under one
2+ // or more contributor license agreements. See the NOTICE file
3+ // distributed with this work for additional information
4+ // regarding copyright ownership. The ASF licenses this file
5+ // to you under the Apache License, Version 2.0 (the
6+ // "License"); you may not use this file except in compliance
7+ // with the License. You may obtain a copy of the License at
8+ //
9+ // http://www.apache.org/licenses/LICENSE-2.0
10+ //
11+ // Unless required by applicable law or agreed to in writing,
12+ // software distributed under the License is distributed on an
13+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+ // KIND, either express or implied. See the License for the
15+ // specific language governing permissions and limitations
16+ // under the License.
17+
18+ package pulsar
19+
20+ import (
21+ "context"
22+ "encoding/json"
23+ "fmt"
24+
25+ "github.com/mark3labs/mcp-go/mcp"
26+ "github.com/mark3labs/mcp-go/server"
27+ "github.com/streamnative/pulsarctl/pkg/cmdutils"
28+ "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
29+ "github.com/streamnative/streamnative-mcp-server/pkg/pulsar"
30+ )
31+
32+ // PulsarAdminBrokersToolBuilder implements the ToolBuilder interface for Pulsar admin brokers
33+ type PulsarAdminBrokersToolBuilder struct {
34+ * builders.BaseToolBuilder
35+ }
36+
37+ // NewPulsarAdminBrokersToolBuilder creates a new Pulsar admin brokers tool builder instance
38+ func NewPulsarAdminBrokersToolBuilder () * PulsarAdminBrokersToolBuilder {
39+ metadata := builders.ToolMetadata {
40+ Name : "pulsar_admin_brokers" ,
41+ Version : "1.0.0" ,
42+ Description : "Pulsar admin brokers management tools" ,
43+ Category : "pulsar_admin" ,
44+ Tags : []string {"pulsar" , "admin" , "brokers" },
45+ }
46+
47+ features := []string {
48+ "pulsar-admin-brokers" ,
49+ "all" ,
50+ "all-pulsar" ,
51+ "pulsar-admin" ,
52+ }
53+
54+ return & PulsarAdminBrokersToolBuilder {
55+ BaseToolBuilder : builders .NewBaseToolBuilder (metadata , features ),
56+ }
57+ }
58+
59+ // BuildTools builds the Pulsar admin brokers tool list
60+ func (b * PulsarAdminBrokersToolBuilder ) BuildTools (_ context.Context , config builders.ToolBuildConfig ) ([]server.ServerTool , error ) {
61+ // Check features - return empty list if no required features are present
62+ if ! b .HasAnyRequiredFeature (config .Features ) {
63+ return nil , nil
64+ }
65+
66+ // Validate configuration (only validate when matching features are present)
67+ if err := b .Validate (config ); err != nil {
68+ return nil , err
69+ }
70+
71+ // Build tools
72+ tool := b .buildPulsarAdminBrokersTool ()
73+ handler := b .buildPulsarAdminBrokersHandler (config .ReadOnly )
74+
75+ return []server.ServerTool {
76+ {
77+ Tool : tool ,
78+ Handler : handler ,
79+ },
80+ }, nil
81+ }
82+
83+ // buildPulsarAdminBrokersTool builds the Pulsar admin brokers MCP tool definition
84+ func (b * PulsarAdminBrokersToolBuilder ) buildPulsarAdminBrokersTool () mcp.Tool {
85+ return mcp .NewTool ("pulsar_admin_brokers" ,
86+ mcp .WithDescription ("Unified tool for managing Apache Pulsar broker resources. This tool integrates multiple broker management functions, including:\n " +
87+ "1. List active brokers in a cluster (resource=brokers, operation=list)\n " +
88+ "2. Check broker health status (resource=health, operation=get)\n " +
89+ "3. Manage broker configurations (resource=config, operation=get/update/delete)\n " +
90+ "4. View namespaces owned by a broker (resource=namespaces, operation=get)\n \n " +
91+ "Different functions are accessed by combining resource and operation parameters, with other parameters used selectively based on operation type.\n " +
92+ "Example: {\" resource\" : \" config\" , \" operation\" : \" get\" , \" configType\" : \" dynamic\" } retrieves all dynamic configuration names.\n " +
93+ "This tool requires Pulsar super-user permissions." ),
94+ mcp .WithString ("resource" , mcp .Required (),
95+ mcp .Description ("Type of resource to access, available options:\n " +
96+ "- brokers: Manage broker listings\n " +
97+ "- health: Check broker health status\n " +
98+ "- config: Manage broker configurations\n " +
99+ "- namespaces: Manage namespaces owned by a broker" ),
100+ ),
101+ mcp .WithString ("operation" , mcp .Required (),
102+ mcp .Description ("Operation to perform, available options:\n " +
103+ "- list: List resources (used with brokers)\n " +
104+ "- get: Retrieve resource information (used with health, config, namespaces)\n " +
105+ "- update: Update a resource (used with config)\n " +
106+ "- delete: Delete a resource (used with config)" ),
107+ ),
108+ mcp .WithString ("clusterName" ,
109+ mcp .Description ("Pulsar cluster name, required for these operations:\n " +
110+ "- When resource=brokers, operation=list\n " +
111+ "- When resource=namespaces, operation=get" ),
112+ ),
113+ mcp .WithString ("brokerUrl" ,
114+ mcp .Description ("Broker URL, such as '127.0.0.1:8080', required for these operations:\n " +
115+ "- When resource=namespaces, operation=get" ),
116+ ),
117+ mcp .WithString ("configType" ,
118+ mcp .Description ("Configuration type, required when resource=config, operation=get, available options:\n " +
119+ "- dynamic: Get list of dynamically modifiable configuration names\n " +
120+ "- runtime: Get all runtime configurations (including static and dynamic configs)\n " +
121+ "- internal: Get internal configuration information\n " +
122+ "- all_dynamic: Get all dynamic configurations and their current values" ),
123+ ),
124+ mcp .WithString ("configName" ,
125+ mcp .Description ("Configuration parameter name, required for these operations:\n " +
126+ "- When resource=config, operation=update\n " +
127+ "- When resource=config, operation=delete" ),
128+ ),
129+ mcp .WithString ("configValue" ,
130+ mcp .Description ("Configuration parameter value, required for these operations:\n " +
131+ "- When resource=config, operation=update" ),
132+ ),
133+ )
134+ }
135+
136+ // buildPulsarAdminBrokersHandler builds the Pulsar admin brokers handler function
137+ func (b * PulsarAdminBrokersToolBuilder ) buildPulsarAdminBrokersHandler (readOnly bool ) func (context.Context , mcp.CallToolRequest ) (* mcp.CallToolResult , error ) {
138+ return func (ctx context.Context , request mcp.CallToolRequest ) (* mcp.CallToolResult , error ) {
139+ // Get Pulsar session from context
140+ session := b .getPulsarSession (ctx )
141+ if session == nil {
142+ return mcp .NewToolResultError ("Pulsar session not found in context" ), nil
143+ }
144+
145+ // Get admin client
146+ client , err := session .GetAdminClient ()
147+ if err != nil {
148+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to get admin client: %v" , err )), nil
149+ }
150+
151+ // Get required parameters
152+ resource , err := request .RequireString ("resource" )
153+ if err != nil {
154+ return mcp .NewToolResultError ("Missing required resource parameter. " +
155+ "Please specify one of: brokers, health, config, namespaces." ), nil
156+ }
157+
158+ operation , err := request .RequireString ("operation" )
159+ if err != nil {
160+ return mcp .NewToolResultError ("Missing required operation parameter. " +
161+ "Please specify one of: list, get, update, delete based on the resource type." ), nil
162+ }
163+
164+ // Validate if the parameter combination is valid
165+ validCombination , errMsg := b .validateResourceOperation (resource , operation )
166+ if ! validCombination {
167+ return mcp .NewToolResultError (errMsg ), nil
168+ }
169+
170+ // Process request based on resource type
171+ switch resource {
172+ case "brokers" :
173+ return b .handleBrokersResource (client , operation , request )
174+ case "health" :
175+ return b .handleHealthResource (client , operation , request )
176+ case "config" :
177+ // Check write operation permissions
178+ if (operation == "update" || operation == "delete" ) && readOnly {
179+ return mcp .NewToolResultError ("Configuration update/delete operations not allowed in read-only mode. " +
180+ "Please contact your administrator if you need to modify broker configurations." ), nil
181+ }
182+ return b .handleConfigResource (client , operation , request )
183+ case "namespaces" :
184+ return b .handleNamespacesResource (client , operation , request )
185+ default :
186+ return mcp .NewToolResultError (fmt .Sprintf ("Unsupported resource: %s. " +
187+ "Please use one of: brokers, health, config, namespaces." , resource )), nil
188+ }
189+ }
190+ }
191+
192+ // Helper functions
193+
194+ // getPulsarSession gets the Pulsar session from context
195+ // Uses the same context key as defined in pkg/mcp/ctx.go to ensure consistency
196+ func (b * PulsarAdminBrokersToolBuilder ) getPulsarSession (ctx context.Context ) * pulsar.Session {
197+ type contextKey string
198+ const pulsarSessionContextKey contextKey = "pulsar_session"
199+
200+ session , ok := ctx .Value (pulsarSessionContextKey ).(* pulsar.Session )
201+ if ! ok {
202+ return nil
203+ }
204+ return session
205+ }
206+
207+ // validateResourceOperation validates if the resource and operation combination is valid
208+ func (b * PulsarAdminBrokersToolBuilder ) validateResourceOperation (resource , operation string ) (bool , string ) {
209+ validCombinations := map [string ][]string {
210+ "brokers" : {"list" },
211+ "health" : {"get" },
212+ "config" : {"get" , "update" , "delete" },
213+ "namespaces" : {"get" },
214+ }
215+
216+ if operations , ok := validCombinations [resource ]; ok {
217+ for _ , op := range operations {
218+ if op == operation {
219+ return true , ""
220+ }
221+ }
222+ return false , fmt .Sprintf ("Invalid operation '%s' for resource '%s'. Valid operations are: %v" ,
223+ operation , resource , validCombinations [resource ])
224+ }
225+
226+ return false , fmt .Sprintf ("Invalid resource '%s'. Valid resources are: brokers, health, config, namespaces" , resource )
227+ }
228+
229+ // handleBrokersResource handles brokers resource
230+ func (b * PulsarAdminBrokersToolBuilder ) handleBrokersResource (client cmdutils.Client , operation string , request mcp.CallToolRequest ) (* mcp.CallToolResult , error ) {
231+ switch operation {
232+ case "list" :
233+ clusterName , err := request .RequireString ("clusterName" )
234+ if err != nil {
235+ return mcp .NewToolResultError ("Missing required parameter 'clusterName'. " +
236+ "Please provide the name of the Pulsar cluster to list brokers for." ), nil
237+ }
238+
239+ brokers , err := client .Brokers ().GetActiveBrokers (clusterName )
240+ if err != nil {
241+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to get active brokers: %v. " +
242+ "Please verify the cluster name and ensure the Pulsar service is running." , err )), nil
243+ }
244+
245+ brokersJSON , err := json .Marshal (brokers )
246+ if err != nil {
247+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to serialize brokers list: %v" , err )), nil
248+ }
249+
250+ return mcp .NewToolResultText (string (brokersJSON )), nil
251+ default :
252+ return mcp .NewToolResultError (fmt .Sprintf ("Unsupported operation '%s' for brokers resource. " +
253+ "The only supported operation is 'list'." , operation )), nil
254+ }
255+ }
256+
257+ // handleHealthResource handles health resource
258+ func (b * PulsarAdminBrokersToolBuilder ) handleHealthResource (client cmdutils.Client , operation string , _ mcp.CallToolRequest ) (* mcp.CallToolResult , error ) {
259+ switch operation {
260+ case "get" :
261+ //nolint:staticcheck
262+ err := client .Brokers ().HealthCheck ()
263+ if err != nil {
264+ return mcp .NewToolResultError (fmt .Sprintf ("Broker health check failed: %v. " +
265+ "The broker might be down or experiencing issues." , err )), nil
266+ }
267+ return mcp .NewToolResultText ("ok" ), nil
268+ default :
269+ return mcp .NewToolResultError (fmt .Sprintf ("Unsupported operation '%s' for health resource. " +
270+ "The only supported operation is 'get'." , operation )), nil
271+ }
272+ }
273+
274+ // handleConfigResource handles config resource
275+ func (b * PulsarAdminBrokersToolBuilder ) handleConfigResource (client cmdutils.Client , operation string , request mcp.CallToolRequest ) (* mcp.CallToolResult , error ) {
276+ switch operation {
277+ case "get" :
278+ configType , err := request .RequireString ("configType" )
279+ if err != nil {
280+ return mcp .NewToolResultError ("Missing required parameter 'configType'. " +
281+ "Please specify one of: dynamic, runtime, internal, all_dynamic." ), nil
282+ }
283+
284+ var result interface {}
285+ var fetchErr error
286+
287+ switch configType {
288+ case "dynamic" :
289+ result , fetchErr = client .Brokers ().GetDynamicConfigurationNames ()
290+ case "runtime" :
291+ result , fetchErr = client .Brokers ().GetRuntimeConfigurations ()
292+ case "internal" :
293+ result , fetchErr = client .Brokers ().GetInternalConfigurationData ()
294+ case "all_dynamic" :
295+ result , fetchErr = client .Brokers ().GetAllDynamicConfigurations ()
296+ default :
297+ return mcp .NewToolResultError (fmt .Sprintf ("Invalid config type: '%s'. " +
298+ "Valid types are: dynamic, runtime, internal, all_dynamic." , configType )), nil
299+ }
300+
301+ if fetchErr != nil {
302+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to get %s configuration: %v" , configType , fetchErr )), nil
303+ }
304+
305+ resultJSON , err := json .Marshal (result )
306+ if err != nil {
307+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to serialize configuration: %v" , err )), nil
308+ }
309+
310+ return mcp .NewToolResultText (string (resultJSON )), nil
311+
312+ case "update" :
313+ configName , err := request .RequireString ("configName" )
314+ if err != nil {
315+ return mcp .NewToolResultError ("Missing required parameter 'configName'. " +
316+ "Please provide the name of the configuration parameter to update." ), nil
317+ }
318+
319+ configValue , err := request .RequireString ("configValue" )
320+ if err != nil {
321+ return mcp .NewToolResultError ("Missing required parameter 'configValue'. " +
322+ "Please provide the new value for the configuration parameter." ), nil
323+ }
324+
325+ err = client .Brokers ().UpdateDynamicConfiguration (configName , configValue )
326+ if err != nil {
327+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to update configuration: %v. " +
328+ "Please verify the configuration name is valid and the value is of the correct type." , err )), nil
329+ }
330+
331+ return mcp .NewToolResultText (fmt .Sprintf ("Dynamic configuration '%s' updated successfully to '%s'" ,
332+ configName , configValue )), nil
333+
334+ case "delete" :
335+ configName , err := request .RequireString ("configName" )
336+ if err != nil {
337+ return mcp .NewToolResultError ("Missing required parameter 'configName'. " +
338+ "Please provide the name of the configuration parameter to delete." ), nil
339+ }
340+
341+ err = client .Brokers ().DeleteDynamicConfiguration (configName )
342+ if err != nil {
343+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to delete configuration: %v. " +
344+ "Please verify the configuration name is valid and exists." , err )), nil
345+ }
346+
347+ return mcp .NewToolResultText (fmt .Sprintf ("Dynamic configuration '%s' deleted successfully" , configName )), nil
348+
349+ default :
350+ return mcp .NewToolResultError (fmt .Sprintf ("Unsupported operation '%s' for config resource. " +
351+ "Supported operations are: get, update, delete." , operation )), nil
352+ }
353+ }
354+
355+ // handleNamespacesResource handles namespaces resource
356+ func (b * PulsarAdminBrokersToolBuilder ) handleNamespacesResource (client cmdutils.Client , operation string , request mcp.CallToolRequest ) (* mcp.CallToolResult , error ) {
357+ switch operation {
358+ case "get" :
359+ clusterName , err := request .RequireString ("clusterName" )
360+ if err != nil {
361+ return mcp .NewToolResultError ("Missing required parameter 'clusterName'. " +
362+ "Please provide the name of the Pulsar cluster." ), nil
363+ }
364+
365+ brokerURL , err := request .RequireString ("brokerUrl" )
366+ if err != nil {
367+ return mcp .NewToolResultError ("Missing required parameter 'brokerUrl'. " +
368+ "Please provide the URL of the broker (e.g., '127.0.0.1:8080')." ), nil
369+ }
370+
371+ namespaces , err := client .Brokers ().GetOwnedNamespaces (clusterName , brokerURL )
372+ if err != nil {
373+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to get owned namespaces: %v. " +
374+ "Please verify the cluster name and broker URL are correct." , err )), nil
375+ }
376+
377+ namespacesJSON , err := json .Marshal (namespaces )
378+ if err != nil {
379+ return mcp .NewToolResultError (fmt .Sprintf ("Failed to serialize namespaces: %v" , err )), nil
380+ }
381+
382+ return mcp .NewToolResultText (string (namespacesJSON )), nil
383+
384+ default :
385+ return mcp .NewToolResultError (fmt .Sprintf ("Unsupported operation '%s' for namespaces resource. " +
386+ "The only supported operation is 'get'." , operation )), nil
387+ }
388+ }
0 commit comments