Skip to content

Commit a7ef9ed

Browse files
authored
Compatible the HTTP header properties with PIP-279 (#1299)
### Motivation After [pip-279](apache/pulsar#20627), all properties keys and values use json string save to header: `X-Pulsar-Property` This PR to compatible with this change when using subscription admin API. Also, Using `pip-279` also avoids the issue where the Go HTTP client automatically formats HTTP headers: golang/go#37834, This will impact the peek command, the previous method might retrieve `properties` with inconsistent casing compared to the user-defined. ### Modifications - Compatible the HTTP header properties with PIP-279
1 parent 06f2693 commit a7ef9ed

File tree

2 files changed

+66
-1
lines changed

2 files changed

+66
-1
lines changed

pulsaradmin/pkg/admin/subscription.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package admin
2020
import (
2121
"bytes"
2222
"encoding/binary"
23+
"encoding/json"
2324
"io"
2425
"net/http"
2526
"net/url"
@@ -230,7 +231,14 @@ func safeRespClose(resp *http.Response) {
230231
const (
231232
PublishTimeHeader = "X-Pulsar-Publish-Time"
232233
BatchHeader = "X-Pulsar-Num-Batch-Message"
233-
PropertyPrefix = "X-Pulsar-Property-"
234+
235+
// PropertyPrefix is part of the old protocol for message properties.
236+
PropertyPrefix = "X-Pulsar-Property-"
237+
238+
// PropertyHeader is part of the new protocol introduced in SNIP-279
239+
// https://github.com/apache/pulsar/pull/20627
240+
// The value is a JSON string representing the properties.
241+
PropertyHeader = "X-Pulsar-Property"
234242
)
235243

236244
func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, error) {
@@ -261,6 +269,11 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
261269
properties[BatchHeader] = h
262270
}
263271
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
272+
case k == PropertyHeader:
273+
propJSON := resp.Header.Get(k)
274+
if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
275+
return nil, err
276+
}
264277
case strings.Contains(k, PropertyPrefix):
265278
key := strings.TrimPrefix(k, PropertyPrefix)
266279
properties[key] = resp.Header.Get(k)

pulsaradmin/pkg/admin/subscription_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,58 @@ func TestPeekMessageForPartitionedTopic(t *testing.T) {
144144
}
145145
}
146146

147+
func TestPeekMessageWithProperties(t *testing.T) {
148+
randomName := newTopicName()
149+
topic := "persistent://public/default/" + randomName
150+
topicName, _ := utils.GetTopicName(topic)
151+
subName := "test-sub"
152+
153+
cfg := &config.Config{}
154+
admin, err := New(cfg)
155+
assert.NoError(t, err)
156+
assert.NotNil(t, admin)
157+
158+
client, err := pulsar.NewClient(pulsar.ClientOptions{
159+
URL: lookupURL,
160+
})
161+
assert.NoError(t, err)
162+
defer client.Close()
163+
164+
// Create a producer for non-batch messages
165+
producer, err := client.CreateProducer(pulsar.ProducerOptions{
166+
Topic: topic,
167+
DisableBatching: true,
168+
})
169+
assert.NoError(t, err)
170+
defer producer.Close()
171+
172+
props := map[string]string{
173+
"key1": "value1",
174+
"KEY2": "VALUE2",
175+
"KeY3": "VaLuE3",
176+
"details=man": "good at playing basketball",
177+
}
178+
179+
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
180+
Payload: []byte("test-message"),
181+
Properties: props,
182+
})
183+
assert.NoError(t, err)
184+
185+
// Peek messages
186+
messages, err := admin.Subscriptions().PeekMessages(*topicName, subName, 1)
187+
assert.NoError(t, err)
188+
assert.NotNil(t, messages)
189+
190+
// Verify properties of messages
191+
for _, msg := range messages {
192+
assert.Equal(t, "value1", msg.Properties["key1"])
193+
assert.Equal(t, "VALUE2", msg.Properties["KEY2"])
194+
assert.Equal(t, "VaLuE3", msg.Properties["KeY3"])
195+
assert.Equal(t, "good at playing basketball", msg.Properties["details=man"])
196+
}
197+
}
198+
147199
func TestGetMessageByID(t *testing.T) {
148200
randomName := newTopicName()
149201
topic := "persistent://public/default/" + randomName

0 commit comments

Comments
 (0)