Skip to content

Commit 4d2d1c8

Browse files
committed
Adjust expiration time for OAuth tests
1 parent c2565ca commit 4d2d1c8

File tree

12 files changed

+229
-151
lines changed

12 files changed

+229
-151
lines changed

src/main/java/com/rabbitmq/client/amqp/OAuthSettings.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,53 @@
1919

2020
import javax.net.ssl.SSLContext;
2121

22+
/**
23+
* Configuration to retrieve a token using the <a
24+
* href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2 Client Credentials flow</a>.
25+
*
26+
* @param <T> the type of object returned by methods, usually the object itself
27+
*/
2228
public interface OAuthSettings<T> {
2329

30+
/**
31+
* Set the URI to access to get the token.
32+
*
33+
* <p>TLS is supported by providing a <code>HTTPS</code> URI and setting a {@link SSLContext}. See
34+
* {@link #tls()} for more information. <em>Applications in production should always use HTTPS to
35+
* retrieve tokens.</em>
36+
*
37+
* @param uri access URI
38+
* @return OAuth settings
39+
*/
2440
OAuthSettings<T> tokenEndpointUri(String uri);
2541

42+
/**
43+
* Set the OAuth 2 client ID
44+
*
45+
* <p>The client ID usually identifies the application that requests a token.
46+
*
47+
* @param clientId client ID
48+
* @return OAuth settings
49+
*/
2650
OAuthSettings<T> clientId(String clientId);
2751

52+
/**
53+
* Set the secret (password) to use to get a token.
54+
*
55+
* @param clientSecret client secret
56+
* @return OAuth settings
57+
*/
2858
OAuthSettings<T> clientSecret(String clientSecret);
2959

60+
/**
61+
* Set the grant type to use when requesting the token.
62+
*
63+
* <p>The default is <code>client_credentials</code>, but some OAuth 2 servers can use
64+
* non-standard grant types to request tokens with extra-information.
65+
*
66+
* @param grantType grant type
67+
* @return OAuth settings
68+
*/
3069
OAuthSettings<T> grantType(String grantType);
3170

3271
OAuthSettings<T> parameter(String name, String value);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 37 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.Resource.State.*;
21+
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert;
2122
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2223
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
2324
import static java.lang.System.nanoTime;
2425
import static java.time.Duration.ofNanos;
2526

2627
import com.rabbitmq.client.amqp.*;
2728
import com.rabbitmq.client.amqp.ObservationCollector;
29+
import com.rabbitmq.client.amqp.impl.Utils.RunnableWithException;
2830
import com.rabbitmq.client.amqp.impl.Utils.StopWatch;
2931
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
3032
import java.time.Duration;
@@ -102,8 +104,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
102104
} else {
103105
disconnectHandler =
104106
(c, e) -> {
105-
AmqpException failureCause =
106-
ExceptionUtils.convert(e.failureCause(), "Connection disconnected");
107+
AmqpException failureCause = convert(e.failureCause(), "Connection disconnected");
107108
this.close(failureCause);
108109
};
109110
}
@@ -248,7 +249,7 @@ private NativeConnectionWrapper connect(
248249
checkBrokerVersion(connection);
249250
return new NativeConnectionWrapper(connection, extractNode(connection), address);
250251
} catch (ClientException e) {
251-
throw ExceptionUtils.convert(e);
252+
throw convert(e);
252253
} finally {
253254
LOGGER.debug("Connection attempt for '{}' took {}", this.name(), stopWatch.stop());
254255
}
@@ -275,7 +276,7 @@ private static String brokerVersion(org.apache.qpid.protonj2.client.Connection c
275276
try {
276277
return (String) connection.properties().get("version");
277278
} catch (ClientException e) {
278-
throw ExceptionUtils.convert(e);
279+
throw convert(e);
279280
}
280281
}
281282

@@ -315,11 +316,11 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
315316
resultReference = new AtomicReference<>();
316317
BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent> result =
317318
(conn, event) -> {
318-
ClientIOException ioex = event.failureCause();
319+
ClientIOException failureCause = event.failureCause();
319320
LOGGER.debug(
320321
"Disconnect handler of '{}', error is the following: {}",
321322
this.name(),
322-
ioex.getMessage());
323+
failureCause.getMessage());
323324
if (this.state() == OPENING) {
324325
LOGGER.debug("Connection is still opening, disconnect handler skipped");
325326
// the broker is not available when opening the connection
@@ -332,7 +333,8 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
332333
this.name());
333334
return;
334335
}
335-
AmqpException exception = ExceptionUtils.convert(event.failureCause());
336+
AmqpException exception =
337+
convert(failureCause, "Connection '%s' disconnected", this.name());
336338
LOGGER.debug("Converted native exception to {}", exception.getClass().getSimpleName());
337339

338340
if (RECOVERY_PREDICATE.test(exception) && this.state() != OPENING) {
@@ -353,8 +355,8 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
353355
LOGGER.debug(
354356
"Not recovering connection '{}' for error {}",
355357
this.name(),
356-
event.failureCause().getMessage());
357-
close(ExceptionUtils.convert(ioex));
358+
failureCause.getMessage());
359+
close(exception);
358360
}
359361
};
360362

@@ -596,7 +598,7 @@ private Session openSession(org.apache.qpid.protonj2.client.Connection connectio
596598
try {
597599
return connection.openSession();
598600
} catch (ClientException e) {
599-
throw ExceptionUtils.convert(e, "Error while opening session");
601+
throw convert(e, "Error while opening session");
600602
}
601603
}
602604

@@ -745,48 +747,47 @@ long id() {
745747
private void close(Throwable cause) {
746748
if (this.closed.compareAndSet(false, true)) {
747749
this.state(CLOSING, cause);
750+
LOGGER.debug("Closing connection {}", this);
748751
this.credentialsRegistration.unregister();
749752
this.environment.removeConnection(this);
753+
BiConsumer<String, RunnableWithException> safeClose =
754+
(label, action) -> {
755+
try {
756+
action.run();
757+
} catch (Exception e) {
758+
LOGGER.info(
759+
"Error during connection '{}' closing ({}): {}", this, label, e.getMessage());
760+
}
761+
};
750762
if (this.topologyListener instanceof AutoCloseable) {
751-
try {
752-
((AutoCloseable) this.topologyListener).close();
753-
} catch (Exception e) {
754-
LOGGER.info("Error while closing topology listener", e);
755-
}
763+
safeClose.accept(
764+
"topology listener", () -> ((AutoCloseable) this.topologyListener).close());
756765
}
757-
this.closeManagement();
766+
safeClose.accept("management", this::closeManagement);
767+
758768
for (RpcClient rpcClient : this.rpcClients) {
759-
rpcClient.close();
769+
safeClose.accept("RPC client", rpcClient::close);
760770
}
761771
for (RpcServer rpcServer : this.rpcServers) {
762-
rpcServer.close();
772+
safeClose.accept("RPC server", rpcServer::close);
763773
}
764774
for (AmqpPublisher publisher : this.publishers) {
765-
publisher.close(cause);
775+
safeClose.accept("publisher", () -> publisher.close(cause));
766776
}
767777
for (AmqpConsumer consumer : this.consumers) {
768-
consumer.close(cause);
778+
safeClose.accept("consumer", () -> consumer.close(cause));
769779
}
770-
try {
771-
if (this.dispatchingExecutorService != null) {
772-
this.dispatchingExecutorService.shutdownNow();
773-
}
774-
} catch (Exception e) {
775-
LOGGER.info(
776-
"Error while shutting down dispatching executor service for connection '{}': {}",
777-
this.name(),
778-
e.getMessage());
780+
if (this.dispatchingExecutorService != null) {
781+
safeClose.accept(
782+
"dispatcing executor service", () -> this.dispatchingExecutorService.shutdown());
779783
}
780-
try {
781-
org.apache.qpid.protonj2.client.Connection nc = this.nativeConnection;
782-
if (nc != null) {
783-
nc.close();
784-
}
785-
} catch (Exception e) {
786-
LOGGER.warn("Error while closing native connection", e);
784+
org.apache.qpid.protonj2.client.Connection nc = this.nativeConnection;
785+
if (nc != null) {
786+
safeClose.accept("native connection", nc::close);
787787
}
788788
this.state(CLOSED, cause);
789789
this.environment.metricsCollector().closeConnection();
790+
LOGGER.debug("Connection {} has been closed", this);
790791
}
791792
}
792793

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5555
private volatile ClientReceiver nativeReceiver;
5656
private final AtomicBoolean closed = new AtomicBoolean(false);
5757
private final int initialCredits;
58-
private final MessageHandler messageHandler;
5958
private final Long id;
6059
private final String address;
6160
private final String queue;
@@ -84,7 +83,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
8483
super(builder.listeners());
8584
this.id = ID_SEQUENCE.getAndIncrement();
8685
this.initialCredits = builder.initialCredits();
87-
this.messageHandler =
86+
MessageHandler messageHandler =
8887
builder
8988
.connection()
9089
.observationCollector()
@@ -257,43 +256,6 @@ private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler
257256
};
258257
}
259258

260-
private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHandler) {
261-
return () -> {
262-
try {
263-
receiver.addCredit(this.initialCredits);
264-
while (!Thread.currentThread().isInterrupted()) {
265-
Delivery delivery = receiver.receive(100, TimeUnit.MILLISECONDS);
266-
if (delivery != null) {
267-
this.unsettledMessageCount.incrementAndGet();
268-
this.metricsCollector.consume();
269-
AmqpMessage message = new AmqpMessage(delivery.message());
270-
Consumer.Context context =
271-
new DeliveryContext(
272-
delivery,
273-
this.protonExecutor,
274-
this.metricsCollector,
275-
this.unsettledMessageCount,
276-
this.replenishCreditOperation,
277-
this);
278-
messageHandler.handle(context, message);
279-
}
280-
}
281-
} catch (ClientLinkRemotelyClosedException | ClientSessionRemotelyClosedException e) {
282-
if (notFound(e) || resourceDeleted(e) || unauthorizedAccess(e)) {
283-
this.close(ExceptionUtils.convert(e));
284-
}
285-
} catch (ClientConnectionRemotelyClosedException e) {
286-
// receiver is closed
287-
} catch (ClientException e) {
288-
java.util.function.Consumer<String> log =
289-
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.warn(m, e);
290-
log.accept("Error while polling AMQP receiver");
291-
} catch (Exception e) {
292-
LOGGER.warn("Unexpected error in consumer loop", e);
293-
}
294-
};
295-
}
296-
297259
void recoverAfterConnectionFailure() {
298260
this.nativeReceiver =
299261
RetryUtils.callAndMaybeRetry(

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicLong;
2828
import org.apache.qpid.protonj2.client.*;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931

3032
class AmqpEnvironment implements Environment {
3133

34+
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpEnvironment.class);
3235
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
3336

3437
private final Client client;
@@ -128,6 +131,7 @@ CredentialsFactory credentialsFactory() {
128131
@Override
129132
public void close() {
130133
if (this.closed.compareAndSet(false, true)) {
134+
LOGGER.debug("Closing environment {}", this);
131135
this.connectionManager.close();
132136
this.client.close();
133137
this.recoveryEventLoop.close();
@@ -145,6 +149,7 @@ public void close() {
145149
this.clockRefreshFuture.cancel(false);
146150
}
147151
this.scheduledExecutorService.shutdownNow();
152+
LOGGER.debug("Environment {} has been closed", this);
148153
}
149154
}
150155

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ OutstandingRequest request(Message<?> request, UUID requestId) throws ClientExce
404404
this.outstandingRequests.put(requestId, outstandingRequest);
405405
LOGGER.debug("Sending request {}", requestId);
406406
this.sender.send(request);
407+
// FIXME use async callback for management responses
407408
Future<?> loop = this.receiveLoop;
408409
if (loop == null) {
409410
this.instanceLock.lock();

src/main/java/com/rabbitmq/client/amqp/impl/Credentials.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,11 @@ interface Registration {
2828
void connect(AuthenticationCallback callback);
2929

3030
void unregister();
31-
3231
}
3332

3433
interface AuthenticationCallback {
3534

3635
void authenticate(String username, String password);
37-
3836
}
3937

4038
class NoOpCredentials implements Credentials {

src/main/java/com/rabbitmq/client/amqp/impl/CredentialsFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ private Credentials createOAuthCredentials(DefaultConnectionSettings<?> connecti
9292
clientBuilderConsumer,
9393
null,
9494
new GsonTokenParser());
95-
return new TokenCredentials(tokenRequester, environment.scheduledExecutorService());
95+
return new TokenCredentials(
96+
tokenRequester, environment.scheduledExecutorService(), settings.refreshDelayStrategy());
9697
}
9798
}

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@
2828
import java.security.KeyManagementException;
2929
import java.security.NoSuchAlgorithmException;
3030
import java.time.Duration;
31+
import java.time.Instant;
3132
import java.util.*;
3233
import java.util.concurrent.CopyOnWriteArrayList;
34+
import java.util.function.Function;
3335
import java.util.stream.Collectors;
3436
import javax.net.ssl.SSLContext;
3537
import javax.net.ssl.TrustManager;
@@ -509,6 +511,8 @@ static class DefaultOAuthSettings<T> implements OAuthSettings<T> {
509511
private String clientSecret;
510512
private String grantType = "client_credentials";
511513
private boolean shared = true;
514+
private Function<Instant, Duration> refreshDelayStrategy =
515+
TokenCredentials.DEFAULT_REFRESH_DELAY_STRATEGY;
512516

513517
DefaultOAuthSettings(DefaultConnectionSettings<T> connectionSettings) {
514518
this.connectionSettings = connectionSettings;
@@ -555,6 +559,15 @@ public OAuthSettings<T> shared(boolean shared) {
555559
return this;
556560
}
557561

562+
DefaultOAuthSettings<T> refreshDelayStrategy(Function<Instant, Duration> refreshDelayStrategy) {
563+
this.refreshDelayStrategy = refreshDelayStrategy;
564+
return this;
565+
}
566+
567+
Function<Instant, Duration> refreshDelayStrategy() {
568+
return this.refreshDelayStrategy;
569+
}
570+
558571
@Override
559572
public DefaultOAuthTlsSettings<? extends T> tls() {
560573
this.tls.enable();
@@ -576,6 +589,7 @@ void copyTo(DefaultOAuthSettings<?> copy) {
576589
if (this.tls.enabled()) {
577590
this.tls.copyTo(copy.tls());
578591
}
592+
copy.refreshDelayStrategy(this.refreshDelayStrategy);
579593
}
580594

581595
String tokenEndpointUri() {

0 commit comments

Comments
 (0)