Skip to content

Commit b67ea08

Browse files
authored
[kafka_admin_groups] fix delete-offset, add set-offset (#4)
- fix `delete-offset` that not call correct api - add `set-offset` to allow agents commit offset manually
1 parent aa5c1a0 commit b67ea08

File tree

1 file changed

+80
-4
lines changed

1 file changed

+80
-4
lines changed

pkg/mcp/kafka_admin_groups_tools.go

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,16 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
3636
}
3737

3838
resourceDesc := "Resource to operate on. Available resources:\n" +
39-
"- group: A single Kafka Consumer Group for operations on individual groups (describe, remove-members)\n" +
39+
"- group: A single Kafka Consumer Group for operations on individual groups (describe, remove-members, set-offset, delete-offset)\n" +
4040
"- groups: Collection of Kafka Consumer Groups for bulk operations (list)"
4141

4242
operationDesc := "Operation to perform. Available operations:\n" +
4343
"- list: List all Kafka Consumer Groups in the cluster\n" +
4444
"- describe: Get detailed information about a specific Consumer Group, including members, offsets, and lag\n" +
4545
"- remove-members: Remove specific members from a Consumer Group to force rebalancing or troubleshoot issues\n" +
4646
"- offsets: Get offsets for a specific consumer group\n" +
47-
"- delete-offset: Delete a specific offset for a consumer group of a topic"
47+
"- delete-offset: Delete a specific offset for a consumer group of a topic\n" +
48+
"- set-offset: Set a specific offset for a consumer group's topic-partition"
4849

4950
toolDesc := "Unified tool for managing Apache Kafka Consumer Groups.\n" +
5051
"This tool provides access to Kafka consumer group operations including listing, describing, and managing group membership.\n" +
@@ -75,6 +76,13 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
7576
" operation: \"delete-offset\"\n" +
7677
" group: \"my-consumer-group\"\n" +
7778
" topic: \"my-topic\"\n\n" +
79+
"6. Set a specific offset for a consumer group's topic-partition:\n" +
80+
" resource: \"group\"\n" +
81+
" operation: \"set-offset\"\n" +
82+
" group: \"my-consumer-group\"\n" +
83+
" topic: \"my-topic\"\n" +
84+
" partition: 0\n" +
85+
" offset: 1000\n\n" +
7886

7987
"This tool requires Kafka super-user permissions."
8088

@@ -101,6 +109,14 @@ func KafkaAdminAddGroupsTools(s *server.MCPServer, readOnly bool, features []str
101109
mcp.Description("The name of the Kafka topic to operate on. "+
102110
"Required for the 'delete' operation. "+
103111
"Must be an existing topic name in the Kafka cluster.")),
112+
mcp.WithString("partition",
113+
mcp.Description("The partition number to set offset for. "+
114+
"Required for the 'set-offset' operation. "+
115+
"Must be a valid partition number for the specified topic.")),
116+
mcp.WithNumber("offset",
117+
mcp.Description("The offset value to set. "+
118+
"Required for the 'set-offset' operation. "+
119+
"Use -1 for earliest offset, -2 for latest offset, or a specific positive value.")),
104120
)
105121

106122
s.AddTool(kafkaGroupsTool, handleKafkaGroupsTool(readOnly))
@@ -146,6 +162,8 @@ func handleKafkaGroupsTool(readOnly bool) func(context.Context, mcp.CallToolRequ
146162
return handleKafkaGroupOffsets(ctx, admin, request)
147163
case "delete-offset":
148164
return handleKafkaGroupDeleteOffset(ctx, admin, request)
165+
case "set-offset":
166+
return handleKafkaGroupSetOffset(ctx, admin, request)
149167
default:
150168
return mcp.NewToolResultError(fmt.Sprintf("Invalid operation for resource 'group': %s", operation)), nil
151169
}
@@ -249,12 +267,70 @@ func handleKafkaGroupDeleteOffset(ctx context.Context, admin *kadm.Client, reque
249267
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
250268
}
251269

252-
response, err := admin.FetchOffsetsForTopics(ctx, groupName, topic)
270+
// Create a TopicsSet with the specified topic
271+
// This will target all partitions for the given topic
272+
topicsSet := make(kadm.TopicsSet)
273+
topicsSet.Add(topic)
274+
275+
// Call DeleteOffsets to delete the offsets for the specified topic
276+
responses, err := admin.DeleteOffsets(ctx, groupName, topicsSet)
253277
if err != nil {
254278
return mcp.NewToolResultError(fmt.Sprintf("Failed to delete offsets: %v", err)), nil
255279
}
256280

257-
jsonBytes, err := json.Marshal(response)
281+
// Check for errors in the response
282+
if err := responses.Error(); err != nil {
283+
return mcp.NewToolResultError(fmt.Sprintf("Errors occurred during offset deletion: %v", err)), nil
284+
}
285+
286+
jsonBytes, err := json.Marshal(responses)
287+
if err != nil {
288+
return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil
289+
}
290+
291+
return mcp.NewToolResultText(string(jsonBytes)), nil
292+
}
293+
294+
func handleKafkaGroupSetOffset(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
295+
// Get required parameters
296+
groupName, err := requiredParam[string](request.Params.Arguments, "group")
297+
if err != nil {
298+
return mcp.NewToolResultError(fmt.Sprintf("Failed to get group name: %v", err)), nil
299+
}
300+
301+
topic, err := requiredParam[string](request.Params.Arguments, "topic")
302+
if err != nil {
303+
return mcp.NewToolResultError(fmt.Sprintf("Failed to get topic name: %v", err)), nil
304+
}
305+
306+
partition, err := requiredParam[float64](request.Params.Arguments, "partition")
307+
if err != nil {
308+
return mcp.NewToolResultError(fmt.Sprintf("Failed to get partition number: %v", err)), nil
309+
}
310+
partitionInt := int32(partition)
311+
312+
offset, err := requiredParam[float64](request.Params.Arguments, "offset")
313+
if err != nil {
314+
return mcp.NewToolResultError(fmt.Sprintf("Failed to get offset value: %v", err)), nil
315+
}
316+
offsetInt := int64(offset)
317+
318+
// Create Offsets object with the specified topic, partition, and offset
319+
offsets := make(kadm.Offsets)
320+
offsets.AddOffset(topic, partitionInt, offsetInt, -1) // Using -1 for leaderEpoch as it's optional
321+
322+
// Commit the offsets
323+
responses, err := admin.CommitOffsets(ctx, groupName, offsets)
324+
if err != nil {
325+
return mcp.NewToolResultError(fmt.Sprintf("Failed to commit offsets: %v", err)), nil
326+
}
327+
328+
// Check for errors in the response
329+
if err := responses.Error(); err != nil {
330+
return mcp.NewToolResultError(fmt.Sprintf("Errors occurred during offset commit: %v", err)), nil
331+
}
332+
333+
jsonBytes, err := json.Marshal(responses)
258334
if err != nil {
259335
return mcp.NewToolResultError(fmt.Sprintf("Failed to marshal response: %v", err)), nil
260336
}

0 commit comments

Comments
 (0)