Skip to content

Commit d850183

Browse files
committed
Refactor Pulsar Admin Tools with Builder Pattern
- Introduced builder pattern for various Pulsar admin tools, enhancing modularity and maintainability. - Migrated tool definitions and handler logic to respective builders, improving code organization and readability. - Updated tool registration logic to utilize the new builders, ensuring a unified approach for managing Pulsar operations. - Enhanced error handling and added comments for better clarity and understanding of the code structure. - Maintained backward compatibility while integrating the new builder pattern.
1 parent 0768bbc commit d850183

34 files changed

+8047
-9820
lines changed

pkg/mcp/builders/kafka/schema_registry.go

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,8 @@ func (b *KafkaSchemaRegistryToolBuilder) handleSchemaSubjectDelete(ctx context.C
348348
return b.handleError("get subject name", err), nil
349349
}
350350

351-
versions, err := client.DeleteSubject(ctx, subject)
351+
// Delete subject using correct API signature (soft delete by default)
352+
versions, err := client.DeleteSubject(ctx, subject, sr.SoftDelete)
352353
if err != nil {
353354
return b.handleError("delete schema subject", err), nil
354355
}
@@ -416,30 +417,41 @@ func (b *KafkaSchemaRegistryToolBuilder) handleSchemaVersionDelete(ctx context.C
416417
return b.handleError("parse version number", err), nil
417418
}
418419

419-
deletedVersion, err := client.DeleteSchemaVersion(ctx, subject, version)
420+
// Delete schema version using correct API signature (soft delete by default)
421+
err = client.DeleteSchema(ctx, subject, version, sr.SoftDelete)
420422
if err != nil {
421423
return b.handleError("delete schema version", err), nil
422424
}
423-
return b.marshalResponse(deletedVersion)
425+
426+
return mcp.NewToolResultText(fmt.Sprintf("Schema version %d for subject %s deleted successfully", version, subject)), nil
424427
}
425428

426429
// handleSchemaCompatibilityGet handles getting compatibility setting
427430
func (b *KafkaSchemaRegistryToolBuilder) handleSchemaCompatibilityGet(ctx context.Context, client *sr.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
428-
subject := request.GetString("subject") // Optional for global compatibility
429-
430-
var compatibility sr.CompatibilityLevel
431-
var err error
431+
subject := request.GetString("subject", "") // Optional for global compatibility
432432

433+
var results []sr.CompatibilityResult
433434
if subject != "" {
434-
compatibility, err = client.Compatibility(ctx, subject)
435+
// Get compatibility for specific subject
436+
results = client.Compatibility(ctx, subject)
435437
} else {
436-
compatibility, err = client.Config(ctx)
438+
// Get global compatibility
439+
results = client.Compatibility(ctx)
437440
}
438441

439-
if err != nil {
440-
return b.handleError("get compatibility setting", err), nil
442+
// Check for errors in results
443+
for _, result := range results {
444+
if result.Err != nil {
445+
return b.handleError("get compatibility setting", result.Err), nil
446+
}
447+
}
448+
449+
// Return the first result (there should only be one)
450+
if len(results) > 0 {
451+
return b.marshalResponse(map[string]string{"compatibility": results[0].Level.String()})
441452
}
442-
return b.marshalResponse(map[string]string{"compatibility": string(compatibility)})
453+
454+
return mcp.NewToolResultError("No compatibility result returned"), nil
443455
}
444456

445457
// handleSchemaCompatibilitySet handles setting compatibility level
@@ -449,31 +461,42 @@ func (b *KafkaSchemaRegistryToolBuilder) handleSchemaCompatibilitySet(ctx contex
449461
return b.handleError("get compatibility level", err), nil
450462
}
451463

452-
subject := request.GetString("subject") // Optional for global compatibility
464+
subject := request.GetString("subject", "") // Optional for global compatibility
453465

454466
// Parse compatibility level
455467
var compatibility sr.CompatibilityLevel
456468
switch strings.ToUpper(compatibilityStr) {
457469
case "BACKWARD":
458-
compatibility = sr.CompatibilityBackward
470+
compatibility = sr.CompatBackward
459471
case "FORWARD":
460-
compatibility = sr.CompatibilityForward
472+
compatibility = sr.CompatForward
461473
case "FULL":
462-
compatibility = sr.CompatibilityFull
474+
compatibility = sr.CompatFull
463475
case "NONE":
464-
compatibility = sr.CompatibilityNone
476+
compatibility = sr.CompatNone
465477
default:
466478
return mcp.NewToolResultError(fmt.Sprintf("Invalid compatibility level: %s. Valid levels: BACKWARD, FORWARD, FULL, NONE", compatibilityStr)), nil
467479
}
468480

481+
// Create SetCompatibility request
482+
setCompat := sr.SetCompatibility{
483+
Level: compatibility,
484+
}
485+
486+
var results []sr.CompatibilityResult
469487
if subject != "" {
470-
_, err = client.SetCompatibility(ctx, subject, compatibility)
488+
// Set compatibility for specific subject
489+
results = client.SetCompatibility(ctx, setCompat, subject)
471490
} else {
472-
_, err = client.SetConfig(ctx, compatibility)
491+
// Set global compatibility
492+
results = client.SetCompatibility(ctx, setCompat)
473493
}
474494

475-
if err != nil {
476-
return b.handleError("set compatibility level", err), nil
495+
// Check for errors in results
496+
for _, result := range results {
497+
if result.Err != nil {
498+
return b.handleError("set compatibility level", result.Err), nil
499+
}
477500
}
478501

479502
return mcp.NewToolResultText(fmt.Sprintf("Compatibility level set to %s", compatibilityStr)), nil

pkg/mcp/builders/kafka/topics.go

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@ import (
2121
"context"
2222
"encoding/json"
2323
"fmt"
24-
"strconv"
2524
"strings"
2625

2726
"github.com/mark3labs/mcp-go/mcp"
2827
"github.com/mark3labs/mcp-go/server"
29-
"github.com/streamnative/streamnative-mcp-server/pkg/common"
3028
"github.com/streamnative/streamnative-mcp-server/pkg/kafka"
3129
"github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders"
3230
"github.com/twmb/franz-go/pkg/kadm"
@@ -266,12 +264,7 @@ func (b *KafkaTopicsToolBuilder) getKafkaAdminClient(ctx context.Context) (*kadm
266264

267265
// handleKafkaTopicsList handles listing all topics
268266
func (b *KafkaTopicsToolBuilder) handleKafkaTopicsList(ctx context.Context, admin *kadm.Client, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
269-
includeInternal := false
270-
if val, exists := request.Arguments["includeInternal"]; exists {
271-
if boolVal, ok := val.(bool); ok {
272-
includeInternal = boolVal
273-
}
274-
}
267+
includeInternal := request.GetBool("includeInternal", false)
275268

276269
topics, err := admin.ListTopics(ctx)
277270
if err != nil {
@@ -314,12 +307,12 @@ func (b *KafkaTopicsToolBuilder) handleKafkaTopicCreate(ctx context.Context, adm
314307
return b.handleError("get topic name", err), nil
315308
}
316309

317-
partitionsNum, err := request.RequireNumber("partitions")
310+
partitionsNum, err := request.RequireInt("partitions")
318311
if err != nil {
319312
return b.handleError("get partitions", err), nil
320313
}
321314

322-
replicationFactorNum, err := request.RequireNumber("replicationFactor")
315+
replicationFactorNum, err := request.RequireInt("replicationFactor")
323316
if err != nil {
324317
return b.handleError("get replication factor", err), nil
325318
}
@@ -329,7 +322,8 @@ func (b *KafkaTopicsToolBuilder) handleKafkaTopicCreate(ctx context.Context, adm
329322

330323
// Parse optional configs
331324
var configs map[string]*string
332-
if configsParam, exists := request.Arguments["configs"]; exists {
325+
arguments := request.GetArguments()
326+
if configsParam, exists := arguments["configs"]; exists {
333327
if configsMap, ok := configsParam.(map[string]interface{}); ok {
334328
configs = make(map[string]*string)
335329
for key, value := range configsMap {
@@ -344,15 +338,8 @@ func (b *KafkaTopicsToolBuilder) handleKafkaTopicCreate(ctx context.Context, adm
344338
}
345339
}
346340

347-
// Create topic request
348-
req := kadm.TopicRequest{
349-
Topic: topicName,
350-
NumPartitions: partitions,
351-
ReplicationFactor: replicationFactor,
352-
Configs: configs,
353-
}
354-
355-
results, err := admin.CreateTopics(ctx, req)
341+
// Create topic using the correct CreateTopics API
342+
results, err := admin.CreateTopics(ctx, partitions, replicationFactor, configs, topicName)
356343
if err != nil {
357344
return b.handleError("create Kafka topic", err), nil
358345
}

0 commit comments

Comments
 (0)