Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pulsaradmin/pkg/admin/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
}

properties := make(map[string]string)
isBatch := false
for k := range resp.Header {
switch {
case k == PublishTimeHeader:
Expand All @@ -268,7 +269,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
if h != "" {
properties[BatchHeader] = h
}
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
isBatch = true
case k == PropertyHeader:
propJSON := resp.Header.Get(k)
if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
Expand All @@ -280,6 +281,9 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
}
}

if isBatch {
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
}
return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil
}

Expand Down
147 changes: 99 additions & 48 deletions pulsaradmin/pkg/admin/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package admin
import (
"context"
"fmt"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -144,58 +145,108 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
}
}

func TestPeekMessageWithProperties(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicName, _ := utils.GetTopicName(topic)
subName := "test-sub"

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()

// Create a producer for non-batch messages
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.NoError(t, err)
defer producer.Close()

props := map[string]string{
"key1": "value1",
"KEY2": "VALUE2",
"KeY3": "VaLuE3",
"details=man": "good at playing basketball",
func TestPeekMessagesWithProperties(t *testing.T) {
tests := map[string]struct {
batched bool
}{
"non-batched": {
batched: false,
},
"batched": {
batched: true,
},
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("test-message"),
Properties: props,
})
assert.NoError(t, err)

// Peek messages
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 1)
assert.NoError(t, err)
assert.NotNil(t, messages)

// Verify properties of messages
for _, msg := range messages {
assert.Equal(t, "value1", msg.Properties["key1"])
assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ctx := context.Background()
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
topicName, _ := utils.GetTopicName(topic)
subName := "test-sub"

cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: lookupURL,
})
assert.NoError(t, err)
defer client.Close()

var producer pulsar.Producer
batchSize := 5
if tc.batched {
producer, err = client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
BatchingMaxMessages: uint(batchSize),
BatchingMaxPublishDelay: time.Second * 2,
})
assert.NoError(t, err)
defer producer.Close()
} else {
producer, err = client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.NoError(t, err)
defer producer.Close()
}

props := map[string]string{
"key1": "value1",
"KEY2": "VALUE2",
"KeY3": "VaLuE3",
"details=man": "good at playing basketball",
}

var wg sync.WaitGroup
numberOfMessagesToWaitFor := 10
numberOfMessagesToSend := numberOfMessagesToWaitFor
if tc.batched {
// If batched send one extra message to cause the batch to be sent immediately
numberOfMessagesToSend++
}
wg.Add(numberOfMessagesToWaitFor)

for i := 0; i < numberOfMessagesToSend; i++ {
producer.SendAsync(ctx, &pulsar.ProducerMessage{
Payload: []byte("test-message"),
Properties: props,
}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
assert.Nil(t, err)
if i < numberOfMessagesToWaitFor {
wg.Done()
}
})
}
wg.Wait()

// Peek messages
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, batchSize)
assert.NoError(t, err)
assert.NotNil(t, messages)
assert.Len(t, messages, batchSize)

// Verify properties of messages
for _, msg := range messages {
assert.Equal(t, "value1", msg.Properties["key1"])
assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])
// Standard pulsar properties, set by pulsar
assert.NotEmpty(t, msg.Properties["publish-time"])
if tc.batched {
assert.NotEmpty(t, msg.Properties[BatchHeader])
assert.Equal(t, strconv.Itoa(batchSize), msg.Properties[BatchHeader])
}
}
})
}
}

func TestGetMessageByID(t *testing.T) {
randomName := newTopicName()
topic := "persistent://public/default/" + randomName
Expand Down