diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index b2ece7f41..8a4c6d34b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -57,6 +57,9 @@ import org.apache.qpid.jms.transports.TransportFactory; import org.apache.qpid.jms.transports.TransportListener; import org.apache.qpid.jms.util.IOExceptionSupport; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; @@ -117,6 +120,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private int drainTimeout = 60000; private long sessionOutoingWindow = -1; //Use proton default private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + private TerminusExpiryPolicy defaultExpiryPolicy; + private UnsignedInteger defaultTimeout; private final URI remoteURI; private final AtomicBoolean closed = new AtomicBoolean(); @@ -1071,6 +1076,36 @@ public void setSessionOutgoingWindow(long sessionOutoingWindow) { this.sessionOutoingWindow = sessionOutoingWindow; } + public TerminusExpiryPolicy getDefaultExpiryPolicy() { + return defaultExpiryPolicy; + } + + /** + * Set the default expiry policy. This will override consumer policy. + * + * @param defaultExpiryPolicy + */ + public void setDefaultExpiryPolicy(String defaultExpiryPolicy) { + this.defaultExpiryPolicy = TerminusExpiryPolicy.valueOf(Symbol.valueOf(defaultExpiryPolicy)); + } + + public UnsignedInteger getDefaultTimeout() { + return defaultTimeout; + } + + /** + * Set the default timeout for consumer policy + * + * @param timeout + */ + public void setDefaultTimeout(int timeout) { + if (timeout < 0) { + defaultTimeout = new UnsignedInteger(Integer.MAX_VALUE); + } else { + defaultTimeout = new UnsignedInteger(timeout); + } + } + public long getCloseTimeout() { return this.closeTimeout; } @@ -1178,7 +1213,7 @@ private void checkClosed() throws ProviderClosedException { } } - private final class IdleTimeoutCheck implements Runnable { + private final class IdleTimeoutCheck implements Runnable { @Override public void run() { boolean checkScheduled = false; diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java index 3bf883637..9ba374edd 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java @@ -127,6 +127,13 @@ private void configureSource(Source source) { source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); } + if (this.parent.getConnection().getProvider().getDefaultExpiryPolicy() != null) { + source.setExpiryPolicy(this.parent.getConnection().getProvider().getDefaultExpiryPolicy()); + } + if (this.parent.getConnection().getProvider().getDefaultTimeout() != null) { + source.setTimeout(this.parent.getConnection().getProvider().getDefaultTimeout()); + } + if (resourceInfo.isBrowser()) { source.setDistributionMode(COPY); }