Skip to content

Commit 214e042

Browse files
committed
Start support for OAuth 2 token retrieval and refresh
1 parent abc2471 commit 214e042

19 files changed

+841
-29
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
<slf4j.version>1.7.36</slf4j.version>
4545
<netty4.version>4.1.115.Final</netty4.version>
4646
<netty4.iouring.version>0.0.25.Final</netty4.iouring.version>
47-
<netty5.version>5.0.0.Alpha5</netty5.version>
4847
<micrometer.version>1.14.1</micrometer.version>
4948
<logback.version>1.2.13</logback.version>
5049
<junit.jupiter.version>5.11.3</junit.jupiter.version>
@@ -77,6 +76,7 @@
7776
<gpg.skip>true</gpg.skip>
7877
<gpg.keyname>6026DFCA</gpg.keyname>
7978
<spotbugs.skip>false</spotbugs.skip>
79+
<gson.version>2.11.0</gson.version>
8080
</properties>
8181

8282
<dependencies>
@@ -140,6 +140,13 @@
140140
</dependency>
141141
<!-- End of QPid dependencies -->
142142

143+
<dependency>
144+
<groupId>com.google.code.gson</groupId>
145+
<artifactId>gson</artifactId>
146+
<version>${gson.version}</version>
147+
<optional>true</optional>
148+
</dependency>
149+
143150
<dependency>
144151
<groupId>org.junit.jupiter</groupId>
145152
<artifactId>junit-jupiter-engine</artifactId>

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
7676
private final Lock instanceLock = new ReentrantLock();
7777
private final boolean filterExpressionsSupported, setTokenSupported;
7878
private volatile ExecutorService dispatchingExecutorService;
79+
private final Credentials credentials;
7980

8081
AmqpConnection(AmqpConnectionBuilder builder) {
8182
super(builder.listeners());
@@ -114,6 +115,14 @@ final class AmqpConnection extends ResourceBase implements Connection {
114115
this.affinityStrategy = null;
115116
}
116117
this.management = createManagement();
118+
if (this.connectionSettings.credentialsProvider()
119+
instanceof UsernamePasswordCredentialsProvider) {
120+
UsernamePasswordCredentialsProvider credentialsProvider =
121+
(UsernamePasswordCredentialsProvider) connectionSettings.credentialsProvider();
122+
this.credentials = new UsernamePasswordCredentials(credentialsProvider);
123+
} else {
124+
this.credentials = callback -> {};
125+
}
117126
LOGGER.debug("Opening native connection for connection '{}'...", this.name());
118127
NativeConnectionWrapper ncw =
119128
ConnectionUtils.enforceAffinity(
@@ -190,12 +199,7 @@ private NativeConnectionWrapper connect(
190199
List<Address> addresses) {
191200

192201
ConnectionOptions connectionOptions = new ConnectionOptions();
193-
if (connectionSettings.credentialsProvider() instanceof UsernamePasswordCredentialsProvider) {
194-
UsernamePasswordCredentialsProvider credentialsProvider =
195-
(UsernamePasswordCredentialsProvider) connectionSettings.credentialsProvider();
196-
connectionOptions.user(credentialsProvider.getUsername());
197-
connectionOptions.password(credentialsProvider.getPassword());
198-
}
202+
credentials.configure(new TokenConnectionCallback(connectionOptions));
199203
connectionOptions.virtualHost("vhost:" + connectionSettings.virtualHost());
200204
connectionOptions.saslOptions().addAllowedMechanism(connectionSettings.saslMechanism());
201205
connectionOptions.idleTimeout(
@@ -807,4 +811,25 @@ public boolean equals(Object o) {
807811
public int hashCode() {
808812
return Objects.hashCode(id);
809813
}
814+
815+
private static class TokenConnectionCallback implements Credentials.ConnectionCallback {
816+
817+
private final ConnectionOptions options;
818+
819+
private TokenConnectionCallback(ConnectionOptions options) {
820+
this.options = options;
821+
}
822+
823+
@Override
824+
public Credentials.ConnectionCallback username(String username) {
825+
options.user(username);
826+
return this;
827+
}
828+
829+
@Override
830+
public Credentials.ConnectionCallback password(String password) {
831+
options.password(password);
832+
return this;
833+
}
834+
}
810835
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,9 @@ void close(Throwable cause) {
333333
this.state(CLOSING, cause);
334334
this.connection.removeConsumer(this);
335335
try {
336-
this.nativeReceiver.close();
336+
if (this.nativeReceiver != null) {
337+
this.nativeReceiver.close();
338+
}
337339
this.sessionHandler.close();
338340
} catch (Exception e) {
339341
LOGGER.warn("Error while closing receiver", e);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ void close(Throwable cause) {
210210
this.state(State.CLOSING, cause);
211211
this.connection.removePublisher(this);
212212
try {
213-
this.sender.close();
213+
if (this.sender != null) {
214+
this.sender.close();
215+
}
214216
this.sessionHandler.close();
215217
} catch (Exception e) {
216218
LOGGER.warn("Error while closing sender", e);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
interface Credentials {
21+
22+
void configure(ConnectionCallback callback);
23+
24+
interface ConnectionCallback {
25+
26+
ConnectionCallback username(String username);
27+
28+
ConnectionCallback password(String password);
29+
}
30+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
21+
import com.rabbitmq.client.amqp.oauth.Token;
22+
import com.rabbitmq.client.amqp.oauth.TokenParser;
23+
import com.rabbitmq.client.amqp.oauth.TokenRequester;
24+
import java.util.concurrent.locks.Lock;
25+
import java.util.concurrent.locks.ReentrantLock;
26+
27+
final class OAuthCredentialsProvider implements UsernamePasswordCredentialsProvider {
28+
29+
private final TokenRequester requester;
30+
private final TokenParser parser;
31+
private volatile Token token;
32+
private final Lock lock = new ReentrantLock();
33+
34+
OAuthCredentialsProvider(TokenRequester requester, TokenParser parser) {
35+
this.requester = requester;
36+
this.parser = parser;
37+
}
38+
39+
@Override
40+
public String getUsername() {
41+
return "";
42+
}
43+
44+
@Override
45+
public String getPassword() {
46+
lock.lock();
47+
try {
48+
if (token == null || token.expirationTime() < System.currentTimeMillis()) {
49+
this.token = this.parser.parse(this.requester.request());
50+
}
51+
} finally {
52+
lock.unlock();
53+
}
54+
return token.value();
55+
}
56+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import com.rabbitmq.client.amqp.UsernamePasswordCredentialsProvider;
21+
22+
final class UsernamePasswordCredentials implements Credentials {
23+
24+
private final UsernamePasswordCredentialsProvider provider;
25+
26+
UsernamePasswordCredentials(UsernamePasswordCredentialsProvider provider) {
27+
this.provider = provider;
28+
}
29+
30+
@Override
31+
public void configure(ConnectionCallback callback) {
32+
callback.username(this.provider.getUsername()).password(this.provider.getPassword());
33+
}
34+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.oauth;
19+
20+
import com.google.gson.Gson;
21+
import com.google.gson.reflect.TypeToken;
22+
import java.util.Map;
23+
24+
public class GsonTokenParser implements TokenParser {
25+
26+
private static final Gson GSON = new Gson();
27+
private static final TypeToken<Map<String, Object>> MAP_TYPE = new TypeToken<>() {};
28+
29+
@Override
30+
public Token parse(String tokenAsString) {
31+
Map<String, Object> tokenAsMap = GSON.fromJson(tokenAsString, MAP_TYPE);
32+
String accessToken = (String) tokenAsMap.get("access_token");
33+
// in seconds, see https://www.rfc-editor.org/rfc/rfc6749#section-5.1
34+
long expiresIn = ((Number) tokenAsMap.get("expires_in")).longValue();
35+
long expirationTime = System.currentTimeMillis() + expiresIn * 1_000;
36+
return new DefaultTokenInfo(accessToken, expirationTime);
37+
}
38+
39+
private static final class DefaultTokenInfo implements Token {
40+
41+
private final String value;
42+
private final long expirationTime;
43+
44+
private DefaultTokenInfo(String value, long expirationTime) {
45+
this.value = value;
46+
this.expirationTime = expirationTime;
47+
}
48+
49+
@Override
50+
public String value() {
51+
return this.value;
52+
}
53+
54+
@Override
55+
public long expirationTime() {
56+
return this.expirationTime;
57+
}
58+
}
59+
}

0 commit comments

Comments
 (0)