Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ node_modules/

.cursor
.envrc
mise.toml
10 changes: 10 additions & 0 deletions pkg/admin/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,16 @@ func (d *DummyPulsarAdmin) GetSchema(string) (*v1alpha1.SchemaInfo, error) {
return nil, nil
}

// GetSchemaInfoWithVersion is a fake implements of GetSchemaInfoWithVersion
func (d *DummyPulsarAdmin) GetSchemaInfoWithVersion(string) (*v1alpha1.SchemaInfo, int64, error) {
return nil, 0, nil
}

// GetSchemaVersionBySchemaInfo is a fake implements of GetSchemaVersionBySchemaInfo
func (d *DummyPulsarAdmin) GetSchemaVersionBySchemaInfo(string, *v1alpha1.SchemaInfo) (int64, error) {
return 0, nil
}

// UploadSchema is a fake implements of UploadSchema
func (d *DummyPulsarAdmin) UploadSchema(string, *SchemaParams) error {
return nil
Expand Down
39 changes: 34 additions & 5 deletions pkg/admin/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package admin
import (
"errors"
"net"
"strings"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
)
Expand Down Expand Up @@ -51,12 +52,22 @@ const (

// ErrorReason returns the HTTP status code for the error
func ErrorReason(err error) Reason {
cliError, ok := err.(rest.Error)
if !ok {
// can't determine error reason as can't convert to a cli error
if err == nil {
return ReasonUnknown
}
return Reason(cliError.Code)

var restErrPtr *rest.Error
if errors.As(err, &restErrPtr) && restErrPtr != nil {
return Reason(restErrPtr.Code)
}

var restErr rest.Error
if errors.As(err, &restErr) {
return Reason(restErr.Code)
}

// can't determine error reason as can't convert to a cli error
return ReasonUnknown
}

// IsNotFound returns true if the error indicates the resource is not found on server
Expand All @@ -66,7 +77,25 @@ func IsNotFound(err error) bool {

// IsAlreadyExist returns true if the error indicates the resource already exist
func IsAlreadyExist(err error) bool {
return ErrorReason(err) == ReasonAlreadyExist
if err == nil {
return false
}

reason := ErrorReason(err)
if reason == ReasonAlreadyExist {
return true
}
if reason == ReasonInvalidParameter {
return isAlreadyExistsMessage(err)
}
if reason != ReasonUnknown {
return false
}
return isAlreadyExistsMessage(err)
}

func isAlreadyExistsMessage(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "already exist")
}

// IsInternalServerError returns true if the error indicates the resource already exist
Expand Down
90 changes: 90 additions & 0 deletions pkg/admin/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2026 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2026 StreamNative
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package admin

import (
"errors"
"fmt"
"testing"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
)

func TestIsAlreadyExist(t *testing.T) {
tests := []struct {
name string
err error
expected bool
}{
{
name: "rest.Error with 409",
err: rest.Error{Code: 409, Reason: "already exists"},
expected: true,
},
{
name: "rest.Error pointer with 409",
err: &rest.Error{Code: 409, Reason: "already exists"},
expected: true,
},
{
name: "wrapped rest.Error with 409",
err: fmt.Errorf("wrapped: %w", rest.Error{Code: 409, Reason: "already exists"}),
expected: true,
},
{
name: "rest.Error with 412 and already exists reason",
err: rest.Error{Code: 412, Reason: "This topic already exists"},
expected: true,
},
{
name: "wrapped error with already exists message",
err: fmt.Errorf("wrapped: %w", errors.New("This topic already exists")),
expected: true,
},
{
name: "plain error with already exists message",
err: errors.New("This topic already exists"),
expected: true,
},
{
name: "unrelated error",
err: errors.New("permission denied"),
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := IsAlreadyExist(tt.err)
if result != tt.expected {
t.Fatalf("IsAlreadyExist() = %v, want %v", result, tt.expected)
}
})
}
}
33 changes: 33 additions & 0 deletions pkg/admin/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,39 @@ func (p *PulsarAdminClient) GetSchema(topic string) (*v1alpha1.SchemaInfo, error
return rsp, nil
}

// GetSchemaInfoWithVersion get schema info with version for a given topic
func (p *PulsarAdminClient) GetSchemaInfoWithVersion(topic string) (*v1alpha1.SchemaInfo, int64, error) {
info, err := p.adminClient.Schemas().GetSchemaInfoWithVersion(topic)
if err != nil {
return nil, 0, err
}
if info == nil {
return nil, 0, errors.New("schema info is empty")
}
if info.SchemaInfo == nil {
return nil, info.Version, nil
}
rsp := &v1alpha1.SchemaInfo{
Type: info.SchemaInfo.Type,
Schema: string(info.SchemaInfo.Schema),
Properties: info.SchemaInfo.Properties,
}
return rsp, info.Version, nil
}

// GetSchemaVersionBySchemaInfo gets schema version for a given schema payload
func (p *PulsarAdminClient) GetSchemaVersionBySchemaInfo(topic string, info *v1alpha1.SchemaInfo) (int64, error) {
if info == nil {
return 0, errors.New("schema info is nil")
}
payload := utils.SchemaInfo{
Type: info.Type,
Schema: []byte(info.Schema),
Properties: info.Properties,
}
return p.adminClient.Schemas().GetVersionBySchemaInfo(topic, payload)
}

// UploadSchema creates or updates a schema for a given topic
func (p *PulsarAdminClient) UploadSchema(topic string, params *SchemaParams) error {
var payload utils.PostSchemaPayload
Expand Down
6 changes: 6 additions & 0 deletions pkg/admin/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ type PulsarAdmin interface {
// GetSchema retrieves the latest schema of a topic
GetSchema(topic string) (*v1alpha1.SchemaInfo, error)

// GetSchemaInfoWithVersion retrieves the latest schema and its version for a topic
GetSchemaInfoWithVersion(topic string) (*v1alpha1.SchemaInfo, int64, error)

// GetSchemaVersionBySchemaInfo retrieves the version for a given schema payload
GetSchemaVersionBySchemaInfo(topic string, info *v1alpha1.SchemaInfo) (int64, error)

// UploadSchema creates or updates a schema for a given topic
UploadSchema(topic string, params *SchemaParams) error

Expand Down
39 changes: 29 additions & 10 deletions pkg/connection/reconcile_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package connection
import (
"context"
"fmt"
"reflect"
"slices"

"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
Expand Down Expand Up @@ -287,23 +286,43 @@ func (r *PulsarTopicReconciler) ReconcileTopic(ctx context.Context, pulsarAdmin
}

func applySchema(pulsarAdmin admin.PulsarAdmin, topic *resourcev1alpha1.PulsarTopic, log logr.Logger) error {
schema, serr := pulsarAdmin.GetSchema(topic.Spec.Name)
if serr != nil && !admin.IsNotFound(serr) {
return serr
}
if topic.Spec.SchemaInfo != nil {
// Only upload the schema when schema doesn't exist or the schema has been updated
if admin.IsNotFound(serr) || !reflect.DeepEqual(topic.Spec.SchemaInfo, schema) {
uploadSchema := func(currentVersion int64, desiredVersion int64) error {
info := topic.Spec.SchemaInfo
param := &admin.SchemaParams{
Type: info.Type,
Schema: info.Schema,
Properties: info.Properties,
}
log.Info("Upload schema for the topic", "name", topic.Spec.Name, "type", info.Type, "schema", info.Schema, "properties", info.Properties)
if err := pulsarAdmin.UploadSchema(topic.Spec.Name, param); err != nil {
return err
log.Info("Upload schema for the topic",
"name", topic.Spec.Name,
"type", info.Type,
"schema", info.Schema,
"properties", info.Properties,
"currentVersion", currentVersion,
"desiredVersion", desiredVersion)
return pulsarAdmin.UploadSchema(topic.Spec.Name, param)
}

_, currentVersion, serr := pulsarAdmin.GetSchemaInfoWithVersion(topic.Spec.Name)
if serr != nil {
if admin.IsNotFound(serr) {
return uploadSchema(currentVersion, -1)
}
return serr
}

desiredVersion, verr := pulsarAdmin.GetSchemaVersionBySchemaInfo(topic.Spec.Name, topic.Spec.SchemaInfo)
if verr != nil {
if admin.IsNotFound(verr) {
return uploadSchema(currentVersion, -1)
}
return verr
}

// Only upload the schema when schema doesn't exist or the schema has been updated
if desiredVersion < 0 || desiredVersion != currentVersion {
return uploadSchema(currentVersion, desiredVersion)
}
}
// Note: We intentionally do NOT delete existing schemas when schemaInfo is not specified.
Expand Down
Loading