Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions pkg/mcp/kafka_admin_groups_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
}

resourceDesc := "Resource to operate on. Available resources:\n" +
"- group: A single Kafka Consumer Group for operations on individual groups (describe, remove-members)\n" +
"- group: A single Kafka Consumer Group for operations on individual groups (describe, remove-members, set-offset, delete-offset)\n" +
"- groups: Collection of Kafka Consumer Groups for bulk operations (list)"

operationDesc := "Operation to perform. Available operations:\n" +
"- list: List all Kafka Consumer Groups in the cluster\n" +
"- describe: Get detailed information about a specific Consumer Group, including members, offsets, and lag\n" +
"- remove-members: Remove specific members from a Consumer Group to force rebalancing or troubleshoot issues\n" +
"- offsets: Get offsets for a specific consumer group\n" +
"- delete-offset: Delete a specific offset for a consumer group of a topic"
"- delete-offset: Delete a specific offset for a consumer group of a topic\n" +
"- set-offset: Set a specific offset for a consumer group's topic-partition"

toolDesc := "Unified tool for managing Apache Kafka Consumer Groups.\n" +
"This tool provides access to Kafka consumer group operations including listing, describing, and managing group membership.\n" +
Expand Down Expand Up @@ -75,6 +76,13 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
" operation: \"delete-offset\"\n" +
" group: \"my-consumer-group\"\n" +
" topic: \"my-topic\"\n\n" +
"6. Set a specific offset for a consumer group's topic-partition:\n" +
" resource: \"group\"\n" +
" operation: \"set-offset\"\n" +
" group: \"my-consumer-group\"\n" +
" topic: \"my-topic\"\n" +
" partition: 0\n" +
" offset: 1000\n\n" +

"This tool requires Kafka super-user permissions."

Expand All @@ -101,6 +109,14 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
mcp.Description("The name of the Kafka topic to operate on. "+
"Required for the 'delete' operation. "+
"Must be an existing topic name in the Kafka cluster.")),
mcp.WithString("partition",
mcp.Description("The partition number to set offset for. "+
"Required for the 'set-offset' operation. "+
"Must be a valid partition number for the specified topic.")),
mcp.WithNumber("offset",
mcp.Description("The offset value to set. "+
"Required for the 'set-offset' operation. "+
"Use -1 for earliest offset, -2 for latest offset, or a specific positive value.")),
)

s.AddTool(kafkaGroupsTool, handleKafkaGroupsTool(readOnly))
Expand Down Expand Up @@ -146,6 +162,8 @@ func handleKafkaGroupsTool(readOnly bool) func(context.Context, mcp.CallToolRequ
return handleKafkaGroupOffsets(ctx, admin, request)
case "delete-offset":
return handleKafkaGroupDeleteOffset(ctx, admin, request)
case "set-offset":
return handleKafkaGroupSetOffset(ctx, admin, request)
default:
return mcp.NewToolResultError(fmt.Sprintf("Invalid operation for resource 'group': %s", operation)), nil
}
Expand Down Expand Up @@ -249,12 +267,70 @@ func handleKafkaGroupDeleteOffset(ctx context.Context, admin *kadm.Client, reque
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
}

response, err := admin.FetchOffsetsForTopics(ctx, groupName, topic)
// Create a TopicsSet with the specified topic
// This will target all partitions for the given topic
topicsSet := make(kadm.TopicsSet)
topicsSet.Add(topic)

// Call DeleteOffsets to delete the offsets for the specified topic
responses, err := admin.DeleteOffsets(ctx, groupName, topicsSet)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to delete offsets: %v", err)), nil
}

jsonBytes, err := json.Marshal(response)
// Check for errors in the response
if err := responses.Error(); err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Errors occurred during offset deletion: %v", err)), nil
}

jsonBytes, err := json.Marshal(responses)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil
}

return mcp.NewToolResultText(string(jsonBytes)), nil
}

func handleKafkaGroupSetOffset(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
// Get required parameters
groupName, err := requiredParam[string](request.Params.Arguments, "group")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
}

topic, err := requiredParam[string](request.Params.Arguments, "topic")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
}

partition, err := requiredParam[float64](request.Params.Arguments, "partition")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get partition number: %v", err)), nil
}
partitionInt := int32(partition)

offset, err := requiredParam[float64](request.Params.Arguments, "offset")
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to get offset value: %v", err)), nil
}
offsetInt := int64(offset)

// Create Offsets object with the specified topic, partition, and offset
offsets := make(kadm.Offsets)
offsets.AddOffset(topic, partitionInt, offsetInt, -1) // Using -1 for leaderEpoch as it's optional

// Commit the offsets
responses, err := admin.CommitOffsets(ctx, groupName, offsets)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to commit offsets: %v", err)), nil
}

// Check for errors in the response
if err := responses.Error(); err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Errors occurred during offset commit: %v", err)), nil
}

jsonBytes, err := json.Marshal(responses)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil
}
Expand Down