Skip to content

Commit 462e956

Browse files
committed
If cannot deliver incoming message due to queue limit then RELEASE message
1 parent 085744f commit 462e956

File tree

3 files changed

+39
-33
lines changed

3 files changed

+39
-33
lines changed

src/qpid/broker/Queue.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1661,7 +1661,11 @@ bool Queue::checkDepth(const QueueDepth& increment, const Message&)
16611661
if (brokerMgmtObject)
16621662
brokerMgmtObject->inc_discardsOverflow();
16631663
}
1664-
throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name << ": current=[" << current << "], max=[" << settings.maxDepth << "]"));
1664+
throw ResourceLimitExceededException(QPID_MSG("Maximum depth exceeded on " << name <<
1665+
": current=[" << current <<
1666+
"], max=[" << settings.maxDepth <<
1667+
"], increment=[" << increment <<
1668+
"]"));
16651669
} else {
16661670
current += increment;
16671671
return true;

src/qpid/broker/amqp/Incoming.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,17 @@ void DecodingIncoming::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message>
154154
}
155155
userid.verify(message.getUserId());
156156
received->begin();
157-
handle(message, session.getTransaction(delivery));
158-
Transfer t(delivery, sessionPtr);
159-
sessionPtr->pending_accept(delivery);
160-
received->end(t);
157+
158+
try {
159+
handle(message, session.getTransaction(delivery));
160+
Transfer t(delivery, sessionPtr);
161+
sessionPtr->pending_accept(delivery);
162+
received->end(t);
163+
} catch (const qpid::framing::ResourceLimitExceededException& e) {
164+
pn_delivery_update(delivery, PN_RELEASED);
165+
pn_delivery_settle(delivery);
166+
} catch (const qpid::SessionException& e) {
167+
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
168+
}
161169
}
162170
}}} // namespace qpid::broker::amqp

src/qpid/broker/amqp/Session.cpp

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -948,28 +948,27 @@ void IncomingToQueue::handle(qpid::broker::Message& message, qpid::broker::TxBuf
948948
msg << " Queue " << queue->getName() << " has been deleted";
949949
throw Exception(qpid::amqp::error_conditions::RESOURCE_DELETED, msg.str());
950950
}
951+
951952
try {
952953
queue->deliver(message, transaction);
953-
} catch (const qpid::SessionException& e) {
954-
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
955954
}
955+
catch (const qpid::Exception& e) {
956+
QPID_LOG(warning, "Cannot deliver to queue " << queue->getName() << ": " << e.what());
957+
throw;
958+
}
959+
956960
}
957961

958962
void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
959963
{
960-
if (exchange->isDestroyed())
964+
if (exchange->isDestroyed()) {
961965
throw qpid::framing::ResourceDeletedException(QPID_MSG("Exchange " << exchange->getName() << " has been deleted."));
962-
try {
963-
authorise.route(exchange, message);
964-
DeliverableMessage deliverable(message, transaction);
965-
exchange->route(deliverable);
966-
if (!deliverable.delivered) {
967-
if (exchange->getAlternate()) {
968-
exchange->getAlternate()->route(deliverable);
969-
}
970-
}
971-
} catch (const qpid::SessionException& e) {
972-
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
966+
}
967+
authorise.route(exchange, message);
968+
DeliverableMessage deliverable(message, transaction);
969+
exchange->route(deliverable);
970+
if (!deliverable.delivered && exchange->getAlternate()) {
971+
exchange->getAlternate()->route(deliverable);
973972
}
974973
}
975974

@@ -993,21 +992,16 @@ void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuff
993992
}
994993
}
995994

996-
try {
997-
if (queue) {
998-
authorise.incoming(queue);
999-
queue->deliver(message, transaction);
1000-
} else if (exchange) {
1001-
authorise.route(exchange, message);
1002-
DeliverableMessage deliverable(message, transaction);
1003-
exchange->route(deliverable);
1004-
} else {
1005-
QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
1006-
}
1007-
} catch (const qpid::SessionException& e) {
1008-
throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
995+
if (queue) {
996+
authorise.incoming(queue);
997+
queue->deliver(message, transaction);
998+
} else if (exchange) {
999+
authorise.route(exchange, message);
1000+
DeliverableMessage deliverable(message, transaction);
1001+
exchange->route(deliverable);
1002+
} else {
1003+
QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
10091004
}
1010-
10111005
}
10121006

10131007
void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)

0 commit comments

Comments
 (0)