Skip to content

Commit cf832bb

Browse files
authored
[Issue #1418] Fix: Properties not getting consistently set on pulsaradmin subscription message responses (#1419)
### Motivation The properties on messages returned by PeekMessages is not returned consistently as described in #1418 This change makes sure we set all properties from headers before continuing on, so the returned message should consistently contain all properties. Currently the properties returned is inconsistent on repeated calls, even if the message returned is the same each time. ### Modifications I've made it so all headers are processed / al properties are set on every message, rather than exiting the loop early which can cause inconsistent results.
1 parent 6aa1fb6 commit cf832bb

File tree

2 files changed

+104
-49
lines changed

2 files changed

+104
-49
lines changed

pulsaradmin/pkg/admin/subscription.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
256256
}
257257

258258
properties := make(map[string]string)
259+
isBatch := false
259260
for k := range resp.Header {
260261
switch {
261262
case k == PublishTimeHeader:
@@ -268,7 +269,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
268269
if h != "" {
269270
properties[BatchHeader] = h
270271
}
271-
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
272+
isBatch = true
272273
case k == PropertyHeader:
273274
propJSON := resp.Header.Get(k)
274275
if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
@@ -280,6 +281,9 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
280281
}
281282
}
282283

284+
if isBatch {
285+
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
286+
}
283287
return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil
284288
}
285289

pulsaradmin/pkg/admin/subscription_test.go

Lines changed: 99 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package admin
2020
import (
2121
"context"
2222
"fmt"
23+
"strconv"
2324
"sync"
2425
"testing"
2526
"time"
@@ -144,58 +145,108 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
144145
}
145146
}
146147

147-
func TestPeekMessageWithProperties(t *testing.T) {
148-
randomName := newTopicName()
149-
topic := "persistent://public/default/" + randomName
150-
topicName, _ := utils.GetTopicName(topic)
151-
subName := "test-sub"
152-
153-
cfg := &config.Config{}
154-
admin, err := New(cfg)
155-
assert.NoError(t, err)
156-
assert.NotNil(t, admin)
157-
158-
client, err := pulsar.NewClient(pulsar.ClientOptions{
159-
URL: lookupURL,
160-
})
161-
assert.NoError(t, err)
162-
defer client.Close()
163-
164-
// Create a producer for non-batch messages
165-
producer, err := client.CreateProducer(pulsar.ProducerOptions{
166-
Topic: topic,
167-
DisableBatching: true,
168-
})
169-
assert.NoError(t, err)
170-
defer producer.Close()
171-
172-
props := map[string]string{
173-
"key1": "value1",
174-
"KEY2": "VALUE2",
175-
"KeY3": "VaLuE3",
176-
"details=man": "good at playing basketball",
148+
func TestPeekMessagesWithProperties(t *testing.T) {
149+
tests := map[string]struct {
150+
batched bool
151+
}{
152+
"non-batched": {
153+
batched: false,
154+
},
155+
"batched": {
156+
batched: true,
157+
},
177158
}
178159

179-
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
180-
Payload: []byte("test-message"),
181-
Properties: props,
182-
})
183-
assert.NoError(t, err)
184-
185-
// Peek messages
186-
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 1)
187-
assert.NoError(t, err)
188-
assert.NotNil(t, messages)
189-
190-
// Verify properties of messages
191-
for _, msg := range messages {
192-
assert.Equal(t, "value1", msg.Properties["key1"])
193-
assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
194-
assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
195-
assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])
160+
for name, tc := range tests {
161+
t.Run(name, func(t *testing.T) {
162+
ctx := context.Background()
163+
randomName := newTopicName()
164+
topic := "persistent://public/default/" + randomName
165+
topicName, _ := utils.GetTopicName(topic)
166+
subName := "test-sub"
167+
168+
cfg := &config.Config{}
169+
admin, err := New(cfg)
170+
assert.NoError(t, err)
171+
assert.NotNil(t, admin)
172+
173+
client, err := pulsar.NewClient(pulsar.ClientOptions{
174+
URL: lookupURL,
175+
})
176+
assert.NoError(t, err)
177+
defer client.Close()
178+
179+
var producer pulsar.Producer
180+
batchSize := 5
181+
if tc.batched {
182+
producer, err = client.CreateProducer(pulsar.ProducerOptions{
183+
Topic: topic,
184+
DisableBatching: false,
185+
BatchingMaxMessages: uint(batchSize),
186+
BatchingMaxPublishDelay: time.Second * 2,
187+
})
188+
assert.NoError(t, err)
189+
defer producer.Close()
190+
} else {
191+
producer, err = client.CreateProducer(pulsar.ProducerOptions{
192+
Topic: topic,
193+
DisableBatching: true,
194+
})
195+
assert.NoError(t, err)
196+
defer producer.Close()
197+
}
198+
199+
props := map[string]string{
200+
"key1": "value1",
201+
"KEY2": "VALUE2",
202+
"KeY3": "VaLuE3",
203+
"details=man": "good at playing basketball",
204+
}
205+
206+
var wg sync.WaitGroup
207+
numberOfMessagesToWaitFor := 10
208+
numberOfMessagesToSend := numberOfMessagesToWaitFor
209+
if tc.batched {
210+
// If batched send one extra message to cause the batch to be sent immediately
211+
numberOfMessagesToSend++
212+
}
213+
wg.Add(numberOfMessagesToWaitFor)
214+
215+
for i := 0; i < numberOfMessagesToSend; i++ {
216+
producer.SendAsync(ctx, &pulsar.ProducerMessage{
217+
Payload: []byte("test-message"),
218+
Properties: props,
219+
}, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
220+
assert.Nil(t, err)
221+
if i < numberOfMessagesToWaitFor {
222+
wg.Done()
223+
}
224+
})
225+
}
226+
wg.Wait()
227+
228+
// Peek messages
229+
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, batchSize)
230+
assert.NoError(t, err)
231+
assert.NotNil(t, messages)
232+
assert.Len(t, messages, batchSize)
233+
234+
// Verify properties of messages
235+
for _, msg := range messages {
236+
assert.Equal(t, "value1", msg.Properties["key1"])
237+
assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
238+
assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
239+
assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])
240+
// Standard pulsar properties, set by pulsar
241+
assert.NotEmpty(t, msg.Properties["publish-time"])
242+
if tc.batched {
243+
assert.NotEmpty(t, msg.Properties[BatchHeader])
244+
assert.Equal(t, strconv.Itoa(batchSize), msg.Properties[BatchHeader])
245+
}
246+
}
247+
})
196248
}
197249
}
198-
199250
func TestGetMessageByID(t *testing.T) {
200251
randomName := newTopicName()
201252
topic := "persistent://public/default/" + randomName

0 commit comments

Comments
 (0)