diff --git a/pkg/mcp/kafka_admin_groups_tools.go b/pkg/mcp/kafka_admin_groups_tools.go index 68e9490..a4ddb67 100644 --- a/pkg/mcp/kafka_admin_groups_tools.go +++ b/pkg/mcp/kafka_admin_groups_tools.go @@ -36,7 +36,7 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str } resourceDesc := "Resource to operate on. Available resources:\n" + - "- group: A single Kafka Consumer Group for operations on individual groups (describe, remove-members)\n" + + "- group: A single Kafka Consumer Group for operations on individual groups (describe, remove-members, set-offset, delete-offset)\n" + "- groups: Collection of Kafka Consumer Groups for bulk operations (list)" operationDesc := "Operation to perform. Available operations:\n" + @@ -44,7 +44,8 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str "- describe: Get detailed information about a specific Consumer Group, including members, offsets, and lag\n" + "- remove-members: Remove specific members from a Consumer Group to force rebalancing or troubleshoot issues\n" + "- offsets: Get offsets for a specific consumer group\n" + - "- delete-offset: Delete a specific offset for a consumer group of a topic" + "- delete-offset: Delete a specific offset for a consumer group of a topic\n" + + "- set-offset: Set a specific offset for a consumer group's topic-partition" toolDesc := "Unified tool for managing Apache Kafka Consumer Groups.\n" + "This tool provides access to Kafka consumer group operations including listing, describing, and managing group membership.\n" + @@ -75,6 +76,13 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str " operation: \"delete-offset\"\n" + " group: \"my-consumer-group\"\n" + " topic: \"my-topic\"\n\n" + + "6. Set a specific offset for a consumer group's topic-partition:\n" + + " resource: \"group\"\n" + + " operation: \"set-offset\"\n" + + " group: \"my-consumer-group\"\n" + + " topic: \"my-topic\"\n" + + " partition: 0\n" + + " offset: 1000\n\n" + "This tool requires Kafka super-user permissions." @@ -101,6 +109,14 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str mcp.Description("The name of the Kafka topic to operate on. "+ "Required for the 'delete' operation. "+ "Must be an existing topic name in the Kafka cluster.")), + mcp.WithString("partition", + mcp.Description("The partition number to set offset for. "+ + "Required for the 'set-offset' operation. "+ + "Must be a valid partition number for the specified topic.")), + mcp.WithNumber("offset", + mcp.Description("The offset value to set. "+ + "Required for the 'set-offset' operation. "+ + "Use -1 for earliest offset, -2 for latest offset, or a specific positive value.")), ) s.AddTool(kafkaGroupsTool, handleKafkaGroupsTool(readOnly)) @@ -146,6 +162,8 @@ func handleKafkaGroupsTool(readOnly bool) func(context.Context, mcp.CallToolRequ return handleKafkaGroupOffsets(ctx, admin, request) case "delete-offset": return handleKafkaGroupDeleteOffset(ctx, admin, request) + case "set-offset": + return handleKafkaGroupSetOffset(ctx, admin, request) default: return mcp.NewToolResultError(fmt.Sprintf("Invalid operation for resource 'group': %s", operation)), nil } @@ -249,12 +267,70 @@ func handleKafkaGroupDeleteOffset(ctx context.Context, admin *kadm.Client, reque return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil } - response, err := admin.FetchOffsetsForTopics(ctx, groupName, topic) + // Create a TopicsSet with the specified topic + // This will target all partitions for the given topic + topicsSet := make(kadm.TopicsSet) + topicsSet.Add(topic) + + // Call DeleteOffsets to delete the offsets for the specified topic + responses, err := admin.DeleteOffsets(ctx, groupName, topicsSet) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to delete offsets: %v", err)), nil } - jsonBytes, err := json.Marshal(response) + // Check for errors in the response + if err := responses.Error(); err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Errors occurred during offset deletion: %v", err)), nil + } + + jsonBytes, err := json.Marshal(responses) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil + } + + return mcp.NewToolResultText(string(jsonBytes)), nil +} + +func handleKafkaGroupSetOffset(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { + // Get required parameters + groupName, err := requiredParam[string](request.Params.Arguments, "group") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil + } + + topic, err := requiredParam[string](request.Params.Arguments, "topic") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil + } + + partition, err := requiredParam[float64](request.Params.Arguments, "partition") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get partition number: %v", err)), nil + } + partitionInt := int32(partition) + + offset, err := requiredParam[float64](request.Params.Arguments, "offset") + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get offset value: %v", err)), nil + } + offsetInt := int64(offset) + + // Create Offsets object with the specified topic, partition, and offset + offsets := make(kadm.Offsets) + offsets.AddOffset(topic, partitionInt, offsetInt, -1) // Using -1 for leaderEpoch as it's optional + + // Commit the offsets + responses, err := admin.CommitOffsets(ctx, groupName, offsets) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to commit offsets: %v", err)), nil + } + + // Check for errors in the response + if err := responses.Error(); err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Errors occurred during offset commit: %v", err)), nil + } + + jsonBytes, err := json.Marshal(responses) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil }