diff --git a/go.mod b/go.mod index e9b9c7b..ec12368 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/99designs/keyring v1.2.2 github.com/apache/pulsar-client-go v0.13.1 github.com/dgrijalva/jwt-go v3.2.0+incompatible + github.com/hamba/avro/v2 v2.28.0 github.com/mark3labs/mcp-go v0.26.0 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 @@ -43,7 +44,6 @@ require ( github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect - github.com/hamba/avro/v2 v2.26.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -75,10 +75,11 @@ require ( github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect github.com/yosida95/uritemplate/v3 v3.0.2 // indirect go.uber.org/atomic v1.11.0 // indirect - golang.org/x/crypto v0.32.0 // indirect - golang.org/x/mod v0.20.0 // indirect - golang.org/x/sys v0.29.0 // indirect - golang.org/x/term v0.28.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/mod v0.23.0 // indirect + golang.org/x/net v0.38.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/term v0.30.0 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/go.sum b/go.sum index 77d7cd0..0f4bfb7 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU= github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= -github.com/hamba/avro/v2 v2.26.0 h1:IaT5l6W3zh7K67sMrT2+RreJyDTllBGVJm4+Hedk9qE= -github.com/hamba/avro/v2 v2.26.0/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0= +github.com/hamba/avro/v2 v2.28.0 h1:E8J5D27biyAulWKNiEBhV85QPc9xRMCUCGJewS0KYCE= +github.com/hamba/avro/v2 v2.28.0/go.mod h1:9TVrlt1cG1kkTUtm9u2eO5Qb7rZXlYzoKqPt8TSH+TA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -251,16 +251,16 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= -golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= -golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/mod v0.23.0 h1:Zb7khfcRGKk+kqfxFaP5tZqCnDZMjC5VtUBs87Hr6QM= +golang.org/x/mod v0.23.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/net 
v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70= golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -272,16 +272,16 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= -golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= +golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= +golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/go.work.sum b/go.work.sum index 8bed584..3317a42 100644 --- a/go.work.sum +++ b/go.work.sum @@ -136,25 +136,18 @@ go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kT go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= 
-golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= -golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6 h1:QE6XYQK6naiK1EPAe1g/ILLxN5RBoH5xkJk3CqlMI/Y= golang.org/x/image v0.0.0-20190802002840-cff245a6509b h1:+qEpEAPhDZ1o0x3tHzZTQDArnOixOzGD9HUJfcg0mb4= golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG1In4gJY5YRhtpDNeDeHWs= golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= -golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642 h1:B6caxRw+hozq68X2MY7jEpZh/cr4/aHLv9xU8Kkadrw= -golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= -golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= -golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -163,6 +156,8 @@ golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d h1:W07d4xkoAUSNOkOzdzXCdFGxT7o2rW4q8M34tB2i//k= golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= +golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= +golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= google.golang.org/api v0.30.0 h1:yfrXXP61wVuLb0vBcG6qaOoIoqYEzOQS8jum51jkv2w= google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= diff --git a/pkg/mcp/kafka_admin_groups_tools.go b/pkg/mcp/kafka_admin_groups_tools.go index 31f3bd6..b4ab028 100644 --- a/pkg/mcp/kafka_admin_groups_tools.go +++ b/pkg/mcp/kafka_admin_groups_tools.go @@ -109,7 +109,7 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str mcp.Description("The name of the Kafka topic to operate on. "+ "Required for the 'delete' operation. "+ "Must be an existing topic name in the Kafka cluster.")), - mcp.WithString("partition", + mcp.WithNumber("partition", mcp.Description("The partition number to set offset for. "+ "Required for the 'set-offset' operation. 
"+ "Must be a valid partition number for the specified topic.")), @@ -191,7 +191,17 @@ func handleKafkaGroupDescribe(ctx context.Context, admin *kadm.Client, request m return mcp.NewToolResultError(fmt.Sprintf("Failed to describe group: %v", err)), nil } - jsonBytes, err := json.Marshal(response) + lags, err := admin.Lag(ctx, groupName) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get lag: %v", err)), nil + } + + result := map[string]interface{}{ + "group": response, + "lag": lags, + } + + jsonBytes, err := json.Marshal(result) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil } diff --git a/pkg/mcp/kafka_client_consume_tools.go b/pkg/mcp/kafka_client_consume_tools.go index 35bcb88..10f907c 100644 --- a/pkg/mcp/kafka_client_consume_tools.go +++ b/pkg/mcp/kafka_client_consume_tools.go @@ -24,11 +24,13 @@ import ( "slices" "time" + "github.com/hamba/avro/v2" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" "github.com/sirupsen/logrus" "github.com/streamnative/streamnative-mcp-server/pkg/kafka" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sr" ) var logger *logrus.Logger @@ -52,7 +54,7 @@ func KafkaClientAddConsumeTools(s *server.MCPServer, _ bool, logrusLogger *logru "1. Basic consumption - Get 10 earliest messages from a topic:\n" + " topic: \"my-topic\"\n" + " max-messages: 10\n\n" + - "2. Consume from beginning - Get messages from the start of a topic:\n" + + "2. Consumer group - Join an existing consumer group and resume from committed offset:\n" + " topic: \"my-topic\"\n" + " offset: \"atstart\"\n" + " max-messages: 20\n\n" + @@ -132,6 +134,14 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp. timeoutSec = 10 // Default to 10 seconds } + group, hasGroup := optionalParam[string](request.Params.Arguments, "group") + if !hasGroup { + group = "" + } + if group != "" { + opts = append(opts, kgo.ConsumerGroup(group)) + } + offsetStr, hasOffset := optionalParam[string](request.Params.Arguments, "offset") if !hasOffset { offsetStr = "atstart" // Default to starting at the beginning @@ -148,10 +158,8 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp. default: offset = kgo.NewOffset().AtStart() } - opts = append(opts, kgo.ConsumeResetOffset(offset)) - - logger.Infof("Consuming from topic: %s, offset: %s, max-messages: %d, timeout: %d", topicName, offsetStr, int(maxMessages), int(timeoutSec)) + logger.Infof("Consuming from topic: %s, group: %s, max-messages: %d, timeout: %d", topicName, group, int(maxMessages), int(timeoutSec)) // Create Kafka client using the new Kafka package kafkaClient, err := kafka.GetKafkaClient(opts...) @@ -160,6 +168,13 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp. } defer kafkaClient.Close() + srClient, err := kafka.GetKafkaSchemaRegistryClient() + schemaReady := false + var serde sr.Serde + if err == nil && srClient != nil { + schemaReady = true + } + // Set timeout timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeoutSec)*time.Second) defer cancel() @@ -168,10 +183,44 @@ func handleKafkaConsume(ctx context.Context, request mcp.CallToolRequest) (*mcp. 
return mcp.NewToolResultError(fmt.Sprintf("Failed to ping Kafka cluster: %v", err)), nil } - topics := kafkaClient.OptValue("ConsumeTopics") - logger.Infof("Consuming from topics: %v\n", topics) + if schemaReady { + subjSchema, err := srClient.SchemaByVersion(timeoutCtx, topicName+"-value", -1) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get schema: %v", err)), nil + } + logger.Infof("Schema ID: %d", subjSchema.ID) + switch subjSchema.Type { + case sr.TypeAvro: + avroSchema, err := avro.Parse(subjSchema.Schema.Schema) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to parse avro schema: %v", err)), nil + } + serde.Register( + subjSchema.ID, + map[string]any{}, + sr.EncodeFn(func(v any) ([]byte, error) { + return avro.Marshal(avroSchema, v) + }), + sr.DecodeFn(func(data []byte, v any) error { + return avro.Unmarshal(avroSchema, data, v) + }), + ) + case sr.TypeJSON: + serde.Register( + subjSchema.ID, + map[string]any{}, + sr.EncodeFn(json.Marshal), + sr.DecodeFn(json.Unmarshal), + ) + case sr.TypeProtobuf: + default: + // TODO: support other schema types + logger.Infof("Unsupported schema type: %s", subjSchema.Type) + schemaReady = false + } + } - results := make([]*kgo.Record, 0) + results := make([]any, 0) consumerLoop: for { fetches := kafkaClient.PollRecords(timeoutCtx, int(maxMessages)) @@ -186,13 +235,31 @@ consumerLoop: for !iter.Done() { record := iter.Next() - results = append(results, record) + if schemaReady { + var value map[string]any + err := serde.Decode(record.Value, &value) + if err != nil { + logger.Infof("Failed to decode value: %v", err) + results = append(results, record.Value) + } else { + results = append(results, value) + } + } else { + results = append(results, record.Value) + } if len(results) >= int(maxMessages) { break consumerLoop } } } + err = kafkaClient.CommitUncommittedOffsets(timeoutCtx) + if err != nil { + if err != context.Canceled { + logger.Infof("Failed to commit offsets: %v", err) + } + } + jsonResults, err := json.Marshal(results) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal results: %v", err)), nil diff --git a/pkg/mcp/kafka_client_produce_tools.go b/pkg/mcp/kafka_client_produce_tools.go index b133ee2..abe2a34 100644 --- a/pkg/mcp/kafka_client_produce_tools.go +++ b/pkg/mcp/kafka_client_produce_tools.go @@ -21,14 +21,15 @@ import ( "context" "encoding/json" "fmt" - "os" "slices" "time" + "github.com/hamba/avro/v2" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" "github.com/streamnative/streamnative-mcp-server/pkg/kafka" "github.com/twmb/franz-go/pkg/kgo" + "github.com/twmb/franz-go/pkg/sr" ) // KafkaClientAddProduceTools adds Kafka client produce tools to the MCP server @@ -81,39 +82,23 @@ func KafkaClientAddProduceTools(s *server.MCPServer, readOnly bool, features []s mcp.WithString("key", mcp.Description("The key for the message. "+ "Optional. Keys are used for partition assignment and maintaining order for related messages. "+ - "Messages with the same key will be sent to the same partition. "+ - "If not provided, the message will be assigned to a random partition."), + "Messages with the same key will be sent to the same partition."), ), mcp.WithString("value", + mcp.Required(), mcp.Description("The value/content of the message to send. "+ - "Required if 'messages' is not provided. 
This is the actual payload that will be delivered to consumers."), - ), - mcp.WithNumber("partition", - mcp.Description("The specific partition number to produce to. "+ - "Optional. Partitions are zero-indexed (0, 1, 2, etc). "+ - "If specified, the message will be sent to this partition regardless of the key. "+ - "If not specified, the partition will be determined by the key hash or randomly assigned."), + "This is the actual payload that will be delivered to consumers. It can be a JSON string, and the system will automatically serialize it to the appropriate format based on the schema registry if it is available."), ), mcp.WithArray("headers", - mcp.Description("Message headers in the format of [{\"key\": \"header-key\", \"value\": \"header-value\"}]. "+ + mcp.Description("Message headers in the format of [{\"key\": \"value\"}]. "+ "Optional. Headers allow you to attach metadata to messages without modifying the payload. "+ - "They are passed along with the message to consumers but are not used for routing."), - ), - mcp.WithArray("messages", - mcp.Description("Batch of messages to send in the format of [{\"key\": \"key1\", \"value\": \"value1\", \"headers\": [...], \"partition\": 0}, ...]. "+ - "Optional. Allows sending multiple messages with different keys, values, headers, and partitions in a single request. "+ - "If provided, the individual key/value/headers/partition parameters are ignored."), + "They are passed along with the message to consumers."), ), mcp.WithBoolean("sync", mcp.Description("Whether to wait for server acknowledgment before returning. "+ "Optional. Default is true. When true, ensures the message was successfully written "+ "to the topic before the tool returns a success response."), ), - mcp.WithString("file", - mcp.Description("Path to a file containing the message value. "+ - "Optional. If provided, the file content will be used as the message value instead of the 'value' parameter. "+ - "Useful for sending larger messages or binary data."), - ), ) s.AddTool(produceTool, handleKafkaProduce) } @@ -126,33 +111,15 @@ func handleKafkaProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp. 
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil } - // Process batch messages first if provided - messages, hasMessages := optionalParam[[]interface{}](request.Params.Arguments, "messages") - if hasMessages && len(messages) > 0 { - return handleBatchMessages(ctx, topicName, messages) - } - // Handle single message case // Get value from parameter or file - value, hasValue := optionalParam[string](request.Params.Arguments, "value") - filePath, hasFile := optionalParam[string](request.Params.Arguments, "file") - - if !hasValue && !hasFile { - return mcp.NewToolResultError("Either 'value' or 'file' parameter is required when not using 'messages'"), nil - } - - // If file is provided, read its content - if hasFile { - fileContent, err := os.ReadFile(filePath) - if err != nil { - return mcp.NewToolResultError(fmt.Sprintf("Failed to read file: %v", err)), nil - } - value = string(fileContent) + value, err := requiredParam[string](request.Params.Arguments, "value") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get value: %v", err)), nil } // Get optional parameters key, hasKey := optionalParam[string](request.Params.Arguments, "key") - partition, hasPartition := optionalParam[float64](request.Params.Arguments, "partition") headers, hasHeaders := optionalParam[[]interface{}](request.Params.Arguments, "headers") sync := true if syncVal, hasSync := optionalParam[bool](request.Params.Arguments, "sync"); hasSync { @@ -166,6 +133,53 @@ func handleKafkaProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp. } defer kafkaClient.Close() + srClient, err := kafka.GetKafkaSchemaRegistryClient() + schemaReady := false + var serde sr.Serde + if err == nil && srClient != nil { + schemaReady = true + } + + if schemaReady { + // Set timeout + timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + subjSchema, err := srClient.SchemaByVersion(timeoutCtx, topicName+"-value", -1) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get schema: %v", err)), nil + } + logger.Infof("Schema ID: %d", subjSchema.ID) + switch subjSchema.Type { + case sr.TypeAvro: + avroSchema, err := avro.Parse(subjSchema.Schema.Schema) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to parse avro schema: %v", err)), nil + } + serde.Register( + subjSchema.ID, + map[string]any{}, + sr.EncodeFn(func(v any) ([]byte, error) { + return avro.Marshal(avroSchema, v) + }), + sr.DecodeFn(func(data []byte, v any) error { + return avro.Unmarshal(avroSchema, data, v) + }), + ) + case sr.TypeJSON: + serde.Register( + subjSchema.ID, + map[string]any{}, + sr.EncodeFn(json.Marshal), + sr.DecodeFn(json.Unmarshal), + ) + case sr.TypeProtobuf: + default: + // TODO: support other schema types + logger.Infof("Unsupported schema type: %s", subjSchema.Type) + schemaReady = false + } + } + // Prepare record record := &kgo.Record{ Topic: topicName, @@ -177,110 +191,30 @@ func handleKafkaProduce(ctx context.Context, request mcp.CallToolRequest) (*mcp. 
record.Key = []byte(key) } - // Set partition if provided - if hasPartition { - record.Partition = int32(partition) - } - // Add headers if provided if hasHeaders { record.Headers = parseHeaders(headers) } - // Produce the message - result, err := produceRecord(ctx, kafkaClient, record, sync) - if err != nil { - return mcp.NewToolResultError(fmt.Sprintf("Failed to produce message: %v", err)), nil - } - - return result, nil -} - -// handleBatchMessages processes multiple messages from the messages array -func handleBatchMessages(ctx context.Context, topicName string, messages []interface{}) (*mcp.CallToolResult, error) { - // Create Kafka client - kafkaClient, err := kafka.GetKafkaClient() - if err != nil { - return mcp.NewToolResultError(fmt.Sprintf("Failed to create Kafka client: %v", err)), nil - } - defer kafkaClient.Close() - - // Process each message - var records []*kgo.Record - for i, msg := range messages { - msgMap, ok := msg.(map[string]interface{}) - if !ok { - return mcp.NewToolResultError(fmt.Sprintf("Invalid message format at index %d", i)), nil - } - - // Value is required - valueInterface, exists := msgMap["value"] - if !exists { - return mcp.NewToolResultError(fmt.Sprintf("Message at index %d is missing required 'value' field", i)), nil - } - value, ok := valueInterface.(string) - if !ok { - return mcp.NewToolResultError(fmt.Sprintf("Value at index %d must be a string", i)), nil - } - - // Create record - record := &kgo.Record{ - Topic: topicName, - Value: []byte(value), - } - - // Add key if provided - if keyInterface, exists := msgMap["key"]; exists { - if key, ok := keyInterface.(string); ok { - record.Key = []byte(key) - } - } - - // Set partition if provided - if partitionInterface, exists := msgMap["partition"]; exists { - if partition, ok := partitionInterface.(float64); ok { - record.Partition = int32(partition) - } - } - - // Add headers if provided - if headersInterface, exists := msgMap["headers"]; exists { - if headers, ok := headersInterface.([]interface{}); ok { - record.Headers = parseHeaders(headers) + if schemaReady { + var structValue map[string]any + if err := json.Unmarshal([]byte(value), &structValue); err == nil { + record.Value, err = serde.Encode(structValue) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to encode value: %v", err)), nil } + } else { + return mcp.NewToolResultError(fmt.Sprintf("Failed to unmarshal value: %v", err)), nil } - - records = append(records, record) - } - - // No valid records to produce - if len(records) == 0 { - return mcp.NewToolResultError("No valid messages to produce"), nil } - // Produce all records - results := kafkaClient.ProduceSync(ctx, records...) 
-	// Check for errors in produce results
-	for _, result := range results {
-		if result.Err != nil {
-			return mcp.NewToolResultError(fmt.Sprintf("Failed to produce messages: %v", result.Err)), nil
-		}
-	}
-
-	// Create result
-	result := map[string]interface{}{
-		"status":        "success",
-		"topic":         topicName,
-		"message_count": len(records),
-		"timestamp":     time.Now().Format(time.RFC3339),
-	}
-
-	jsonBytes, err := json.Marshal(result)
+	// Produce the message
+	result, err := produceRecord(ctx, kafkaClient, record, sync)
 	if err != nil {
-		return mcp.NewToolResultError(fmt.Sprintf("Failed to create result: %v", err)), nil
+		return mcp.NewToolResultError(fmt.Sprintf("Failed to produce message: %v", err)), nil
 	}
-	return mcp.NewToolResultText(string(jsonBytes)), nil
+	return result, nil
 }
 
 // parseHeaders converts the headers array to kgo.RecordHeader format
@@ -293,13 +227,15 @@ func parseHeaders(headers []interface{}) []kgo.RecordHeader {
 			continue
 		}
 
-		key, hasKey := headerMap["key"].(string)
-		value, hasValue := headerMap["value"].(string)
-
-		if hasKey && hasValue {
+		for key, value := range headerMap {
+			strValue, ok := value.(string)
+			if !ok {
+				// Skip non-string header values instead of panicking on the type assertion.
+				continue
+			}
 			recordHeaders = append(recordHeaders, kgo.RecordHeader{
 				Key:   key,
-				Value: []byte(value),
+				Value: []byte(strValue),
 			})
 		}
 	}
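
Taken together, the schema-registry additions to the consume and produce handlers follow one pattern: look up the latest schema for the topic's "-value" subject, then register matching encode/decode functions on an sr.Serde, which handles the Confluent wire framing (magic byte plus schema ID). The helper below condenses that wiring into a standalone sketch. It mirrors the calls used in the patch (SchemaByVersion, avro.Parse, sr.EncodeFn/sr.DecodeFn); the package name and function signature are illustrative only.

```go
package example

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/hamba/avro/v2"
	"github.com/twmb/franz-go/pkg/sr"
)

// newSerdeForTopic registers encode/decode functions for the "<topic>-value"
// subject on an sr.Serde, the same way the consume and produce handlers do.
// Only Avro and JSON schemas are wired up; other types return an error.
func newSerdeForTopic(ctx context.Context, cl *sr.Client, topic string) (*sr.Serde, error) {
	subjSchema, err := cl.SchemaByVersion(ctx, topic+"-value", -1) // -1 selects the latest version
	if err != nil {
		return nil, fmt.Errorf("look up schema for %s-value: %w", topic, err)
	}

	var serde sr.Serde
	switch subjSchema.Type {
	case sr.TypeAvro:
		avroSchema, err := avro.Parse(subjSchema.Schema.Schema)
		if err != nil {
			return nil, fmt.Errorf("parse avro schema: %w", err)
		}
		serde.Register(
			subjSchema.ID,
			map[string]any{},
			sr.EncodeFn(func(v any) ([]byte, error) { return avro.Marshal(avroSchema, v) }),
			sr.DecodeFn(func(b []byte, v any) error { return avro.Unmarshal(avroSchema, b, v) }),
		)
	case sr.TypeJSON:
		serde.Register(
			subjSchema.ID,
			map[string]any{},
			sr.EncodeFn(json.Marshal),
			sr.DecodeFn(json.Unmarshal),
		)
	default:
		return nil, fmt.Errorf("unsupported schema type: %s", subjSchema.Type)
	}
	return &serde, nil
}
```

The produce handler then unmarshals the tool's JSON value argument into a map[string]any and runs it through serde.Encode, while the consume handler runs each record.Value through serde.Decode and falls back to the raw bytes when decoding fails.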
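
A corresponding sketch of the produce side: the handler now requires the value argument and, when a registry serde is available, re-encodes the JSON payload before the record is produced. The function name and signature below are hypothetical; the real handler does this inline after reading the tool parameters.

```go
package example

import (
	"encoding/json"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sr"
)

// buildRecord turns the tool's JSON "value" argument into a kgo.Record,
// encoding it through the registry serde when one is configured.
func buildRecord(topic, key, jsonValue string, serde *sr.Serde) (*kgo.Record, error) {
	record := &kgo.Record{Topic: topic, Value: []byte(jsonValue)}
	if key != "" {
		record.Key = []byte(key)
	}

	if serde != nil {
		var structValue map[string]any
		if err := json.Unmarshal([]byte(jsonValue), &structValue); err != nil {
			return nil, fmt.Errorf("value is not valid JSON: %w", err)
		}
		encoded, err := serde.Encode(structValue)
		if err != nil {
			return nil, fmt.Errorf("encode value with registry schema: %w", err)
		}
		record.Value = encoded
	}
	return record, nil
}
```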
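
The consume handler also gained an optional group argument: when set, the client joins that consumer group, resumes from its committed offsets, and commits what it consumed before returning. A minimal standalone equivalent with franz-go is sketched below; broker, group, and topic values are placeholders, and the real handler obtains its client via kafka.GetKafkaClient and decodes values through the serde shown above.

```go
package example

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// consumeWithGroup polls up to maxMessages records as part of a consumer group
// and commits the consumed offsets, roughly mirroring the updated handler.
func consumeWithGroup(ctx context.Context, brokers []string, group, topic string, maxMessages int) ([][]byte, error) {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(brokers...),
		kgo.ConsumerGroup(group), // join (or create) the consumer group
		kgo.ConsumeTopics(topic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()), // used only when the group has no committed offset
	)
	if err != nil {
		return nil, err
	}
	defer cl.Close()

	pollCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var values [][]byte
	for len(values) < maxMessages {
		fetches := cl.PollRecords(pollCtx, maxMessages)
		if fetches.IsClientClosed() || pollCtx.Err() != nil {
			break
		}
		if errs := fetches.Errors(); len(errs) > 0 {
			return nil, fmt.Errorf("fetch error: %v", errs[0].Err)
		}
		iter := fetches.RecordIter()
		for !iter.Done() && len(values) < maxMessages {
			values = append(values, iter.Next().Value)
		}
	}

	// Commit against the parent context so an expired poll timeout does not
	// abort the commit; a canceled parent is not treated as a failure.
	if err := cl.CommitUncommittedOffsets(ctx); err != nil && !errors.Is(err, context.Canceled) {
		return nil, err
	}
	return values, nil
}
```

In the handler itself the commit runs unconditionally and only logs failures, so calling the tool without a group still returns the consumed messages.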
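
Finally, the group describe tool now reports consumer lag next to the group description. A small sketch of that combination using franz-go's kadm package, assuming an already constructed *kgo.Client; the "group"/"lag" keys match the response shape built in the handler.

```go
package example

import (
	"context"
	"encoding/json"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

// describeGroupWithLag returns a group's description and its per-partition lag
// as one JSON document, mirroring the updated handleKafkaGroupDescribe response.
func describeGroupWithLag(ctx context.Context, cl *kgo.Client, group string) ([]byte, error) {
	admin := kadm.NewClient(cl)

	described, err := admin.DescribeGroups(ctx, group)
	if err != nil {
		return nil, err
	}
	lag, err := admin.Lag(ctx, group)
	if err != nil {
		return nil, err
	}

	return json.Marshal(map[string]any{
		"group": described,
		"lag":   lag,
	})
}
```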