Skip to content

Commit c17d978

Browse files
author
Kim van der Riet
committed
QPID-8430: Reduce memory consumption of AMQP 0-10 messages when rerouted by preventing deep copy of non-persistent messages. AMQP 1.0 memory consumption is unchanged.
1 parent 8b1a3dc commit c17d978

File tree

6 files changed

+12
-1
lines changed

6 files changed

+12
-1
lines changed

src/qpid/broker/Message.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ void Message::addAnnotation(const std::string& key, const qpid::types::Variant&
172172

173173
void Message::annotationsChanged()
174174
{
175-
if (persistentContext) {
175+
if (persistentContext && persistentContext->isMergeRequired()) {
176176
uint64_t id = persistentContext->getPersistenceId();
177177
persistentContext = persistentContext->merge(getAnnotations());
178178
persistentContext->setIngressCompletion(sharedState);

src/qpid/broker/PersistableMessage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class PersistableMessage : public Persistable
8686
virtual void decodeHeader(framing::Buffer& buffer) = 0;
8787
virtual void decodeContent(framing::Buffer& buffer) = 0;
8888
virtual uint32_t encodedHeaderSize() const = 0;
89+
virtual bool isMergeRequired() const = 0;
8990
virtual boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const = 0;
9091
};
9192

src/qpid/broker/amqp/Message.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,10 @@ void Message::decodeHeader(framing::Buffer& buffer)
514514
}
515515
void Message::decodeContent(framing::Buffer& /*buffer*/) {}
516516

517+
bool Message::isMergeRequired() const {
518+
return true;
519+
}
520+
517521
boost::intrusive_ptr<PersistableMessage> Message::merge(const std::map<std::string, qpid::types::Variant>& added) const
518522
{
519523
//message- or delivery- annotations? would have to determine that from the name, for now assume always message-annotations

src/qpid/broker/amqp/Message.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq
8686
void decodeHeader(framing::Buffer& buffer);
8787
void decodeContent(framing::Buffer& buffer);
8888
uint32_t encodedHeaderSize() const;
89+
bool isMergeRequired() const;
8990
boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const;
9091

9192
static const Message& get(const qpid::broker::Message&);

src/qpid/broker/amqp_0_10/MessageTransfer.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,10 @@ std::string MessageTransfer::getUserId() const
465465
}
466466
MessageTransfer::MessageTransfer(const qpid::framing::FrameSet& f) : frames(f), requiredCredit(0) {}
467467

468+
bool MessageTransfer::isMergeRequired() const {
469+
return isPersistent();
470+
}
471+
468472
boost::intrusive_ptr<PersistableMessage> MessageTransfer::merge(const std::map<std::string, qpid::types::Variant>& annotations) const
469473
{
470474
boost::intrusive_ptr<MessageTransfer> clone(new MessageTransfer(this->frames));

src/qpid/broker/amqp_0_10/MessageTransfer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qp
114114
* but other meta data e.g.routing key and exchange)
115115
*/
116116
uint32_t encodedHeaderSize() const;
117+
bool isMergeRequired() const;
117118
boost::intrusive_ptr<PersistableMessage> merge(const std::map<std::string, qpid::types::Variant>& annotations) const;
118119

119120
QPID_BROKER_EXTERN bool isQMFv2() const;

0 commit comments

Comments
 (0)