diff --git a/pkg/mcp/internal/testutil/pulsar.go b/pkg/mcp/internal/testutil/pulsar.go index c527fce..2d2e5b3 100644 --- a/pkg/mcp/internal/testutil/pulsar.go +++ b/pkg/mcp/internal/testutil/pulsar.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "strings" "time" "github.com/apache/pulsar-client-go/pulsaradmin" @@ -29,10 +30,10 @@ import ( // PulsarTestHelper provides helper functions for Pulsar E2E testing. type PulsarTestHelper struct { - adminClient pulsaradmin.Client - adminURL string - serviceURL string - httpClient *http.Client + adminClient pulsaradmin.Client + adminURL string + serviceURL string + httpClient *http.Client } // NewPulsarTestHelper creates a new PulsarTestHelper. @@ -106,7 +107,12 @@ func (h *PulsarTestHelper) EnsureNamespace(ctx context.Context, tenant, namespac // Check if namespace exists namespaces, err := h.adminClient.Namespaces().GetNamespaces(tenant) - if err == nil { + if err != nil { + // If we cannot list namespaces, we'll try to create it anyway. + // This could be due to permission issues or other transient errors. + // The CreateNamespace call will fail with a proper error if there's a real issue. + } else { + // Successfully retrieved namespaces, check if ours exists for _, ns := range namespaces { if ns == fullNamespace { return nil // Already exists @@ -114,9 +120,24 @@ func (h *PulsarTestHelper) EnsureNamespace(ctx context.Context, tenant, namespac } } - // Create namespace + // Create namespace - this will fail if it already exists or if there are permission/connection issues err = h.adminClient.Namespaces().CreateNamespace(fullNamespace) if err != nil { + // Check if the error is because namespace already exists. + // The Pulsar admin client doesn't provide typed errors, so we check the error message. + // This is a best-effort approach for test utilities - we check for common patterns: + // - "already exists" (common HTTP API message) + // - "alreadyexists" (possible gRPC/protobuf message) + // - "409" (HTTP Conflict status code) + // - "conflict" (HTTP status text) + // Using case-insensitive matching for robustness + errMsg := strings.ToLower(err.Error()) + if strings.Contains(errMsg, "already exists") || + strings.Contains(errMsg, "alreadyexists") || + strings.Contains(errMsg, "409") || + strings.Contains(errMsg, "conflict") { + return nil // Namespace already exists, which is what we want + } return fmt.Errorf("failed to create namespace %s: %w", fullNamespace, err) }