Skip to content

Commit 6bc52ce

Browse files
committed
Add warnings against using Kafka tools for Pulsar operations and vice versa in respective tool descriptions.
1 parent 4f695d7 commit 6bc52ce

File tree

6 files changed

+9
-99
lines changed

6 files changed

+9
-99
lines changed

pkg/mcp/kafka_admin_topics_tools.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func KafkaAdminAddTopicTools(s *server.MCPServer, readOnly bool, features []stri
4949
toolDesc := "Unified tool for managing Apache Kafka topics.\n" +
5050
"This tool provides access to various Kafka topic operations, including creation, deletion, listing, and configuration retrieval.\n" +
5151
"Kafka topics are distributed append-only logs that store ordered, immutable records. Topics can be partitioned for parallel processing.\n\n" +
52+
"Do not use this tool for Pulsar protocol operations. Use 'pulsar_admin_topics' instead." +
5253
"Usage Examples:\n\n" +
5354
"1. List all topics:\n" +
5455
" resource: \"topics\"\n" +

pkg/mcp/kafka_client_consume_tools.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func KafkaClientAddConsumeTools(s *server.MCPServer, _ bool, logrusLogger *logru
4747
"- Offsets track the position of consumers in each partition, allowing resumption after failures\n" +
4848
"- Partitions are independent ordered sequences of messages that enable parallel processing\n\n" +
4949
"This tool provides a temporary consumer instance for diagnostic and testing purposes. " +
50-
"It does not commit offsets back to Kafka unless the 'group' parameter is explicitly specified.\n\n" +
50+
"It does not commit offsets back to Kafka unless the 'group' parameter is explicitly specified. Do not use this tool for Pulsar protocol operations. Use 'pulsar_client_consume' instead.\n\n" +
5151
"Usage Examples:\n\n" +
5252
"1. Basic consumption - Get 10 earliest messages from a topic:\n" +
5353
" topic: \"my-topic\"\n" +

pkg/mcp/kafka_client_produce_tools.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func KafkaClientAddProduceTools(s *server.MCPServer, readOnly bool, features []s
4949
"- Messages can include a key, which determines the partition assignment (consistent hashing)\n" +
5050
"- Headers can be added to messages to include metadata without affecting the message payload\n" +
5151
"- Partitions enable parallel processing and ordered delivery within a single partition\n\n" +
52-
"This tool provides a simple producer instance for diagnostic and testing purposes.\n\n" +
52+
"This tool provides a simple producer instance for diagnostic and testing purposes. Do not use this tool for Pulsar protocol operations. Use 'pulsar_client_produce' instead.\n\n" +
5353
"Usage Examples:\n\n" +
5454
"1. Basic message production - Send a simple message to a topic:\n" +
5555
" topic: \"my-topic\"\n" +

pkg/mcp/pulsar_admin_topic_tools.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func PulsarAdminAddTopicTools(s *server.MCPServer, readOnly bool, features []str
4747
"Topics follow a hierarchical naming structure: persistent://tenant/namespace/topic. " +
4848
"This tool supports various operations on topics including creation, deletion, lookup, compaction, " +
4949
"offloading, and retrieving statistics. " +
50+
"Do not use this tool for Kafka protocol operations. Use 'kafka_admin_topics' instead." +
5051
"Most operations require namespace admin permissions."
5152

5253
resourceDesc := "Resource to operate on. Available resources:\n" +

pkg/mcp/pulsar_client_consume_tools.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func PulsarClientAddConsumerTools(s *server.MCPServer, _ bool, features []string
4141
consumeTool := mcp.NewTool("pulsar_client_consume",
4242
mcp.WithDescription("Consume messages from a Pulsar topic. This tool allows you to consume messages "+
4343
"from a specified Pulsar topic with various options to control the subscription behavior, "+
44-
"message processing, and display format."),
44+
"message processing, and display format. Do not use this tool for Kafka protocol operations. Use 'kafka_client_consume' instead."),
4545
mcp.WithString("topic", mcp.Required(),
4646
mcp.Description("Topic to consume from"),
4747
),

pkg/mcp/pulsar_client_produce_tools.go

Lines changed: 4 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ func PulsarClientAddProducerTools(s *server.MCPServer, _ bool, features []string
4242
produceTool := mcp.NewTool("pulsar_client_produce",
4343
mcp.WithDescription("Produce messages to a Pulsar topic. This tool allows you to send messages "+
4444
"to a specified Pulsar topic with various options to control message format, "+
45-
"batching, and properties."),
45+
"batching, and properties. Do not use this tool for Kafka protocol operations. Use 'kafka_client_produce' instead."),
4646
mcp.WithString("topic", mcp.Required(),
4747
mcp.Description("Topic to produce to"),
4848
),
4949
mcp.WithArray("messages",
50-
mcp.Description("Messages to send. Specify multiple times for multiple messages."),
50+
mcp.Description("Messages to send. Specify multiple times for multiple messages. IMPORTANT: Use this parameter to provide message content."),
5151
),
5252
mcp.WithArray("files",
5353
mcp.Description("Files to send as message content. Specify multiple times for multiple files."),
@@ -73,30 +73,6 @@ func PulsarClientAddProducerTools(s *server.MCPServer, _ bool, features []string
7373
mcp.WithString("key",
7474
mcp.Description("Partitioning key to add to each message"),
7575
),
76-
mcp.WithString("key-value-key",
77-
mcp.Description("Value to add as message key in KeyValue schema"),
78-
),
79-
mcp.WithString("key-value-key-file",
80-
mcp.Description("Path to file containing the value to add as message key in KeyValue schema"),
81-
),
82-
mcp.WithString("value-schema",
83-
mcp.Description("Schema type for the value (can be string, bytes, json, avro) (default: bytes)"),
84-
),
85-
mcp.WithString("key-schema",
86-
mcp.Description("Schema type for the key (can be string, bytes, json, avro) (default: string)"),
87-
),
88-
mcp.WithString("key-value-encoding-type",
89-
mcp.Description("Key Value Encoding Type (can be separated or inline)"),
90-
),
91-
mcp.WithString("encryption-key-name",
92-
mcp.Description("The public key name to encrypt payload"),
93-
),
94-
mcp.WithString("encryption-key-value",
95-
mcp.Description("The URI of public key to encrypt payload"),
96-
),
97-
mcp.WithBoolean("disable-replication",
98-
mcp.Description("Disable geo-replication for messages (default: false)"),
99-
),
10076
)
10177
s.AddTool(produceTool, handleClientProduce)
10278
}
@@ -128,7 +104,7 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp
128104
}
129105

130106
if len(messages) == 0 && len(files) == 0 {
131-
return mcp.NewToolResultError("Please supply message content with either --messages or --files"), nil
107+
return mcp.NewToolResultError("Please supply message content with 'messages' parameter."), nil
132108
}
133109

134110
numProduce := 1
@@ -170,50 +146,6 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp
170146
key = val
171147
}
172148

173-
keyValueKey := ""
174-
if val, exists := optionalParam[string](request.Params.Arguments, "key-value-key"); exists {
175-
keyValueKey = val
176-
}
177-
178-
keyValueKeyFile := ""
179-
if val, exists := optionalParam[string](request.Params.Arguments, "key-value-key-file"); exists {
180-
keyValueKeyFile = val
181-
}
182-
183-
// Read but not used due to API compatibility issues
184-
_ = ""
185-
if val, exists := optionalParam[string](request.Params.Arguments, "value-schema"); exists && val != "" {
186-
_ = val
187-
}
188-
189-
_ = ""
190-
if val, exists := optionalParam[string](request.Params.Arguments, "key-schema"); exists && val != "" {
191-
_ = val
192-
}
193-
194-
_ = ""
195-
if val, exists := optionalParam[string](request.Params.Arguments, "key-value-encoding-type"); exists {
196-
_ = val
197-
if val != "separated" && val != "inline" {
198-
return mcp.NewToolResultError("Invalid key-value-encoding-type: must be 'separated' or 'inline'"), nil
199-
}
200-
}
201-
202-
_ = ""
203-
if val, exists := optionalParam[string](request.Params.Arguments, "encryption-key-name"); exists {
204-
_ = val
205-
}
206-
207-
_ = ""
208-
if val, exists := optionalParam[string](request.Params.Arguments, "encryption-key-value"); exists {
209-
_ = val
210-
}
211-
212-
disableReplication := false
213-
if val, exists := optionalParam[bool](request.Params.Arguments, "disable-replication"); exists {
214-
disableReplication = val
215-
}
216-
217149
// Split messages by separator if needed
218150
if separator != "" && len(messages) > 0 {
219151
var splitMessages []string
@@ -269,20 +201,6 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp
269201
}
270202
}
271203

272-
// Get key value key bytes
273-
_ = []byte(nil)
274-
if keyValueKey != "" {
275-
// We'd validate but not using for now
276-
_ = []byte(keyValueKey)
277-
} else if keyValueKeyFile != "" {
278-
// We'd validate but not using for now
279-
data, err := os.ReadFile(keyValueKeyFile)
280-
if err != nil {
281-
return mcp.NewToolResultError(fmt.Sprintf("Failed to read key-value-key-file: %v", err)), nil
282-
}
283-
_ = data
284-
}
285-
286204
// Setup rate limiter
287205
var limiter *time.Ticker
288206
if rate > 0 {
@@ -294,7 +212,7 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp
294212
// Send messages
295213
numMessagesSent := 0
296214
var lastMessageID pulsar.MessageID
297-
for i := 0; i < numProduce; i++ {
215+
for range numProduce {
298216
for _, payload := range messagePayloads {
299217
// Apply rate limiting if enabled
300218
if limiter != nil {
@@ -316,16 +234,6 @@ func handleClientProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp
316234
msg.Key = key
317235
}
318236

319-
// Handle geo-replication
320-
if disableReplication {
321-
msg.DisableReplication = true
322-
}
323-
324-
// Note: KeyValue handling is commented out until correct API is confirmed
325-
// if keyValueEncodingType != "" && keyValueKeyBytes != nil {
326-
// // TODO: Implement KeyValue schema handling when API is confirmed
327-
// }
328-
329237
// Send the message
330238
msgID, err := producer.Send(ctx, msg)
331239
if err != nil {

0 commit comments

Comments
 (0)