Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/99designs/keyring v1.2.2
github.com/apache/pulsar-client-go v0.13.1
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/hamba/avro/v2 v2.28.0
github.com/mark3labs/mcp-go v0.26.0
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -43,7 +44,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hamba/avro/v2 v2.26.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down Expand Up @@ -75,10 +75,11 @@ require (
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/mod v0.20.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/term v0.28.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/mod v0.23.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)

Expand Down
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hamba/avro/v2 v2.26.0 h1:IaT5l6W3zh7K67sMrT2+RreJyDTllBGVJm4+Hedk9qE=
github.com/hamba/avro/v2 v2.26.0/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0=
github.com/hamba/avro/v2 v2.28.0 h1:E8J5D27biyAulWKNiEBhV85QPc9xRMCUCGJewS0KYCE=
github.com/hamba/avro/v2 v2.28.0/go.mod h1:9TVrlt1cG1kkTUtm9u2eO5Qb7rZXlYzoKqPt8TSH+TA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -251,16 +251,16 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0=
golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM=
golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70=
golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -272,16 +272,16 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y=
golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
13 changes: 4 additions & 9 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,18 @@ go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kT
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 h1:B6caxRw+hozq68X2MY7jEpZh/cr4/aHLv9xU8Kkadrw=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44=
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
Expand All @@ -163,6 +156,8 @@ golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d h1:W07d4xkoAUSNOkOzdzXCdFGxT7o2rW4q8M34tB2i//k=
golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24=
golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ=
golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY=
golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
google.golang.org/api v0.30.0 h1:yfrXXP61wVuLb0vBcG6qaOoIoqYEzOQS8jum51jkv2w=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
Expand Down
14 changes: 12 additions & 2 deletions pkg/mcp/kafka_admin_groups_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
mcp.Description("The name of the Kafka topic to operate on. "+
"Required for the 'delete' operation. "+
"Must be an existing topic name in the Kafka cluster.")),
mcp.WithString("partition",
mcp.WithNumber("partition",
mcp.Description("The partition number to set offset for. "+
"Required for the 'set-offset' operation. "+
"Must be a valid partition number for the specified topic.")),
Expand Down Expand Up @@ -191,7 +191,17 @@ func handleKafkaGroupDescribe(ctx context.Context, admin *kadm.Client, request m
return mcp.NewToolResultError(fmt.Sprintf("Failed to describe group: %v", err)), nil
}

jsonBytes, err := json.Marshal(response)
lags, err := admin.Lag(ctx, groupName)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get lag: %v", err)), nil
}

result := map[string]interface{}{
"group": response,
"lag": lags,
}

jsonBytes, err := json.Marshal(result)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil
}
Expand Down
83 changes: 75 additions & 8 deletions pkg/mcp/kafka_client_consume_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ import (
"slices"
"time"

"github.com/hamba/avro/v2"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
"github.com/sirupsen/logrus"
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sr"
)

var logger *logrus.Logger
Expand All @@ -52,7 +54,7 @@ func KafkaClientAddConsumeTools(s *server.MCPServer, _ bool, logrusLogger *logru
"1. Basic consumption - Get 10 earliest messages from a topic:\n" +
" topic: \"my-topic\"\n" +
" max-messages: 10\n\n" +
"2. Consume from beginning - Get messages from the start of a topic:\n" +
"2. Consumer group - Join an existing consumer group and resume from committed offset:\n" +
" topic: \"my-topic\"\n" +
" offset: \"atstart\"\n" +
" max-messages: 20\n\n" +
Expand Down Expand Up @@ -132,6 +134,14 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
timeoutSec = 10 // Default to 10 seconds
}

group, hasGroup := optionalParam[string](request.Params.Arguments, "group")
if !hasGroup {
group = ""
}
if group != "" {
opts = append(opts, kgo.ConsumerGroup(group))
}

offsetStr, hasOffset := optionalParam[string](request.Params.Arguments, "offset")
if !hasOffset {
offsetStr = "atstart" // Default to starting at the beginning
Expand All @@ -148,10 +158,8 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
default:
offset = kgo.NewOffset().AtStart()
}

opts = append(opts, kgo.ConsumeResetOffset(offset))

logger.Infof("Consuming from topic: %s, offset: %s, max-messages: %d, timeout: %d", topicName, offsetStr, int(maxMessages), int(timeoutSec))
logger.Infof("Consuming from topic: %s, group: %s, max-messages: %d, timeout: %d", topicName, group, int(maxMessages), int(timeoutSec))

// Create Kafka client using the new Kafka package
kafkaClient, err := kafka.GetKafkaClient(opts...)
Expand All @@ -160,6 +168,13 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
}
defer kafkaClient.Close()

srClient, err := kafka.GetKafkaSchemaRegistryClient()
schemaReady := false
var serde sr.Serde
if err == nil && srClient != nil {
schemaReady = true
}

// Set timeout
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second)
defer cancel()
Expand All @@ -168,10 +183,44 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp.
return mcp.NewToolResultError(fmt.Sprintf("Failed to ping Kafka cluster: %v", err)), nil
}

topics := kafkaClient.OptValue("ConsumeTopics")
logger.Infof("Consuming from topics: %v\n", topics)
if schemaReady {
subjSchema, err := srClient.SchemaByVersion(timeoutCtx, topicName+"-value", -1)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get schema: %v", err)), nil
}
logger.Infof("Schema ID: %d", subjSchema.ID)
switch subjSchema.Type {
case sr.TypeAvro:
avroSchema, err := avro.Parse(subjSchema.Schema.Schema)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to parse avro schema: %v", err)), nil
}
serde.Register(
subjSchema.ID,
map[string]any{},
sr.EncodeFn(func(v any) ([]byte, error) {
return avro.Marshal(avroSchema, v)
}),
sr.DecodeFn(func(data []byte, v any) error {
return avro.Unmarshal(avroSchema, data, v)
}),
)
case sr.TypeJSON:
serde.Register(
subjSchema.ID,
map[string]any{},
sr.EncodeFn(json.Marshal),
sr.DecodeFn(json.Unmarshal),
)
case sr.TypeProtobuf:
default:
// TODO: support other schema types
logger.Infof("Unsupported schema type: %s", subjSchema.Type)
schemaReady = false
}
}

results := make([]*kgo.Record, 0)
results := make([]any, 0)
consumerLoop:
for {
fetches := kafkaClient.PollRecords(timeoutCtx, int(maxMessages))
Expand All @@ -186,13 +235,31 @@ consumerLoop:

for !iter.Done() {
record := iter.Next()
results = append(results, record)
if schemaReady {
var value map[string]any
err := serde.Decode(record.Value, &value)
if err != nil {
logger.Infof("Failed to decode value: %v", err)
results = append(results, record.Value)
} else {
results = append(results, value)
}
} else {
results = append(results, record.Value)
}
if len(results) >= int(maxMessages) {
break consumerLoop
}
}
}

err = kafkaClient.CommitUncommittedOffsets(timeoutCtx)
if err != nil {
if err != context.Canceled {
logger.Infof("Failed to commit offsets: %v", err)
}
}

jsonResults, err := json.Marshal(results)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal results: %v", err)), nil
Expand Down
Loading