Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pulsaradmin/pkg/admin/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
}

properties := make(map[string]string)
isBatch := false
for k := range resp.Header {
switch {
case k == PublishTimeHeader:
Expand All @@ -268,7 +269,7 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
if h != "" {
properties[BatchHeader] = h
}
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
isBatch = true
case k == PropertyHeader:
propJSON := resp.Header.Get(k)
if err := json.Unmarshal([]byte(propJSON), &properties); err != nil {
Expand All @@ -280,6 +281,9 @@ func handleResp(topic utils.TopicName, resp *http.Response) ([]*utils.Message, e
}
}

if isBatch {
return getIndividualMsgsFromBatch(topic, ID, payload, properties)
}
return []*utils.Message{utils.NewMessage(topic.String(), *ID, payload, properties)}, nil
}

Expand Down