diff --git a/.gitignore b/.gitignore index 9609ee3f..b23a510c 100644 --- a/.gitignore +++ b/.gitignore @@ -52,3 +52,4 @@ node_modules/ .cursor .envrc +mise.toml diff --git a/pkg/admin/dummy.go b/pkg/admin/dummy.go index 24556d0e..c1cd3ed7 100644 --- a/pkg/admin/dummy.go +++ b/pkg/admin/dummy.go @@ -236,6 +236,16 @@ func (d *DummyPulsarAdmin) GetSchema(string) (*v1alpha1.SchemaInfo, error) { return nil, nil } +// GetSchemaInfoWithVersion is a fake implements of GetSchemaInfoWithVersion +func (d *DummyPulsarAdmin) GetSchemaInfoWithVersion(string) (*v1alpha1.SchemaInfo, int64, error) { + return nil, 0, nil +} + +// GetSchemaVersionBySchemaInfo is a fake implements of GetSchemaVersionBySchemaInfo +func (d *DummyPulsarAdmin) GetSchemaVersionBySchemaInfo(string, *v1alpha1.SchemaInfo) (int64, error) { + return 0, nil +} + // UploadSchema is a fake implements of UploadSchema func (d *DummyPulsarAdmin) UploadSchema(string, *SchemaParams) error { return nil diff --git a/pkg/admin/errors.go b/pkg/admin/errors.go index 5e1c351c..8c416df0 100644 --- a/pkg/admin/errors.go +++ b/pkg/admin/errors.go @@ -17,6 +17,7 @@ package admin import ( "errors" "net" + "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" ) @@ -51,12 +52,22 @@ const ( // ErrorReason returns the HTTP status code for the error func ErrorReason(err error) Reason { - cliError, ok := err.(rest.Error) - if !ok { - // can't determine error reason as can't convert to a cli error + if err == nil { return ReasonUnknown } - return Reason(cliError.Code) + + var restErrPtr *rest.Error + if errors.As(err, &restErrPtr) && restErrPtr != nil { + return Reason(restErrPtr.Code) + } + + var restErr rest.Error + if errors.As(err, &restErr) { + return Reason(restErr.Code) + } + + // can't determine error reason as can't convert to a cli error + return ReasonUnknown } // IsNotFound returns true if the error indicates the resource is not found on server @@ -66,7 +77,25 @@ func IsNotFound(err error) bool { // IsAlreadyExist returns true if the error indicates the resource already exist func IsAlreadyExist(err error) bool { - return ErrorReason(err) == ReasonAlreadyExist + if err == nil { + return false + } + + reason := ErrorReason(err) + if reason == ReasonAlreadyExist { + return true + } + if reason == ReasonInvalidParameter { + return isAlreadyExistsMessage(err) + } + if reason != ReasonUnknown { + return false + } + return isAlreadyExistsMessage(err) +} + +func isAlreadyExistsMessage(err error) bool { + return strings.Contains(strings.ToLower(err.Error()), "already exist") } // IsInternalServerError returns true if the error indicates the resource already exist diff --git a/pkg/admin/errors_test.go b/pkg/admin/errors_test.go new file mode 100644 index 00000000..45b805c8 --- /dev/null +++ b/pkg/admin/errors_test.go @@ -0,0 +1,90 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package admin + +import ( + "errors" + "fmt" + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" +) + +func TestIsAlreadyExist(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + { + name: "rest.Error with 409", + err: rest.Error{Code: 409, Reason: "already exists"}, + expected: true, + }, + { + name: "rest.Error pointer with 409", + err: &rest.Error{Code: 409, Reason: "already exists"}, + expected: true, + }, + { + name: "wrapped rest.Error with 409", + err: fmt.Errorf("wrapped: %w", rest.Error{Code: 409, Reason: "already exists"}), + expected: true, + }, + { + name: "rest.Error with 412 and already exists reason", + err: rest.Error{Code: 412, Reason: "This topic already exists"}, + expected: true, + }, + { + name: "wrapped error with already exists message", + err: fmt.Errorf("wrapped: %w", errors.New("This topic already exists")), + expected: true, + }, + { + name: "plain error with already exists message", + err: errors.New("This topic already exists"), + expected: true, + }, + { + name: "unrelated error", + err: errors.New("permission denied"), + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IsAlreadyExist(tt.err) + if result != tt.expected { + t.Fatalf("IsAlreadyExist() = %v, want %v", result, tt.expected) + } + }) + } +} diff --git a/pkg/admin/impl.go b/pkg/admin/impl.go index 88806cf0..f4d57554 100644 --- a/pkg/admin/impl.go +++ b/pkg/admin/impl.go @@ -1689,6 +1689,39 @@ func (p *PulsarAdminClient) GetSchema(topic string) (*v1alpha1.SchemaInfo, error return rsp, nil } +// GetSchemaInfoWithVersion get schema info with version for a given topic +func (p *PulsarAdminClient) GetSchemaInfoWithVersion(topic string) (*v1alpha1.SchemaInfo, int64, error) { + info, err := p.adminClient.Schemas().GetSchemaInfoWithVersion(topic) + if err != nil { + return nil, 0, err + } + if info == nil { + return nil, 0, errors.New("schema info is empty") + } + if info.SchemaInfo == nil { + return nil, info.Version, nil + } + rsp := &v1alpha1.SchemaInfo{ + Type: info.SchemaInfo.Type, + Schema: string(info.SchemaInfo.Schema), + Properties: info.SchemaInfo.Properties, + } + return rsp, info.Version, nil +} + +// GetSchemaVersionBySchemaInfo gets schema version for a given schema payload +func (p *PulsarAdminClient) GetSchemaVersionBySchemaInfo(topic string, info *v1alpha1.SchemaInfo) (int64, error) { + if info == nil { + return 0, errors.New("schema info is nil") + } + payload := utils.SchemaInfo{ + Type: info.Type, + Schema: []byte(info.Schema), + Properties: info.Properties, + } + return p.adminClient.Schemas().GetVersionBySchemaInfo(topic, payload) +} + // UploadSchema creates or updates a schema for a given topic func (p *PulsarAdminClient) UploadSchema(topic string, params *SchemaParams) error { var payload utils.PostSchemaPayload diff --git a/pkg/admin/interface.go b/pkg/admin/interface.go index 2eb1e72b..f85f4438 100644 --- a/pkg/admin/interface.go +++ b/pkg/admin/interface.go @@ -265,6 +265,12 @@ type PulsarAdmin interface { // GetSchema retrieves the latest schema of a topic GetSchema(topic string) (*v1alpha1.SchemaInfo, error) + // GetSchemaInfoWithVersion retrieves the latest schema and its version for a topic + GetSchemaInfoWithVersion(topic string) (*v1alpha1.SchemaInfo, int64, error) + + // GetSchemaVersionBySchemaInfo retrieves the version for a given schema payload + GetSchemaVersionBySchemaInfo(topic string, info *v1alpha1.SchemaInfo) (int64, error) + // UploadSchema creates or updates a schema for a given topic UploadSchema(topic string, params *SchemaParams) error diff --git a/pkg/connection/reconcile_topic.go b/pkg/connection/reconcile_topic.go index d27522b9..dff0de14 100644 --- a/pkg/connection/reconcile_topic.go +++ b/pkg/connection/reconcile_topic.go @@ -17,7 +17,6 @@ package connection import ( "context" "fmt" - "reflect" "slices" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" @@ -287,23 +286,43 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin } func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTopic, log logr.Logger) error { - schema, serr := pulsarAdmin.GetSchema(topic.Spec.Name) - if serr != nil && !admin.IsNotFound(serr) { - return serr - } if topic.Spec.SchemaInfo != nil { - // Only upload the schema when schema doesn't exist or the schema has been updated - if admin.IsNotFound(serr) || !reflect.DeepEqual(topic.Spec.SchemaInfo, schema) { + uploadSchema := func(currentVersion int64, desiredVersion int64) error { info := topic.Spec.SchemaInfo param := &admin.SchemaParams{ Type: info.Type, Schema: info.Schema, Properties: info.Properties, } - log.Info("Upload schema for the topic", "name", topic.Spec.Name, "type", info.Type, "schema", info.Schema, "properties", info.Properties) - if err := pulsarAdmin.UploadSchema(topic.Spec.Name, param); err != nil { - return err + log.Info("Upload schema for the topic", + "name", topic.Spec.Name, + "type", info.Type, + "schema", info.Schema, + "properties", info.Properties, + "currentVersion", currentVersion, + "desiredVersion", desiredVersion) + return pulsarAdmin.UploadSchema(topic.Spec.Name, param) + } + + _, currentVersion, serr := pulsarAdmin.GetSchemaInfoWithVersion(topic.Spec.Name) + if serr != nil { + if admin.IsNotFound(serr) { + return uploadSchema(currentVersion, -1) } + return serr + } + + desiredVersion, verr := pulsarAdmin.GetSchemaVersionBySchemaInfo(topic.Spec.Name, topic.Spec.SchemaInfo) + if verr != nil { + if admin.IsNotFound(verr) { + return uploadSchema(currentVersion, -1) + } + return verr + } + + // Only upload the schema when schema doesn't exist or the schema has been updated + if desiredVersion < 0 || desiredVersion != currentVersion { + return uploadSchema(currentVersion, desiredVersion) } } // Note: We intentionally do NOT delete existing schemas when schemaInfo is not specified. diff --git a/pkg/connection/reconcile_topic_test.go b/pkg/connection/reconcile_topic_test.go new file mode 100644 index 00000000..351e5e0e --- /dev/null +++ b/pkg/connection/reconcile_topic_test.go @@ -0,0 +1,209 @@ +// Copyright 2026 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connection + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest" + "github.com/go-logr/logr" + + resourcev1alpha1 "github.com/streamnative/pulsar-resources-operator/api/v1alpha1" + "github.com/streamnative/pulsar-resources-operator/pkg/admin" +) + +type fakePulsarAdmin struct { + admin.DummyPulsarAdmin + + getSchemaInfoWithVersionFn func(topic string) (*resourcev1alpha1.SchemaInfo, int64, error) + getSchemaVersionBySchemaInfoFn func(topic string, info *resourcev1alpha1.SchemaInfo) (int64, error) + uploadSchemaFn func(topic string, params *admin.SchemaParams) error + getSchemaInfoWithVersionCalls int + getSchemaVersionBySchemaInfoCalls int + uploadSchemaCalls int +} + +func (f *fakePulsarAdmin) GetSchemaInfoWithVersion(topic string) (*resourcev1alpha1.SchemaInfo, int64, error) { + f.getSchemaInfoWithVersionCalls++ + return f.getSchemaInfoWithVersionFn(topic) +} + +func (f *fakePulsarAdmin) GetSchemaVersionBySchemaInfo(topic string, info *resourcev1alpha1.SchemaInfo) (int64, error) { + f.getSchemaVersionBySchemaInfoCalls++ + return f.getSchemaVersionBySchemaInfoFn(topic, info) +} + +func (f *fakePulsarAdmin) UploadSchema(topic string, params *admin.SchemaParams) error { + f.uploadSchemaCalls++ + return f.uploadSchemaFn(topic, params) +} + +func TestApplySchema_NoSchemaInfo(t *testing.T) { + fake := &fakePulsarAdmin{ + getSchemaInfoWithVersionFn: func(string) (*resourcev1alpha1.SchemaInfo, int64, error) { + return nil, 0, nil + }, + getSchemaVersionBySchemaInfoFn: func(string, *resourcev1alpha1.SchemaInfo) (int64, error) { + return 0, nil + }, + uploadSchemaFn: func(string, *admin.SchemaParams) error { + return nil + }, + } + + topic := &resourcev1alpha1.PulsarTopic{ + Spec: resourcev1alpha1.PulsarTopicSpec{ + Name: "persistent://tenant/ns/topic", + }, + } + + if err := applySchema(fake, topic, logr.Discard()); err != nil { + t.Fatalf("applySchema returned error: %v", err) + } + if fake.getSchemaInfoWithVersionCalls != 0 { + t.Fatalf("expected GetSchemaInfoWithVersion not called, got %d", fake.getSchemaInfoWithVersionCalls) + } + if fake.getSchemaVersionBySchemaInfoCalls != 0 { + t.Fatalf("expected GetSchemaVersionBySchemaInfo not called, got %d", fake.getSchemaVersionBySchemaInfoCalls) + } + if fake.uploadSchemaCalls != 0 { + t.Fatalf("expected UploadSchema not called, got %d", fake.uploadSchemaCalls) + } +} + +func TestApplySchema_UploadWhenSchemaMissing(t *testing.T) { + fake := &fakePulsarAdmin{ + getSchemaInfoWithVersionFn: func(string) (*resourcev1alpha1.SchemaInfo, int64, error) { + return nil, 0, rest.Error{Code: 404, Reason: "schema not found"} + }, + getSchemaVersionBySchemaInfoFn: func(string, *resourcev1alpha1.SchemaInfo) (int64, error) { + t.Fatalf("GetSchemaVersionBySchemaInfo should not be called when schema is missing") + return 0, nil + }, + uploadSchemaFn: func(string, *admin.SchemaParams) error { + return nil + }, + } + + topic := &resourcev1alpha1.PulsarTopic{ + Spec: resourcev1alpha1.PulsarTopicSpec{ + Name: "persistent://tenant/ns/topic", + SchemaInfo: &resourcev1alpha1.SchemaInfo{ + Type: "STRING", + Schema: "", + }, + }, + } + + if err := applySchema(fake, topic, logr.Discard()); err != nil { + t.Fatalf("applySchema returned error: %v", err) + } + if fake.uploadSchemaCalls != 1 { + t.Fatalf("expected UploadSchema called once, got %d", fake.uploadSchemaCalls) + } +} + +func TestApplySchema_SkipWhenVersionMatches(t *testing.T) { + fake := &fakePulsarAdmin{ + getSchemaInfoWithVersionFn: func(string) (*resourcev1alpha1.SchemaInfo, int64, error) { + return &resourcev1alpha1.SchemaInfo{Type: "STRING", Schema: ""}, 3, nil + }, + getSchemaVersionBySchemaInfoFn: func(string, *resourcev1alpha1.SchemaInfo) (int64, error) { + return 3, nil + }, + uploadSchemaFn: func(string, *admin.SchemaParams) error { + return nil + }, + } + + topic := &resourcev1alpha1.PulsarTopic{ + Spec: resourcev1alpha1.PulsarTopicSpec{ + Name: "persistent://tenant/ns/topic", + SchemaInfo: &resourcev1alpha1.SchemaInfo{ + Type: "STRING", + Schema: "", + }, + }, + } + + if err := applySchema(fake, topic, logr.Discard()); err != nil { + t.Fatalf("applySchema returned error: %v", err) + } + if fake.uploadSchemaCalls != 0 { + t.Fatalf("expected UploadSchema not called, got %d", fake.uploadSchemaCalls) + } +} + +func TestApplySchema_UploadWhenVersionDiffers(t *testing.T) { + fake := &fakePulsarAdmin{ + getSchemaInfoWithVersionFn: func(string) (*resourcev1alpha1.SchemaInfo, int64, error) { + return &resourcev1alpha1.SchemaInfo{Type: "STRING", Schema: ""}, 3, nil + }, + getSchemaVersionBySchemaInfoFn: func(string, *resourcev1alpha1.SchemaInfo) (int64, error) { + return 2, nil + }, + uploadSchemaFn: func(string, *admin.SchemaParams) error { + return nil + }, + } + + topic := &resourcev1alpha1.PulsarTopic{ + Spec: resourcev1alpha1.PulsarTopicSpec{ + Name: "persistent://tenant/ns/topic", + SchemaInfo: &resourcev1alpha1.SchemaInfo{ + Type: "STRING", + Schema: "", + }, + }, + } + + if err := applySchema(fake, topic, logr.Discard()); err != nil { + t.Fatalf("applySchema returned error: %v", err) + } + if fake.uploadSchemaCalls != 1 { + t.Fatalf("expected UploadSchema called once, got %d", fake.uploadSchemaCalls) + } +} + +func TestApplySchema_UploadWhenVersionNotFound(t *testing.T) { + fake := &fakePulsarAdmin{ + getSchemaInfoWithVersionFn: func(string) (*resourcev1alpha1.SchemaInfo, int64, error) { + return &resourcev1alpha1.SchemaInfo{Type: "STRING", Schema: ""}, 3, nil + }, + getSchemaVersionBySchemaInfoFn: func(string, *resourcev1alpha1.SchemaInfo) (int64, error) { + return -1, nil + }, + uploadSchemaFn: func(string, *admin.SchemaParams) error { + return nil + }, + } + + topic := &resourcev1alpha1.PulsarTopic{ + Spec: resourcev1alpha1.PulsarTopicSpec{ + Name: "persistent://tenant/ns/topic", + SchemaInfo: &resourcev1alpha1.SchemaInfo{ + Type: "STRING", + Schema: "", + }, + }, + } + + if err := applySchema(fake, topic, logr.Discard()); err != nil { + t.Fatalf("applySchema returned error: %v", err) + } + if fake.uploadSchemaCalls != 1 { + t.Fatalf("expected UploadSchema called once, got %d", fake.uploadSchemaCalls) + } +} diff --git a/tests/operator/resources_test.go b/tests/operator/resources_test.go index e9e4f320..deaf603b 100644 --- a/tests/operator/resources_test.go +++ b/tests/operator/resources_test.go @@ -19,7 +19,10 @@ import ( "encoding/json" "fmt" "slices" + "strconv" + "strings" + pulsarutils "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/format" @@ -126,6 +129,12 @@ var _ = Describe("Resources", func() { ptopic2 *v1alphav1.PulsarTopic ptopicName2 string = "test-topic2" topicName2 string = "persistent://cloud/stage/user2" + pexistingTopic *v1alphav1.PulsarTopic + pexistingTopicName string = "test-existing-topic" + existingTopicName string = "persistent://cloud/stage/existing" + pemptySchemaTopic *v1alphav1.PulsarTopic + pemptySchemaName string = "test-empty-schema-topic" + emptySchemaTopic string = "persistent://cloud/stage/empty-schema" ppermission *v1alphav1.PulsarPermission ppermissionName string = "test-permission" ppermission2 *v1alphav1.PulsarPermission @@ -179,6 +188,15 @@ var _ = Describe("Resources", func() { pnamespace = utils.MakePulsarNamespace(namespaceName, pnamespaceName, pulsarNamespaceName, pconnName, lifecyclePolicy) ptopic = utils.MakePulsarTopic(namespaceName, ptopicName, topicName, pconnName, lifecyclePolicy) ptopic2 = utils.MakePulsarTopic(namespaceName, ptopicName2, topicName2, pconnName, lifecyclePolicy) + pexistingTopic = utils.MakePulsarTopic(namespaceName, pexistingTopicName, existingTopicName, pconnName, lifecyclePolicy) + pemptySchemaTopic = utils.MakePulsarTopic(namespaceName, pemptySchemaName, emptySchemaTopic, pconnName, lifecyclePolicy) + pemptySchemaTopic.Spec.SchemaInfo = &v1alphav1.SchemaInfo{ + Type: "STRING", + Schema: "", + } + pemptySchemaTopic.Spec.Properties = map[string]string{ + "schema-case": "empty", + } roles := []string{"ironman"} actions := []string{"produce", "consume", "functions"} ppermission = utils.MakePulsarPermission(namespaceName, ppermissionName, topicName, pconnName, v1alphav1.PulsarResourceTypeTopic, roles, actions, v1alphav1.CleanUpAfterDeletion) @@ -324,11 +342,19 @@ var _ = Describe("Resources", func() { }) Context("PulsarTopic operation", Ordered, func() { + It("should create existing topic directly in Pulsar", func() { + createTopicInPulsar(existingTopicName) + }) + It("should create the pulsartopic successfully", Label("Permissions"), func() { err := k8sClient.Create(ctx, ptopic) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) err = k8sClient.Create(ctx, ptopic2) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + err = k8sClient.Create(ctx, pexistingTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) + err = k8sClient.Create(ctx, pemptySchemaTopic) + Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) err = k8sClient.Create(ctx, partitionedTopic) Expect(err == nil || apierrors.IsAlreadyExists(err)).Should(BeTrue()) }) @@ -340,6 +366,18 @@ var _ = Describe("Resources", func() { Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) return v1alphav1.IsPulsarResourceReady(t) }, "20s", "100ms").Should(BeTrue()) + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pexistingTopicName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pemptySchemaName} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) Eventually(func() bool { t := &v1alphav1.PulsarTopic{} tns := types.NamespacedName{Namespace: partitionedTopic.Namespace, Name: partitionedTopic.Name} @@ -402,6 +440,30 @@ var _ = Describe("Resources", func() { // }, "5s", "100ms").Should(Succeed()) }) + It("should not create new schema versions for empty schema", func() { + initialCount := mustGetSchemaVersionCount(emptySchemaTopic) + Expect(initialCount).Should(BeNumerically(">", 0)) + + topic := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pemptySchemaName} + Expect(k8sClient.Get(ctx, tns, topic)).Should(Succeed()) + if topic.Spec.Properties == nil { + topic.Spec.Properties = map[string]string{} + } + topic.Spec.Properties["schema-update"] = "true" + Expect(k8sClient.Update(ctx, topic)).Should(Succeed()) + + Eventually(func() bool { + t := &v1alphav1.PulsarTopic{} + Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + return v1alphav1.IsPulsarResourceReady(t) + }, "20s", "100ms").Should(BeTrue()) + + Consistently(func() int { + return mustGetSchemaVersionCount(emptySchemaTopic) + }, "10s", "1s").Should(Equal(initialCount)) + }) + It("should increase the partitions successfully", func() { curTopic := &v1alphav1.PulsarTopic{} Expect(k8sClient.Get(ctx, types.NamespacedName{ @@ -2633,6 +2695,20 @@ var _ = Describe("Resources", func() { g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) }).Should(Succeed()) + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pexistingTopicName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + + Eventually(func(g Gomega) { + t := &v1alphav1.PulsarTopic{} + tns := types.NamespacedName{Namespace: namespaceName, Name: pemptySchemaName} + g.Expect(k8sClient.Get(ctx, tns, t)).Should(Succeed()) + g.Expect(k8sClient.Delete(ctx, t)).Should(Succeed()) + }).Should(Succeed()) + Eventually(func(g Gomega) { t := &v1alphav1.PulsarTopic{} tns := types.NamespacedName{Namespace: namespaceName, Name: partitionedTopic.Name} @@ -2696,3 +2772,61 @@ func updateTopicSchema(ctx context.Context, topicName, exampleSchemaDef string) } Expect(k8sClient.Update(ctx, topic)).Should(Succeed()) } + +func createTopicInPulsar(topicName string) { + parsedTopicName, err := pulsarutils.GetTopicName(topicName) + Expect(err).Should(Succeed()) + + endpoint := fmt.Sprintf("http://localhost:8080/admin/v2/%s/%s/%s/%s", + parsedTopicName.GetDomain(), + parsedTopicName.GetTenant(), + parsedTopicName.GetNamespace(), + parsedTopicName.GetLocalName()) + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + stdout, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "curl -s -o /dev/null -w \"%{http_code}\" -H \"Authorization: Bearer $PROXY_TOKEN\" -X PUT "+endpoint) + if err != nil { + Expect(err).Should(Succeed()) + } + + code, err := strconv.Atoi(strings.TrimSpace(stdout)) + Expect(err).Should(Succeed()) + if code == 200 || code == 204 || code == 409 { + return + } + + Expect(fmt.Errorf("unexpected status code %d (stderr: %s)", code, stderr)).Should(Succeed()) +} + +func mustGetSchemaVersionCount(topic string) int { + count, err := getSchemaVersionCount(topic) + Expect(err).Should(Succeed()) + return count +} + +func getSchemaVersionCount(topic string) (int, error) { + topicName, err := pulsarutils.GetTopicName(topic) + if err != nil { + return 0, err + } + endpoint := fmt.Sprintf("http://localhost:8080/admin/v2/schemas/%s/%s/%s/schemas", + topicName.GetTenant(), topicName.GetNamespace(), topicName.GetLocalName()) + podName := fmt.Sprintf("%s-broker-0", brokerName) + containerName := fmt.Sprintf("%s-broker", brokerName) + stdout, stderr, err := utils.ExecInPod(k8sConfig, namespaceName, podName, containerName, + "curl -s -H \"Authorization: Bearer $PROXY_TOKEN\" "+endpoint) + if err != nil { + return 0, fmt.Errorf("fetch schema versions failed: %w (stderr: %s)", err, stderr) + } + + var response struct { + Schemas []struct { + Version int64 `json:"version"` + } `json:"getSchemaResponses"` + } + if err := json.Unmarshal([]byte(stdout), &response); err != nil { + return 0, err + } + return len(response.Schemas), nil +}