Skip to content

Commit 9b7ea0a

Browse files
committed
Close producer and consumer on security exception
The session can be closed after permissions are re-evaluated, e.g. when a token is renewed.
1 parent d93f93b commit 9b7ea0a

File tree

5 files changed

+89
-27
lines changed

5 files changed

+89
-27
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
2121
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
22+
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.*;
2223
import static java.time.Duration.ofSeconds;
2324
import static java.util.Optional.ofNullable;
2425

@@ -71,7 +72,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7172
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
7273
private final ExecutorService dispatchingExecutorService;
7374
private final java.util.function.Consumer<Delivery> nativeHandler;
74-
private final java.util.function.Consumer<ClientException> nativeReceiverCloseHandler;
75+
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
7576
// native receiver internal state, accessed only in the native executor/scheduler
7677
private ProtonReceiver protonReceiver;
7778
private volatile Scheduler protonExecutor;
@@ -101,7 +102,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
101102

102103
this.dispatchingExecutorService = connection.dispatchingExecutorService();
103104
this.nativeHandler = createNativeHandler(messageHandler);
104-
this.nativeReceiverCloseHandler =
105+
this.nativeCloseHandler =
105106
e ->
106107
this.dispatchingExecutorService.submit(
107108
() -> {
@@ -116,7 +117,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
116117
this.filters,
117118
this.subscriptionListener,
118119
this.nativeHandler,
119-
this.nativeReceiverCloseHandler);
120+
this.nativeCloseHandler);
120121
this.initStateFromNativeReceiver(this.nativeReceiver);
121122
this.metricsCollector = this.connection.metricsCollector();
122123
try {
@@ -277,8 +278,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
277278
messageHandler.handle(context, message);
278279
}
279280
}
280-
} catch (ClientLinkRemotelyClosedException e) {
281-
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
281+
} catch (ClientLinkRemotelyClosedException | ClientSessionRemotelyClosedException e) {
282+
if (notFound(e) || resourceDeleted(e) || unauthorizedAccess(e)) {
282283
this.close(ExceptionUtils.convert(e));
283284
}
284285
} catch (ClientConnectionRemotelyClosedException e) {
@@ -304,7 +305,7 @@ void recoverAfterConnectionFailure() {
304305
this.filters,
305306
this.subscriptionListener,
306307
this.nativeHandler,
307-
this.nativeReceiverCloseHandler),
308+
this.nativeCloseHandler),
308309
e -> {
309310
boolean shouldRetry =
310311
e instanceof AmqpException.AmqpResourceClosedException
@@ -533,13 +534,6 @@ private void handleException(Exception ex, String operation) {
533534
}
534535

535536
private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
536-
if (ex instanceof ClientLinkRemotelyClosedException) {
537-
ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException) ex;
538-
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
539-
consumer.close(ExceptionUtils.convert(e));
540-
return true;
541-
}
542-
}
543-
return false;
537+
return ExceptionUtils.maybeCloseConsumerOnException(consumer::close, ex);
544538
}
545539
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.*;
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.function.Consumer;
3031
import java.util.function.Function;
3132
import org.apache.qpid.protonj2.client.*;
3233
import org.apache.qpid.protonj2.client.exceptions.ClientException;
@@ -53,6 +54,8 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
5354
private final Duration publishTimeout;
5455
private final SessionHandler sessionHandler;
5556
private volatile ObservationCollector.ConnectionInfo connectionInfo;
57+
private final ExecutorService dispatchingExecutorService;
58+
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
5659

5760
AmqpPublisher(AmqpPublisherBuilder builder) {
5861
super(builder.listeners());
@@ -63,7 +66,17 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
6366
this.connection = builder.connection();
6467
this.publishTimeout = builder.publishTimeout();
6568
this.sessionHandler = this.connection.createSessionHandler();
66-
this.sender = this.createSender(sessionHandler.session(), this.address, this.publishTimeout);
69+
this.dispatchingExecutorService = connection.dispatchingExecutorService();
70+
this.nativeCloseHandler =
71+
e ->
72+
this.dispatchingExecutorService.submit(
73+
() -> {
74+
// get result to make spotbugs happy
75+
boolean ignored = maybeCloseConsumerOnException(this, e);
76+
});
77+
this.sender =
78+
this.createSender(
79+
sessionHandler.session(), this.address, this.publishTimeout, this.nativeCloseHandler);
6780
this.metricsCollector = this.connection.metricsCollector();
6881
this.observationCollector = this.connection.observationCollector();
6982
this.state(OPEN);
@@ -154,7 +167,11 @@ private static MetricsCollector.PublishDisposition mapToPublishDisposition(Statu
154167
void recoverAfterConnectionFailure() {
155168
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
156169
this.sender =
157-
this.createSender(this.sessionHandler.sessionNoCheck(), this.address, this.publishTimeout);
170+
this.createSender(
171+
this.sessionHandler.sessionNoCheck(),
172+
this.address,
173+
this.publishTimeout,
174+
this.nativeCloseHandler);
158175
}
159176

160177
@Override
@@ -164,14 +181,19 @@ public void close() {
164181

165182
// internal API
166183

167-
private Sender createSender(Session session, String address, Duration publishTimeout) {
184+
private Sender createSender(
185+
Session session,
186+
String address,
187+
Duration publishTimeout,
188+
Consumer<ClientException> nativeCloseHandler) {
168189
SenderOptions senderOptions =
169190
new SenderOptions()
170191
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
171192
.sendTimeout(
172193
publishTimeout.isNegative()
173194
? ConnectionOptions.INFINITE
174-
: publishTimeout.toMillis());
195+
: publishTimeout.toMillis())
196+
.closeHandler(nativeCloseHandler);
175197
try {
176198
Sender s =
177199
address == null
@@ -198,6 +220,10 @@ private void close(Throwable cause) {
198220
}
199221
}
200222

223+
private static boolean maybeCloseConsumerOnException(AmqpPublisher publisher, Exception ex) {
224+
return ExceptionUtils.maybeCloseConsumerOnException(publisher::close, ex);
225+
}
226+
201227
private static class DefaultContext implements Publisher.Context {
202228

203229
private final Message message;

src/main/java/com/rabbitmq/client/amqp/impl/ExceptionUtils.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.client.amqp.AmqpException;
2121
import java.util.concurrent.ExecutionException;
2222
import java.util.concurrent.Future;
23+
import java.util.function.Consumer;
2324
import javax.net.ssl.SSLException;
2425
import org.apache.qpid.protonj2.client.ErrorCondition;
2526
import org.apache.qpid.protonj2.client.exceptions.*;
@@ -67,20 +68,14 @@ static AmqpException convert(ClientException e, String format, Object... args) {
6768
return new AmqpException.AmqpSecurityException(message, e);
6869
} else if (isNetworkError(e)) {
6970
return new AmqpException.AmqpConnectionException(e.getMessage(), e);
70-
} else if (e instanceof ClientSessionRemotelyClosedException) {
71+
} else if (e instanceof ClientSessionRemotelyClosedException
72+
|| e instanceof ClientLinkRemotelyClosedException) {
7173
ErrorCondition errorCondition =
72-
((ClientSessionRemotelyClosedException) e).getErrorCondition();
74+
((ClientResourceRemotelyClosedException) e).getErrorCondition();
7375
if (isUnauthorizedAccess(errorCondition)) {
7476
return new AmqpException.AmqpSecurityException(e.getMessage(), e);
7577
} else if (isNotFound(errorCondition)) {
7678
return new AmqpException.AmqpEntityDoesNotExistException(e.getMessage(), e);
77-
} else {
78-
return new AmqpException.AmqpResourceClosedException(e.getMessage(), e);
79-
}
80-
} else if (e instanceof ClientLinkRemotelyClosedException) {
81-
ErrorCondition errorCondition = ((ClientLinkRemotelyClosedException) e).getErrorCondition();
82-
if (isNotFound(errorCondition)) {
83-
return new AmqpException.AmqpEntityDoesNotExistException(e.getMessage(), e);
8479
} else if (isResourceDeleted(errorCondition)) {
8580
return new AmqpException.AmqpEntityDoesNotExistException(e.getMessage(), e);
8681
} else {
@@ -109,6 +104,10 @@ static boolean notFound(ClientResourceRemotelyClosedException e) {
109104
&& "amqp:not-found".equals(e.getErrorCondition().condition());
110105
}
111106

107+
static boolean unauthorizedAccess(ClientResourceRemotelyClosedException e) {
108+
return isUnauthorizedAccess(e.getErrorCondition());
109+
}
110+
112111
private static boolean isUnauthorizedAccess(ErrorCondition errorCondition) {
113112
return errorConditionEquals(errorCondition, ERROR_UNAUTHORIZED_ACCESS);
114113
}
@@ -135,4 +134,16 @@ private static boolean isNetworkError(ClientException e) {
135134
}
136135
return false;
137136
}
137+
138+
static boolean maybeCloseConsumerOnException(Consumer<Throwable> closing, Exception ex) {
139+
if (ex instanceof ClientLinkRemotelyClosedException
140+
|| ex instanceof ClientSessionRemotelyClosedException) {
141+
ClientResourceRemotelyClosedException e = (ClientResourceRemotelyClosedException) ex;
142+
if (notFound(e) || resourceDeleted(e) || unauthorizedAccess(e)) {
143+
closing.accept(ExceptionUtils.convert(e));
144+
return true;
145+
}
146+
}
147+
return false;
148+
}
138149
}

src/main/qpid/org/apache/qpid/protonj2/client/SenderOptions.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package org.apache.qpid.protonj2.client;
1818

1919
import java.util.concurrent.TimeUnit;
20+
import java.util.function.Consumer;
2021
import java.util.function.Supplier;
2122

23+
import org.apache.qpid.protonj2.client.exceptions.ClientException;
2224
import org.apache.qpid.protonj2.client.exceptions.ClientSendTimedOutException;
2325
import org.apache.qpid.protonj2.engine.DeliveryTagGenerator;
2426

@@ -28,6 +30,7 @@
2830
public class SenderOptions extends LinkOptions<SenderOptions> implements Cloneable {
2931

3032
private long sendTimeout = ConnectionOptions.DEFAULT_SEND_TIMEOUT;
33+
private Consumer<ClientException> closeHandler;
3134

3235
private Supplier<DeliveryTagGenerator> tagGeneratorSupplier;
3336

@@ -89,6 +92,26 @@ public SenderOptions sendTimeout(long timeout, TimeUnit units) {
8992
return this;
9093
}
9194

95+
/**
96+
* Callback when the sender is closed / shut down.
97+
*
98+
* @param closeHandler close / shutdown handler
99+
* @return this {@link SenderOptions} instance.
100+
*/
101+
public SenderOptions closeHandler(Consumer<ClientException> closeHandler) {
102+
this.closeHandler = closeHandler;
103+
return this;
104+
}
105+
106+
/**
107+
* Configured close / shutdown handler.
108+
*
109+
* @return the configured handler
110+
*/
111+
public Consumer<ClientException> closeHandler() {
112+
return this.closeHandler;
113+
}
114+
92115
@Override
93116
public SenderOptions clone() {
94117
return copyInto(new SenderOptions());

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientSender.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222
import java.util.concurrent.Future;
2323
import java.util.concurrent.TimeUnit;
24+
import java.util.function.Consumer;
2425

2526
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
2627
import org.apache.qpid.protonj2.buffer.ProtonBufferAllocator;
@@ -49,11 +50,13 @@ public final class ClientSender extends ClientSenderLinkType<Sender> implements
4950

5051
private final Deque<ClientOutgoingEnvelope> blocked = new ArrayDeque<>();
5152
private final SenderOptions options;
53+
private final Consumer<ClientException> closeHandler;
5254

5355
ClientSender(ClientSession session, SenderOptions options, String senderId, org.apache.qpid.protonj2.engine.Sender protonSender) {
5456
super(session, senderId, options, protonSender);
5557

5658
this.options = new SenderOptions(options);
59+
this.closeHandler = options.closeHandler() == null ? e -> { } : options.closeHandler();
5760
}
5861

5962
@Override
@@ -208,6 +211,11 @@ protected void linkSpecificCleanupHandler(ClientException failureCause) {
208211
} else {
209212
failPendingUnsettledAndBlockedSends(new ClientResourceRemotelyClosedException("The sender link has closed"));
210213
}
214+
try {
215+
this.closeHandler.accept(failureCause);
216+
} catch (Exception e) {
217+
LOG.warn("Error in close handler: {}", e.getMessage());
218+
}
211219
}
212220

213221
private void failPendingUnsettledAndBlockedSends(ClientException cause) {

0 commit comments

Comments
 (0)