From 871fa181a0a0d5741866b5965a4811ce2c932cff Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 3 Sep 2025 11:23:33 +0100 Subject: [PATCH 1/5] Fix: Properties not getting consistently set on pulsaradmin subscription message responses --- pulsaradmin/pkg/admin/subscription.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsaradmin/pkg/admin/subscription.go b/pulsaradmin/pkg/admin/subscription.go index a1a13619be..37a7556536 100644 --- a/pulsaradmin/pkg/admin/subscription.go +++ b/pulsaradmin/pkg/admin/subscription.go @@ -256,6 +256,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e } properties := make(map[string]string) + isBatch := false for k := range resp.Header { switch { case k == PublishTimeHeader: @@ -268,7 +269,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e if h != "" { properties[BatchHeader] = h } - return getIndividualMsgsFromBatch(topic, ID, payload, properties) + isBatch = true case k == PropertyHeader: propJSON := resp.Header.Get(k) if err := json.Unmarshal([]byte(propJSON), &properties); err != nil { @@ -280,7 +281,11 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e } } - return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil + if isBatch { + return getIndividualMsgsFromBatch(topic, ID, payload, properties) + } else { + return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil + } } func getIndividualMsgsFromBatch(topic utils.TopicName, msgID *utils.MessageID, data []byte, From 6cac1463edc24f8dab2b806256712419e6f846e5 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Wed, 3 Sep 2025 12:03:58 +0100 Subject: [PATCH 2/5] tidy --- pulsaradmin/pkg/admin/subscription.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsaradmin/pkg/admin/subscription.go b/pulsaradmin/pkg/admin/subscription.go index 37a7556536..2dfa28d6e8 100644 --- a/pulsaradmin/pkg/admin/subscription.go +++ b/pulsaradmin/pkg/admin/subscription.go @@ -283,9 +283,8 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e if isBatch { return getIndividualMsgsFromBatch(topic, ID, payload, properties) - } else { - return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil } + return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil } func getIndividualMsgsFromBatch(topic utils.TopicName, msgID *utils.MessageID, data []byte, From 78758de53aba8341365cafe94619ac7d584b6703 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Thu, 4 Sep 2025 10:04:58 +0100 Subject: [PATCH 3/5] Add to unit test --- pulsaradmin/pkg/admin/subscription_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index 1e4a7b2148..ccd9b4cb6e 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -193,6 +193,8 @@ func TestPeekMessageWithProperties(t *testing.T) { assert.Equal(t, "VALUE2", msg.Properties["KEY2"]) assert.Equal(t, "VaLuE3", msg.Properties["KeY3"]) assert.Equal(t, "good at playing basketball", msg.Properties["details=man"]) + // Standard pulsar property, always expected + assert.NotEmpty(t, msg.Properties["publish-time"]) } } From fd1a5e4d084217c9967fd574373a97b4855f9b0a Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Thu, 4 Sep 2025 14:01:17 +0100 Subject: [PATCH 4/5] Improve TestPeekMessagesWithProperties --- pulsaradmin/pkg/admin/subscription_test.go | 149 ++++++++++++++------- 1 file changed, 99 insertions(+), 50 deletions(-) diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index ccd9b4cb6e..c200135e1a 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -20,6 +20,7 @@ package admin import ( "context" "fmt" + "strconv" "sync" "testing" "time" @@ -144,60 +145,108 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) { } } -func TestPeekMessageWithProperties(t *testing.T) { - randomName := newTopicName() - topic := "persistent://public/default/" + randomName - topicName, _ := utils.GetTopicName(topic) - subName := "test-sub" - - cfg := &config.Config{} - admin, err := New(cfg) - assert.NoError(t, err) - assert.NotNil(t, admin) - - client, err := pulsar.NewClient(pulsar.ClientOptions{ - URL: lookupURL, - }) - assert.NoError(t, err) - defer client.Close() - - // Create a producer for non-batch messages - producer, err := client.CreateProducer(pulsar.ProducerOptions{ - Topic: topic, - DisableBatching: true, - }) - assert.NoError(t, err) - defer producer.Close() - - props := map[string]string{ - "key1": "value1", - "KEY2": "VALUE2", - "KeY3": "VaLuE3", - "details=man": "good at playing basketball", +func TestPeekMessagesWithProperties(t *testing.T) { + tests := map[string]struct { + batched bool + }{ + "non-batched": { + batched: false, + }, + "batched": { + batched: true, + }, } - _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{ - Payload: []byte("test-message"), - Properties: props, - }) - assert.NoError(t, err) - - // Peek messages - messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 1) - assert.NoError(t, err) - assert.NotNil(t, messages) - - // Verify properties of messages - for _, msg := range messages { - assert.Equal(t, "value1", msg.Properties["key1"]) - assert.Equal(t, "VALUE2", msg.Properties["KEY2"]) - assert.Equal(t, "VaLuE3", msg.Properties["KeY3"]) - assert.Equal(t, "good at playing basketball", msg.Properties["details=man"]) - // Standard pulsar property, always expected - assert.NotEmpty(t, msg.Properties["publish-time"]) + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + randomName := newTopicName() + topic := "persistent://public/default/" + randomName + topicName, _ := utils.GetTopicName(topic) + subName := "test-sub" + + cfg := &config.Config{} + admin, err := New(cfg) + assert.NoError(t, err) + assert.NotNil(t, admin) + + client, err := pulsar.NewClient(pulsar.ClientOptions{ + URL: lookupURL, + }) + assert.NoError(t, err) + defer client.Close() + + var producer pulsar.Producer + batchSize := 5 + if tc.batched { + producer, err = client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: false, + BatchingMaxMessages: uint(batchSize), + BatchingMaxPublishDelay: time.Second * 2, + }) + assert.NoError(t, err) + defer producer.Close() + } else { + producer, err = client.CreateProducer(pulsar.ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.NoError(t, err) + defer producer.Close() + } + + props := map[string]string{ + "key1": "value1", + "KEY2": "VALUE2", + "KeY3": "VaLuE3", + "details=man": "good at playing basketball", + } + + var wg sync.WaitGroup + numberOfMessagesToWaitFor := 10 + numberOfMessagesToSend := numberOfMessagesToWaitFor + if tc.batched { + // If batched send one extra message to cause the batch to be sent immediately + numberOfMessagesToSend += 1 + } + wg.Add(numberOfMessagesToWaitFor) + + for i := 0; i < numberOfMessagesToSend; i++ { + producer.SendAsync(ctx, &pulsar.ProducerMessage{ + Payload: []byte("test-message"), + Properties: props, + }, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { + assert.Nil(t, err) + if i < numberOfMessagesToWaitFor { + wg.Done() + } + }) + } + wg.Wait() + + // Peek messages + messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, batchSize) + assert.NoError(t, err) + assert.NotNil(t, messages) + assert.Len(t, messages, batchSize) + + // Verify properties of messages + for _, msg := range messages { + assert.Equal(t, "value1", msg.Properties["key1"]) + assert.Equal(t, "VALUE2", msg.Properties["KEY2"]) + assert.Equal(t, "VaLuE3", msg.Properties["KeY3"]) + assert.Equal(t, "good at playing basketball", msg.Properties["details=man"]) + // Standard pulsar properties, set by pulsar + assert.NotEmpty(t, msg.Properties["publish-time"]) + if tc.batched { + assert.NotEmpty(t, msg.Properties[BatchHeader]) + assert.Equal(t, strconv.Itoa(batchSize), msg.Properties[BatchHeader]) + } + } + }) } } - func TestGetMessageByID(t *testing.T) { randomName := newTopicName() topic := "persistent://public/default/" + randomName From 5951a818b65f779b7d3eceb4d0eb06c4773704d1 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Fri, 5 Sep 2025 09:50:41 +0100 Subject: [PATCH 5/5] Lint --- pulsaradmin/pkg/admin/subscription_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsaradmin/pkg/admin/subscription_test.go b/pulsaradmin/pkg/admin/subscription_test.go index c200135e1a..08d2cf221e 100644 --- a/pulsaradmin/pkg/admin/subscription_test.go +++ b/pulsaradmin/pkg/admin/subscription_test.go @@ -208,7 +208,7 @@ func TestPeekMessagesWithProperties(t *testing.T) { numberOfMessagesToSend := numberOfMessagesToWaitFor if tc.batched { // If batched send one extra message to cause the batch to be sent immediately - numberOfMessagesToSend += 1 + numberOfMessagesToSend++ } wg.Add(numberOfMessagesToWaitFor) @@ -216,7 +216,7 @@ func TestPeekMessagesWithProperties(t *testing.T) { producer.SendAsync(ctx, &pulsar.ProducerMessage{ Payload: []byte("test-message"), Properties: props, - }, func(id pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { + }, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) { assert.Nil(t, err) if i < numberOfMessagesToWaitFor { wg.Done()