@@ -24,11 +24,13 @@ import (
 	"slices"
 	"time"
 
+	"github.com/hamba/avro/v2"
 	"github.com/mark3labs/mcp-go/mcp"
 	"github.com/mark3labs/mcp-go/server"
 	"github.com/sirupsen/logrus"
 	"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
 	"github.com/twmb/franz-go/pkg/kgo"
+	"github.com/twmb/franz-go/pkg/sr"
 )
 
 var logger *logrus.Logger
@@ -52,7 +54,7 @@ func KafkaClientAddConsumeTools(s *server.MCPServer, _ bool, logrusLogger *logru
 		"1. Basic consumption - Get 10 earliest messages from a topic:\n" +
 		"   topic: \"my-topic\"\n" +
 		"   max-messages: 10\n\n" +
-		"2. Consume from beginning - Get messages from the start of a topic:\n" +
+		"2. Consumer group - Join an existing consumer group and resume from its committed offsets:\n" +
 		"   topic: \"my-topic\"\n" +
-		"   offset: \"atstart\"\n" +
+		"   group: \"my-group\"\n" +
 		"   max-messages: 20\n\n" +
@@ -132,6 +134,15 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
 		timeoutSec = 10 // Default to 10 seconds
 	}
 
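+	// Optional consumer group: when set, the client joins the group and resumes from its committed offsets.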
+	group, hasGroup := optionalParam[string](request.Params.Arguments, "group")
+	if !hasGroup {
+		group = ""
+	}
+	if group != "" {
+		opts = append(opts, kgo.ConsumerGroup(group))
+	}
+
 	offsetStr, hasOffset := optionalParam[string](request.Params.Arguments, "offset")
 	if !hasOffset {
 		offsetStr = "atstart" // Default to starting at the beginning
@@ -148,10 +159,8 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
 	default:
 		offset = kgo.NewOffset().AtStart()
 	}
-
 	opts = append(opts, kgo.ConsumeResetOffset(offset))
-
-	logger.Infof("Consuming from topic: %s, offset: %s, max-messages: %d, timeout: %d", topicName, offsetStr, int(maxMessages), int(timeoutSec))
+	logger.Infof("Consuming from topic: %s, group: %s, offset: %s, max-messages: %d, timeout: %d", topicName, group, offsetStr, int(maxMessages), int(timeoutSec))
 
 	// Create Kafka client using the new Kafka package
 	kafkaClient, err := kafka.GetKafkaClient(opts...)
@@ -160,6 +169,15 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
 	}
 	defer kafkaClient.Close()
 
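+	// Best-effort Schema Registry setup: when a registry client is available,
+	// record values are decoded with the topic's schema instead of returned as raw bytes.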
+	srClient, err := kafka.GetKafkaSchemaRegistryClient()
+	schemaReady := false
+	var serde sr.Serde
+	if err == nil && srClient != nil {
+		schemaReady = true
+	}
+
 	// Set timeout
 	timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
 	defer cancel()
@@ -168,10 +186,45 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
 		return mcp.NewToolResultError(fmt.Sprintf("Failed to ping Kafka cluster: %v", err)), nil
 	}
 
-	topics := kafkaClient.OptValue("ConsumeTopics")
-	logger.Infof("Consuming from topics: %v\n", topics)
+	if schemaReady {
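+		// Fetch the latest registered value schema for the topic (version -1 means latest).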
+		subjSchema, err := srClient.SchemaByVersion(timeoutCtx, topicName+"-value", -1)
+		if err != nil {
+			return mcp.NewToolResultError(fmt.Sprintf("Failed to get schema: %v", err)), nil
+		}
+		logger.Infof("Schema ID: %d", subjSchema.ID)
+		switch subjSchema.Type {
+		case sr.TypeAvro:
+			avroSchema, err := avro.Parse(subjSchema.Schema.Schema)
+			if err != nil {
+				return mcp.NewToolResultError(fmt.Sprintf("Failed to parse avro schema: %v", err)), nil
+			}
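+			// Key the serde on the schema ID carried in each record's wire-format header.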
+			serde.Register(
+				subjSchema.ID,
+				map[string]any{},
+				sr.EncodeFn(func(v any) ([]byte, error) {
+					return avro.Marshal(avroSchema, v)
+				}),
+				sr.DecodeFn(func(data []byte, v any) error {
+					return avro.Unmarshal(avroSchema, data, v)
+				}),
+			)
+		case sr.TypeJSON:
+			serde.Register(
+				subjSchema.ID,
+				map[string]any{},
+				sr.EncodeFn(json.Marshal),
+				sr.DecodeFn(json.Unmarshal),
+			)
+		default:
+			// TODO: support other schema types (e.g. protobuf)
+			logger.Infof("Unsupported schema type: %s", subjSchema.Type)
+			schemaReady = false
+		}
+	}
 
-	results := make([]*kgo.Record, 0)
+	results := make([]any, 0)
 consumerLoop:
 	for {
 		fetches := kafkaClient.PollRecords(timeoutCtx, int(maxMessages))
@@ -186,13 +239,32 @@ consumerLoop:
 
 		for !iter.Done() {
 			record := iter.Next()
-			results = append(results, record)
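+			// Decode the value with the registered schema; on failure, fall back to raw bytes.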
+			if schemaReady {
+				var value map[string]any
+				err := serde.Decode(record.Value, &value)
+				if err != nil {
+					logger.Infof("Failed to decode value: %v", err)
+					results = append(results, record.Value)
+				} else {
+					results = append(results, value)
+				}
+			} else {
+				results = append(results, record.Value)
+			}
 			if len(results) >= int(maxMessages) {
 				break consumerLoop
 			}
 		}
 	}
 
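+	// Commit consumed offsets when running as part of a consumer group.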
+	if group != "" {
+		if err := kafkaClient.CommitUncommittedOffsets(timeoutCtx); err != nil && err != context.Canceled {
+			logger.Infof("Failed to commit offsets: %v", err)
+		}
+	}
+
 	jsonResults, err := json.Marshal(results)
 	if err != nil {
 		return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal results: %v", err)), nil