Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions pkg/mcp/internal/testutil/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"strings"
"time"

"github.com/apache/pulsar-client-go/pulsaradmin"
Expand All @@ -29,10 +30,10 @@ import (

// PulsarTestHelper provides helper functions for Pulsar E2E testing.
type PulsarTestHelper struct {
adminClient pulsaradmin.Client
adminURL string
serviceURL string
httpClient *http.Client
adminClient pulsaradmin.Client
adminURL string
serviceURL string
httpClient *http.Client
}

// NewPulsarTestHelper creates a new PulsarTestHelper.
Expand Down Expand Up @@ -106,17 +107,37 @@ func (h *PulsarTestHelper) EnsureNamespace(ctx context.Context, tenant, namespac

// Check if namespace exists
namespaces, err := h.adminClient.Namespaces().GetNamespaces(tenant)
if err == nil {
if err != nil {
// If we cannot list namespaces, we'll try to create it anyway.
// This could be due to permission issues or other transient errors.
// The CreateNamespace call will fail with a proper error if there's a real issue.
} else {
// Successfully retrieved namespaces, check if ours exists
for _, ns := range namespaces {
if ns == fullNamespace {
return nil // Already exists
}
}
}

// Create namespace
// Create namespace - this will fail if it already exists or if there are permission/connection issues
err = h.adminClient.Namespaces().CreateNamespace(fullNamespace)
if err != nil {
// Check if the error is because namespace already exists.
// The Pulsar admin client doesn't provide typed errors, so we check the error message.
// This is a best-effort approach for test utilities - we check for common patterns:
// - "already exists" (common HTTP API message)
// - "alreadyexists" (possible gRPC/protobuf message)
// - "409" (HTTP Conflict status code)
// - "conflict" (HTTP status text)
// Using case-insensitive matching for robustness
errMsg := strings.ToLower(err.Error())
if strings.Contains(errMsg, "already exists") ||
strings.Contains(errMsg, "alreadyexists") ||
strings.Contains(errMsg, "409") ||
strings.Contains(errMsg, "conflict") {
return nil // Namespace already exists, which is what we want
}
return fmt.Errorf("failed to create namespace %s: %w", fullNamespace, err)
}

Expand Down