Commit f2d5129

Refactor Kafka Client Tools with Builder Pattern
- Introduced builder pattern for Kafka client consume and produce tools, enhancing modularity and maintainability.
- Migrated tool definitions for Kafka consumption and production to their respective builders, improving code organization.
- Updated tool registration logic to utilize the new builders, ensuring a unified approach for managing Kafka operations.
- Added comprehensive tests for the new builders, ensuring robust functionality and adherence to requirements.
- Enhanced error handling and added comments for better clarity and understanding of the code structure.
1 parent 7b72ff2 commit f2d5129
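
The registration changes mentioned in the commit message live in other files of this commit. As a rough sketch only (the helper name, the ToolBuildConfig field types, and the AddTools call are assumptions, not taken from this commit), the new consume builder could be wired into an MCP server roughly like this:

package tools // hypothetical caller package, not part of this commit

import (
	"context"

	"github.com/mark3labs/mcp-go/server"
	"github.com/sirupsen/logrus"
	"github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
	kafkabuilders "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders/kafka"
)

// registerKafkaConsumeTools is a hypothetical helper showing how the consume
// builder might be plugged into an MCP server instance.
func registerKafkaConsumeTools(ctx context.Context, s *server.MCPServer, logger *logrus.Logger) error {
	cfg := builders.ToolBuildConfig{
		Features: []string{"kafka-client"},         // must match one of the builder's required features
		Options:  map[string]any{"logger": logger}, // optional logger, extracted in BuildTools
	}
	tools, err := kafkabuilders.NewKafkaConsumeToolBuilder().BuildTools(ctx, cfg)
	if err != nil {
		return err
	}
	s.AddTools(tools...) // assumed mcp-go server call for registering ServerTool values
	return nil
}

Passing a feature set that includes none of "kafka-client", "all", or "all-kafka" makes BuildTools return no tools, as shown in the diff below.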

File tree

9 files changed, +1474 −1057 lines changed


pkg/mcp/builders/kafka/consume.go

Lines changed: 355 additions & 0 deletions
@@ -0,0 +1,355 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kafka

import (
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/hamba/avro/v2"
	"github.com/mark3labs/mcp-go/mcp"
	"github.com/mark3labs/mcp-go/server"
	"github.com/sirupsen/logrus"
	"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
	"github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sr"
)

// KafkaConsumeToolBuilder implements the ToolBuilder interface for Kafka client consume operations
// It provides functionality to build Kafka consumer tools
type KafkaConsumeToolBuilder struct {
	*builders.BaseToolBuilder
	logger *logrus.Logger
}

// NewKafkaConsumeToolBuilder creates a new Kafka consume tool builder instance
func NewKafkaConsumeToolBuilder() *KafkaConsumeToolBuilder {
	metadata := builders.ToolMetadata{
		Name:        "kafka_consume",
		Version:     "1.0.0",
		Description: "Kafka client consume tools",
		Category:    "kafka_client",
		Tags:        []string{"kafka", "client", "consume"},
	}

	features := []string{
		"kafka-client",
		"all",
		"all-kafka",
	}

	return &KafkaConsumeToolBuilder{
		BaseToolBuilder: builders.NewBaseToolBuilder(metadata, features),
	}
}

// BuildTools builds the Kafka consume tool list
// This is the core method implementing the ToolBuilder interface
func (b *KafkaConsumeToolBuilder) BuildTools(_ context.Context, config builders.ToolBuildConfig) ([]server.ServerTool, error) {
	// Check features - return empty list if no required features are present
	if !b.HasAnyRequiredFeature(config.Features) {
		return nil, nil
	}

	// Validate configuration (only validate when matching features are present)
	if err := b.Validate(config); err != nil {
		return nil, err
	}

	// Extract logger from options if provided
	if loggerOpt, ok := config.Options["logger"]; ok {
		if logger, ok := loggerOpt.(*logrus.Logger); ok {
			b.logger = logger
		}
	}

	// Build tools
	tool := b.buildKafkaConsumeTool()
	handler := b.buildKafkaConsumeHandler()

	return []server.ServerTool{
		{
			Tool:    tool,
			Handler: handler,
		},
	}, nil
}

// buildKafkaConsumeTool builds the Kafka consume MCP tool definition
// Migrated from the original tool definition logic
func (b *KafkaConsumeToolBuilder) buildKafkaConsumeTool() mcp.Tool {
	toolDesc := "Consume messages from a Kafka topic.\n" +
		"This tool allows you to read messages from Kafka topics, specifying various consumption parameters.\n\n" +
		"Kafka Consumer Concepts:\n" +
		"- Consumers read data from Kafka topics, which can be spread across multiple partitions\n" +
		"- Consumer Groups enable multiple consumers to cooperatively process messages from the same topic\n" +
		"- Offsets track the position of consumers in each partition, allowing resumption after failures\n" +
		"- Partitions are independent ordered sequences of messages that enable parallel processing\n\n" +
		"This tool provides a temporary consumer instance for diagnostic and testing purposes. " +
		"It does not commit offsets back to Kafka unless the 'group' parameter is explicitly specified. " +
		"Do not use this tool for Pulsar protocol operations. Use 'pulsar_client_consume' instead.\n\n" +
		"Usage Examples:\n\n" +
		"1. Basic consumption - Get 10 earliest messages from a topic:\n" +
		"   topic: \"my-topic\"\n" +
		"   max-messages: 10\n\n" +
		"2. Explicit offset - Start from the earliest available message and read up to 20 messages:\n" +
		"   topic: \"my-topic\"\n" +
		"   offset: \"atstart\"\n" +
		"   max-messages: 20\n\n" +
		"3. Consumer group - Join an existing consumer group and resume from its committed offset:\n" +
		"   topic: \"my-topic\"\n" +
		"   group: \"my-consumer-group\"\n" +
		"   offset: \"atcommitted\"\n\n" +
		"4. Time-limited consumption - Set a longer timeout for slow topics:\n" +
		"   topic: \"my-topic\"\n" +
		"   max-messages: 100\n" +
		"   timeout: 30\n\n" +
		"This tool requires Kafka consumer permissions on the specified topic."

	return mcp.NewTool("kafka_client_consume",
		mcp.WithDescription(toolDesc),
		mcp.WithString("topic", mcp.Required(),
			mcp.Description("The name of the Kafka topic to consume messages from. "+
				"Must be an existing topic that the user has read permissions for. "+
				"For partitioned topics, this will consume from all partitions unless a specific partition is specified."),
		),
		mcp.WithString("group",
			mcp.Description("The consumer group ID to use for consumption. "+
				"Optional. If provided, the consumer will join this consumer group and track offsets with Kafka. "+
				"If not provided, a random group ID will be generated, and offsets won't be committed back to Kafka. "+
				"Using a meaningful group ID is important when you want to resume consumption later or coordinate multiple consumers."),
		),
		mcp.WithString("offset",
			mcp.Description("The offset position to start consuming from. "+
				"Optional. Must be one of these values:\n"+
				"- 'atstart': Begin from the earliest available message in the topic/partition\n"+
				"- 'atend': Begin from the next message that arrives after the consumer starts\n"+
				"- 'atcommitted': Begin from the last committed offset (only works with a specified 'group')\n"+
				"Default: 'atstart'"),
		),
		mcp.WithNumber("max-messages",
			mcp.Description("Maximum number of messages to consume in this request. "+
				"Optional. Limits the total number of messages returned, across all partitions if no specific partition is specified. "+
				"Higher values retrieve more data but may increase response time and size. "+
				"Default: 10"),
		),
		mcp.WithNumber("timeout",
			mcp.Description("Maximum time in seconds to wait for messages. "+
				"Optional. The consumer will wait up to this long to collect the requested number of messages. "+
				"If fewer than 'max-messages' are available within this time, the available messages are returned. "+
				"Longer timeouts are useful for low-volume topics or when consuming with 'atend'. "+
				"Default: 10 seconds"),
		),
	)
}

// buildKafkaConsumeHandler builds the Kafka consume handler function
// Migrated from the original handler logic
func (b *KafkaConsumeToolBuilder) buildKafkaConsumeHandler() func(context.Context, mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	return func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
		opts := []kgo.Opt{}
		// Get required parameters
		topicName, err := request.RequireString("topic")
		if err != nil {
			return b.handleError("get topic name", err), nil
		}
		opts = append(opts, kgo.ConsumeTopics(topicName))

		opts = append(opts, kgo.FetchIsolationLevel(kgo.ReadUncommitted()))
		opts = append(opts, kgo.KeepRetryableFetchErrors())
		if b.logger != nil {
			w := b.logger.Writer()
			opts = append(opts, kgo.WithLogger(kgo.BasicLogger(w, kgo.LogLevelInfo, nil)))
		}
		maxMessages := request.GetFloat("max-messages", 10)

		timeoutSec := request.GetFloat("timeout", 10)

		group := request.GetString("group", "")
		if group != "" {
			opts = append(opts, kgo.ConsumerGroup(group))
		}

		offsetStr := request.GetString("offset", "atstart")

		var offset kgo.Offset
		switch offsetStr {
		case "atstart":
			offset = kgo.NewOffset().AtStart()
		case "atend":
			offset = kgo.NewOffset().AtEnd()
		case "atcommitted":
			offset = kgo.NewOffset().AtCommitted()
		default:
			offset = kgo.NewOffset().AtStart()
		}
		opts = append(opts, kgo.ConsumeResetOffset(offset))
		if b.logger != nil {
			b.logger.Infof("Consuming from topic: %s, group: %s, max-messages: %d, timeout: %d", topicName, group, int(maxMessages), int(timeoutSec))
		}

		// Get Kafka session from context
		session, err := b.getKafkaSession(ctx)
		if err != nil {
			return b.handleError("get Kafka session", err), nil
		}

		// Create Kafka client using the session
		kafkaClient, err := session.GetClient(opts...)
		if err != nil {
			return b.handleError("create Kafka client", err), nil
		}
		defer kafkaClient.Close()

		srClient, err := session.GetSchemaRegistryClient()
		schemaReady := false
		var serde sr.Serde
		if err == nil && srClient != nil {
			schemaReady = true
		}

		// Set timeout
		timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
		defer cancel()

		if err = kafkaClient.Ping(timeoutCtx); err != nil { // check connectivity to cluster
			return b.handleError("ping Kafka cluster", err), nil
		}

		if schemaReady {
			subjSchema, err := srClient.SchemaByVersion(timeoutCtx, topicName+"-value", -1)
			if err != nil {
				return b.handleError("get schema", err), nil
			}
			if b.logger != nil {
				b.logger.Infof("Schema ID: %d", subjSchema.ID)
			}
			switch subjSchema.Type {
			case sr.TypeAvro:
				avroSchema, err := avro.Parse(subjSchema.Schema.Schema)
				if err != nil {
					return b.handleError("parse avro schema", err), nil
				}
				serde.Register(
					subjSchema.ID,
					map[string]any{},
					sr.EncodeFn(func(v any) ([]byte, error) {
						return avro.Marshal(avroSchema, v)
					}),
					sr.DecodeFn(func(data []byte, v any) error {
						return avro.Unmarshal(avroSchema, data, v)
					}),
				)
			case sr.TypeJSON:
				serde.Register(
					subjSchema.ID,
					map[string]any{},
					sr.EncodeFn(json.Marshal),
					sr.DecodeFn(json.Unmarshal),
				)
			case sr.TypeProtobuf:
			default:
				// TODO: support other schema types
				if b.logger != nil {
					b.logger.Infof("Unsupported schema type: %s", subjSchema.Type)
				}
				schemaReady = false
			}
		}

		results := make([]any, 0)
	consumerLoop:
		for {
			fetches := kafkaClient.PollRecords(timeoutCtx, int(maxMessages))
			iter := fetches.RecordIter()
			if b.logger != nil {
				b.logger.Infof("NumRecords: %d\n", fetches.NumRecords())
			}

			for _, fetchErr := range fetches.Errors() {
				if b.logger != nil {
					b.logger.Infof("error consuming from topic: topic=%s, partition=%d, err=%v\n",
						fetchErr.Topic, fetchErr.Partition, fetchErr.Err)
				}
				break consumerLoop
			}

			for !iter.Done() {
				record := iter.Next()
				if schemaReady {
					var value map[string]any
					err := serde.Decode(record.Value, &value)
					if err != nil {
						if b.logger != nil {
							b.logger.Infof("Failed to decode value: %v", err)
						}
						results = append(results, record.Value)
					} else {
						results = append(results, value)
					}
				} else {
					results = append(results, record.Value)
				}
				if len(results) >= int(maxMessages) {
					break consumerLoop
				}
			}
		}

		err = kafkaClient.CommitUncommittedOffsets(timeoutCtx)
		if err != nil {
			if err != context.Canceled && b.logger != nil {
				b.logger.Infof("Failed to commit offsets: %v", err)
			}
		}

		return b.marshalResponse(results)
	}
}

// Unified error handling and utility functions

// handleError provides unified error handling
func (b *KafkaConsumeToolBuilder) handleError(operation string, err error) *mcp.CallToolResult {
	return mcp.NewToolResultError(fmt.Sprintf("Failed to %s: %v", operation, err))
}

// marshalResponse provides unified JSON serialization for responses
func (b *KafkaConsumeToolBuilder) marshalResponse(data interface{}) (*mcp.CallToolResult, error) {
	jsonBytes, err := json.Marshal(data)
	if err != nil {
		return b.handleError("marshal response", err), nil
	}
	return mcp.NewToolResultText(string(jsonBytes)), nil
}

// getKafkaSession retrieves the Kafka session from context
func (b *KafkaConsumeToolBuilder) getKafkaSession(ctx context.Context) (*kafka.Session, error) {
	// Get Kafka session from context using the same key as in ctx.go
	// We define the key locally to avoid circular import
	type contextKey string
	const kafkaSessionContextKey contextKey = "kafka_session"

	session, ok := ctx.Value(kafkaSessionContextKey).(*kafka.Session)
	if !ok || session == nil {
		return nil, fmt.Errorf("Kafka session not found in context")
	}
	return session, nil
}
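
The builder tests mentioned in the commit message are added in separate files of this commit and are not shown here. A minimal, illustrative sketch of the kind of check such a test might perform against the consume builder (the test name and assertions are hypothetical, not taken from the actual test file):

package kafka

import (
	"context"
	"testing"

	"github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
)

// TestKafkaConsumeToolBuilder_BuildTools is an illustrative sketch; the real
// tests added in this commit may cover more cases and use different helpers.
func TestKafkaConsumeToolBuilder_BuildTools(t *testing.T) {
	b := NewKafkaConsumeToolBuilder()

	// Without a matching feature the builder should contribute no tools.
	tools, err := b.BuildTools(context.Background(), builders.ToolBuildConfig{Features: []string{"pulsar-client"}})
	if err != nil || len(tools) != 0 {
		t.Fatalf("expected no tools for unrelated features, got %d tools, err=%v", len(tools), err)
	}

	// With the kafka-client feature enabled, exactly one consume tool is built.
	tools, err = b.BuildTools(context.Background(), builders.ToolBuildConfig{Features: []string{"kafka-client"}})
	if err != nil {
		t.Fatalf("BuildTools returned error: %v", err)
	}
	if len(tools) != 1 || tools[0].Tool.Name != "kafka_client_consume" {
		t.Fatalf("expected a single kafka_client_consume tool, got %+v", tools)
	}
}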
