Skip to content

Commit 387bc1a

Browse files
committed
Add OAuth configuration
WIP
1 parent 5b87f3b commit 387bc1a

File tree

15 files changed

+363
-35
lines changed

15 files changed

+363
-35
lines changed

src/main/java/com/rabbitmq/client/amqp/ConnectionSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ public interface ConnectionSettings<T> {
154154
*/
155155
Affinity<? extends T> affinity();
156156

157+
OAuthSettings<? extends T> oauth();
158+
157159
/**
158160
* TLS settings.
159161
*
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp;
19+
20+
public interface OAuthSettings<T> {
21+
22+
OAuthSettings<T> tokenEndpointUri(String uri);
23+
24+
OAuthSettings<T> clientId(String clientId);
25+
26+
OAuthSettings<T> clientSecret(String clientSecret);
27+
28+
OAuthSettings<T> grantType(String grantType);
29+
30+
OAuthSettings<T> parameter(String name, String value);
31+
32+
T connection();
33+
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static com.rabbitmq.client.amqp.Resource.State.*;
2121
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2222
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
23+
import static java.lang.System.nanoTime;
24+
import static java.time.Duration.ofNanos;
2325

2426
import com.rabbitmq.client.amqp.*;
2527
import com.rabbitmq.client.amqp.ObservationCollector;
@@ -81,8 +83,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
8183
AmqpConnection(AmqpConnectionBuilder builder) {
8284
super(builder.listeners());
8385
this.id = ID_SEQUENCE.getAndIncrement();
84-
this.name = builder.name();
8586
this.environment = builder.environment();
87+
this.name =
88+
builder.name() == null ? this.environment.toString() + "-" + this.id : builder.name();
8689
this.connectionSettings = builder.connectionSettings().consolidate();
8790
this.sessionHandlerSupplier =
8891
builder.isolateResources()
@@ -115,19 +118,18 @@ final class AmqpConnection extends ResourceBase implements Connection {
115118
this.affinityStrategy = null;
116119
}
117120
this.management = createManagement();
118-
if (this.connectionSettings.credentialsProvider()
119-
instanceof UsernamePasswordCredentialsProvider) {
120-
UsernamePasswordCredentialsProvider credentialsProvider =
121-
(UsernamePasswordCredentialsProvider) connectionSettings.credentialsProvider();
122-
Credentials credentials = new UsernamePasswordCredentials(credentialsProvider);
123-
this.credentialsRegistration =
124-
credentials.register(
125-
(u, p) -> {
126-
((AmqpManagement) management()).setToken(p);
127-
});
128-
} else {
129-
this.credentialsRegistration = Credentials.NO_OP.register((u, p) -> {});
130-
}
121+
Credentials credentials = builder.credentials();
122+
this.credentialsRegistration =
123+
credentials.register(
124+
(username, password) -> {
125+
LOGGER.debug("Setting new token for connection {}", this.name);
126+
long start = nanoTime();
127+
((AmqpManagement) management()).setToken(password);
128+
LOGGER.debug(
129+
"Set new token for connection {} in {} ms",
130+
this.name,
131+
ofNanos(nanoTime() - start).toMillis());
132+
});
131133
LOGGER.debug("Opening native connection for connection '{}'...", this.name());
132134
NativeConnectionWrapper ncw =
133135
ConnectionUtils.enforceAffinity(
@@ -611,7 +613,7 @@ ExecutorService dispatchingExecutorService() {
611613
if (this.dispatchingExecutorService == null) {
612614
this.dispatchingExecutorService =
613615
Executors.newSingleThreadExecutor(
614-
Utils.threadFactory("dispatching-" + this.name + "-"));
616+
Utils.threadFactory("dispatching-" + this.name() + "-"));
615617
}
616618
return this.dispatchingExecutorService;
617619
} finally {
@@ -754,7 +756,9 @@ private void close(Throwable cause) {
754756
consumer.close(cause);
755757
}
756758
try {
757-
this.dispatchingExecutorService.shutdownNow();
759+
if (this.dispatchingExecutorService != null) {
760+
this.dispatchingExecutorService.shutdownNow();
761+
}
758762
} catch (Exception e) {
759763
LOGGER.info(
760764
"Error while shutting down dispatching executor service for connection '{}': {}",
@@ -776,7 +780,7 @@ private void close(Throwable cause) {
776780

777781
@Override
778782
public String toString() {
779-
return this.environment.toString() + "-" + this.id;
783+
return this.name();
780784
}
781785

782786
static class NativeConnectionWrapper {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,11 @@ public DefaultConnectionSettings.DefaultAffinity<? extends ConnectionBuilder> af
104104
return this.connectionSettings.affinity();
105105
}
106106

107+
@Override
108+
public OAuthSettings<? extends ConnectionBuilder> oauth() {
109+
return this.connectionSettings.oauth();
110+
}
111+
107112
@Override
108113
public ConnectionBuilder listeners(Resource.StateListener... listeners) {
109114
if (listeners == null || listeners.length == 0) {
@@ -157,6 +162,10 @@ AmqpEnvironment environment() {
157162
return environment;
158163
}
159164

165+
Credentials credentials() {
166+
return environment().credentialsFactory().credentials(this.connectionSettings);
167+
}
168+
160169
AmqpRecoveryConfiguration recoveryConfiguration() {
161170
return recoveryConfiguration;
162171
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class AmqpEnvironment implements Environment {
5151
private final ConnectionUtils.AffinityCache affinityCache = new ConnectionUtils.AffinityCache();
5252
private final EventLoop recoveryEventLoop;
5353
private final ExecutorService recoveryEventLoopExecutorService;
54+
private final CredentialsFactory credentialsFactory = new CredentialsFactory(this);
5455

5556
AmqpEnvironment(
5657
ExecutorService executorService,
@@ -120,6 +121,10 @@ Clock clock() {
120121
return this.clock;
121122
}
122123

124+
CredentialsFactory credentialsFactory() {
125+
return this.credentialsFactory;
126+
}
127+
123128
@Override
124129
public void close() {
125130
if (this.closed.compareAndSet(false, true)) {
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import com.rabbitmq.client.amqp.CredentialsProvider;
21+
import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
22+
import com.rabbitmq.client.amqp.oauth.GsonTokenParser;
23+
import com.rabbitmq.client.amqp.oauth.HttpTokenRequester;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
26+
27+
final class CredentialsFactory {
28+
29+
private volatile Credentials oauthCredentials;
30+
private final Lock oauthCredentialsLock = new ReentrantLock();
31+
private final AmqpEnvironment environment;
32+
33+
CredentialsFactory(AmqpEnvironment environment) {
34+
this.environment = environment;
35+
}
36+
37+
Credentials credentials(DefaultConnectionSettings<?> settings) {
38+
CredentialsProvider provider = settings.credentialsProvider();
39+
Credentials credentials;
40+
if (settings.oauth().enabled()) {
41+
// TODO consider OAuth credentials are not shared
42+
credentials = oauthCredentials(settings);
43+
} else {
44+
if (provider instanceof UsernamePasswordCredentialsProvider) {
45+
UsernamePasswordCredentialsProvider credentialsProvider =
46+
(UsernamePasswordCredentialsProvider) provider;
47+
credentials = new UsernamePasswordCredentials(credentialsProvider);
48+
} else {
49+
credentials = Credentials.NO_OP;
50+
}
51+
}
52+
return credentials;
53+
}
54+
55+
private Credentials oauthCredentials(DefaultConnectionSettings<?> connectionSettings) {
56+
Credentials result = this.oauthCredentials;
57+
if (result != null) {
58+
return result;
59+
}
60+
61+
this.oauthCredentialsLock.lock();
62+
try {
63+
if (this.oauthCredentials == null) {
64+
DefaultConnectionSettings.DefaultOAuthSettings<?> settings = connectionSettings.oauth();
65+
// TODO set TLS configuration on TLS requester
66+
// TODO use pre-configured token requester if any
67+
HttpTokenRequester tokenRequester =
68+
new HttpTokenRequester(
69+
settings.tokenEndpointUri(),
70+
settings.clientId(),
71+
settings.clientSecret(),
72+
settings.grantType(),
73+
settings.parameters(),
74+
null,
75+
null,
76+
null,
77+
null,
78+
new GsonTokenParser());
79+
this.oauthCredentials =
80+
new TokenCredentials(tokenRequester, environment.scheduledExecutorService());
81+
}
82+
return this.oauthCredentials;
83+
} finally {
84+
this.oauthCredentialsLock.unlock();
85+
}
86+
}
87+
}

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import java.security.KeyManagementException;
2929
import java.security.NoSuchAlgorithmException;
3030
import java.time.Duration;
31-
import java.util.Collections;
32-
import java.util.List;
33-
import java.util.Random;
31+
import java.util.*;
3432
import java.util.concurrent.CopyOnWriteArrayList;
3533
import java.util.stream.Collectors;
3634
import javax.net.ssl.SSLContext;
@@ -74,6 +72,7 @@ abstract class DefaultConnectionSettings<T> implements ConnectionSettings<T> {
7472
private String saslMechanism = ConnectionSettings.SASL_MECHANISM_ANONYMOUS;
7573
private final DefaultTlsSettings<T> tlsSettings = new DefaultTlsSettings<>(this);
7674
private final DefaultAffinity<T> affinity = new DefaultAffinity<>(this);
75+
private final DefaultOAuthSettings<T> oAuthSettings = new DefaultOAuthSettings<>(this);
7776

7877
@Override
7978
public T uri(String uriString) {
@@ -223,6 +222,10 @@ void copyTo(DefaultConnectionSettings<?> copy) {
223222
}
224223

225224
this.affinity.copyTo(copy.affinity);
225+
226+
if (this.oAuthSettings.enabled()) {
227+
this.oAuthSettings.copyTo((DefaultOAuthSettings<?>) copy.oauth());
228+
}
226229
}
227230

228231
DefaultConnectionSettings<?> consolidate() {
@@ -295,6 +298,11 @@ public DefaultAffinity<? extends T> affinity() {
295298
return this.affinity;
296299
}
297300

301+
@Override
302+
public DefaultOAuthSettings<? extends T> oauth() {
303+
return this.oAuthSettings;
304+
}
305+
298306
static DefaultConnectionSettings<?> instance() {
299307
return new DefaultConnectionSettings<>() {
300308
@Override
@@ -490,4 +498,89 @@ void validate() {
490498
}
491499
}
492500
}
501+
502+
static class DefaultOAuthSettings<T> implements OAuthSettings<T> {
503+
504+
private final DefaultConnectionSettings<T> connectionSettings;
505+
private final Map<String, String> parameters = new HashMap<>();
506+
private String tokenEndpointUri;
507+
private String clientId;
508+
private String clientSecret;
509+
private String grantType = "client_credentials";
510+
511+
DefaultOAuthSettings(DefaultConnectionSettings<T> connectionSettings) {
512+
this.connectionSettings = connectionSettings;
513+
}
514+
515+
@Override
516+
public OAuthSettings<T> tokenEndpointUri(String uri) {
517+
this.tokenEndpointUri = uri;
518+
return this;
519+
}
520+
521+
@Override
522+
public OAuthSettings<T> clientId(String clientId) {
523+
this.clientId = clientId;
524+
return this;
525+
}
526+
527+
@Override
528+
public OAuthSettings<T> clientSecret(String clientSecret) {
529+
this.clientSecret = clientSecret;
530+
return this;
531+
}
532+
533+
@Override
534+
public OAuthSettings<T> grantType(String grantType) {
535+
this.grantType = grantType;
536+
return this;
537+
}
538+
539+
@Override
540+
public OAuthSettings<T> parameter(String name, String value) {
541+
if (value == null) {
542+
this.parameters.remove(name);
543+
} else {
544+
this.parameters.put(name, value);
545+
}
546+
return this;
547+
}
548+
549+
@Override
550+
public T connection() {
551+
return this.connectionSettings.toReturn();
552+
}
553+
554+
void copyTo(DefaultOAuthSettings<?> copy) {
555+
copy.tokenEndpointUri(this.tokenEndpointUri);
556+
copy.clientId(this.clientId);
557+
copy.clientSecret(this.clientSecret);
558+
copy.grantType(this.grantType);
559+
this.parameters.forEach(copy::parameter);
560+
}
561+
562+
String tokenEndpointUri() {
563+
return this.tokenEndpointUri;
564+
}
565+
566+
String clientId() {
567+
return this.clientId;
568+
}
569+
570+
String clientSecret() {
571+
return this.clientSecret;
572+
}
573+
574+
String grantType() {
575+
return this.grantType;
576+
}
577+
578+
Map<String, String> parameters() {
579+
return Map.copyOf(this.parameters);
580+
}
581+
582+
boolean enabled() {
583+
return this.tokenEndpointUri != null;
584+
}
585+
}
493586
}

0 commit comments

Comments
 (0)