Skip to content

Commit e58fca1

Browse files
committed
Send update.secret extension when scheduled
[#167029587] (cherry picked from commit 136b3ea)
1 parent c5eafdc commit e58fca1

File tree

5 files changed

+147
-32
lines changed

5 files changed

+147
-32
lines changed

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -438,8 +438,10 @@ public void start()
438438
}
439439
String refreshedPassword = credentialsProvider.getPassword();
440440

441-
// TODO send password to server with update-secret extension, using channel 0
442-
441+
AMQImpl.Connection.UpdateSecret updateSecret = new AMQImpl.Connection.UpdateSecret(
442+
LongStringHelper.asLongString(refreshedPassword), "Refresh scheduled by client"
443+
);
444+
_channel0.rpc(updateSecret);
443445
return true;
444446
});
445447

src/test/java/com/rabbitmq/client/impl/DefaultCredentialsRefreshServiceTest.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18+
import com.rabbitmq.client.Channel;
19+
import com.rabbitmq.client.Connection;
20+
import com.rabbitmq.client.ConnectionFactory;
21+
import com.rabbitmq.client.test.TestUtils;
1822
import org.junit.After;
1923
import org.junit.Test;
2024
import org.junit.runner.RunWith;
2125
import org.mockito.Mock;
2226
import org.mockito.junit.MockitoJUnitRunner;
2327
import org.mockito.stubbing.Answer;
2428

29+
import java.io.IOException;
2530
import java.time.Duration;
2631
import java.util.List;
2732
import java.util.concurrent.Callable;
@@ -32,8 +37,7 @@
3237
import java.util.function.Function;
3338
import java.util.stream.IntStream;
3439

35-
import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.fixedDelayBeforeExpirationRefreshDelayStrategy;
36-
import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.fixedTimeNeedRefreshStrategy;
40+
import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.*;
3741
import static java.time.Duration.ofSeconds;
3842
import static org.assertj.core.api.Assertions.assertThat;
3943
import static org.mockito.Mockito.*;
@@ -56,6 +60,55 @@ public void tearDown() {
5660
}
5761
}
5862

63+
@Test public void renew() {
64+
ConnectionFactory cf = new ConnectionFactory();
65+
OAuth2ClientCredentialsGrantCredentialsProvider provider = new OAuth2ClientCredentialsGrantCredentialsProvider.OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
66+
.tokenEndpointUri("http://localhost:" + 8080 + "/uaa/oauth/token")
67+
.clientId("rabbit_client").clientSecret("rabbit_secret")
68+
.grantType("password")
69+
.parameter("username", "rabbit_super")
70+
.parameter("password", "rabbit_super")
71+
.build();
72+
cf.setCredentialsProvider(provider);
73+
74+
75+
}
76+
77+
@Test public void connect() throws Exception {
78+
ConnectionFactory cf = new ConnectionFactory();
79+
OAuth2ClientCredentialsGrantCredentialsProvider provider = new OAuth2ClientCredentialsGrantCredentialsProvider.OAuth2ClientCredentialsGrantCredentialsProviderBuilder()
80+
.tokenEndpointUri("http://localhost:" + 8080 + "/uaa/oauth/token")
81+
.clientId("rabbit_client").clientSecret("rabbit_secret")
82+
.grantType("password")
83+
.parameter("username", "rabbit_super")
84+
.parameter("password", "rabbit_super")
85+
.build();
86+
cf.setCredentialsProvider(provider);
87+
refreshService = new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder()
88+
// .refreshDelayStrategy(ttl -> Duration.ofSeconds(60))
89+
.refreshDelayStrategy(ratioRefreshDelayStrategy(0.8))
90+
.needRefreshStrategy(expiration -> false)
91+
.build();
92+
cf.setCredentialsRefreshService(refreshService);
93+
try (Connection c = cf.newConnection()) {
94+
95+
while (true) {
96+
try {
97+
Channel ch = c.createChannel();
98+
String queue = ch.queueDeclare().getQueue();
99+
TestUtils.sendAndConsumeMessage("", queue, queue, c);
100+
System.out.println("Message sent and consumed");
101+
ch.close();
102+
} catch (IOException e) {
103+
System.out.println(e.getCause().getMessage());
104+
}
105+
Thread.sleep(10_000L);
106+
}
107+
}
108+
109+
110+
}
111+
59112
@Test
60113
public void scheduling() throws Exception {
61114
refreshService = new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder()

src/test/java/com/rabbitmq/client/test/RefreshCredentialsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import com.rabbitmq.client.impl.DefaultCredentialsRefreshService;
2222
import com.rabbitmq.client.impl.RefreshProtectedCredentialsProvider;
2323
import org.junit.Before;
24+
import org.junit.ClassRule;
2425
import org.junit.Test;
26+
import org.junit.rules.TestRule;
2527

2628
import java.time.Duration;
2729
import java.time.Instant;
@@ -33,6 +35,8 @@
3335

3436
public class RefreshCredentialsTest {
3537

38+
@ClassRule
39+
public static TestRule brokerVersionTestRule = TestUtils.atLeast38();
3640
DefaultCredentialsRefreshService refreshService;
3741

3842
@Before

src/test/java/com/rabbitmq/client/test/TestUtils.java

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+
// Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -15,19 +15,14 @@
1515

1616
package com.rabbitmq.client.test;
1717

18-
import com.rabbitmq.client.AMQP;
19-
import com.rabbitmq.client.Channel;
20-
import com.rabbitmq.client.Connection;
21-
import com.rabbitmq.client.ConnectionFactory;
22-
import com.rabbitmq.client.DefaultConsumer;
23-
import com.rabbitmq.client.Envelope;
24-
import com.rabbitmq.client.Recoverable;
25-
import com.rabbitmq.client.RecoverableConnection;
26-
import com.rabbitmq.client.RecoveryListener;
27-
import com.rabbitmq.client.ShutdownSignalException;
18+
import com.rabbitmq.client.*;
2819
import com.rabbitmq.client.impl.NetworkConnection;
2920
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
3021
import com.rabbitmq.tools.Host;
22+
import org.junit.AssumptionViolatedException;
23+
import org.junit.rules.TestRule;
24+
import org.junit.runner.Description;
25+
import org.junit.runners.model.Statement;
3126
import org.slf4j.LoggerFactory;
3227

3328
import javax.net.ssl.SSLContext;
@@ -46,7 +41,7 @@
4641

4742
public class TestUtils {
4843

49-
public static final boolean USE_NIO = System.getProperty("use.nio") == null ? false : true;
44+
public static final boolean USE_NIO = System.getProperty("use.nio") != null;
5045

5146
public static ConnectionFactory connectionFactory() {
5247
ConnectionFactory connectionFactory = new ConnectionFactory();
@@ -79,7 +74,7 @@ public static SSLContext getSSLContext() throws NoSuchAlgorithmException {
7974

8075
// pick the first protocol available, preferring TLSv1.2, then TLSv1,
8176
// falling back to SSLv3 if running on an ancient/crippled JDK
82-
for(String proto : Arrays.asList("TLSv1.2", "TLSv1", "SSLv3")) {
77+
for (String proto : Arrays.asList("TLSv1.2", "TLSv1", "SSLv3")) {
8378
try {
8479
c = SSLContext.getInstance(proto);
8580
return c;
@@ -90,31 +85,49 @@ public static SSLContext getSSLContext() throws NoSuchAlgorithmException {
9085
throw new NoSuchAlgorithmException();
9186
}
9287

88+
public static TestRule atLeast38() {
89+
return new BrokerVersionTestRule("3.8.0");
90+
}
91+
9392
public static boolean isVersion37orLater(Connection connection) {
93+
return atLeastVersion("3.7.0", connection);
94+
}
95+
96+
public static boolean isVersion38orLater(Connection connection) {
97+
return atLeastVersion("3.8.0", connection);
98+
}
99+
100+
private static boolean atLeastVersion(String expectedVersion, Connection connection) {
94101
String currentVersion = null;
95102
try {
96-
currentVersion = connection.getServerProperties().get("version").toString();
97-
// versions built from source: 3.7.0+rc.1.4.gedc5d96
98-
if (currentVersion.contains("+")) {
99-
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
100-
}
101-
// alpha (snapshot) versions: 3.7.0~alpha.449-1
102-
if (currentVersion.contains("~")) {
103-
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
104-
}
105-
// alpha (snapshot) versions: 3.7.1-alpha.40
106-
if (currentVersion.contains("-")) {
107-
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
108-
}
109-
return "0.0.0".equals(currentVersion) || versionCompare(currentVersion, "3.7.0") >= 0;
103+
currentVersion = currentVersion(
104+
connection.getServerProperties().get("version").toString()
105+
);
106+
return "0.0.0".equals(currentVersion) || versionCompare(currentVersion, expectedVersion) >= 0;
110107
} catch (RuntimeException e) {
111108
LoggerFactory.getLogger(TestUtils.class).warn("Unable to parse broker version {}", currentVersion, e);
112109
throw e;
113110
}
114111
}
115112

113+
private static String currentVersion(String currentVersion) {
114+
// versions built from source: 3.7.0+rc.1.4.gedc5d96
115+
if (currentVersion.contains("+")) {
116+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
117+
}
118+
// alpha (snapshot) versions: 3.7.0~alpha.449-1
119+
if (currentVersion.contains("~")) {
120+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
121+
}
122+
// alpha (snapshot) versions: 3.7.1-alpha.40
123+
if (currentVersion.contains("-")) {
124+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
125+
}
126+
return currentVersion;
127+
}
128+
116129
public static boolean sendAndConsumeMessage(String exchange, String routingKey, String queue, Connection c)
117-
throws IOException, TimeoutException, InterruptedException {
130+
throws IOException, TimeoutException, InterruptedException {
118131
Channel ch = c.createChannel();
119132
try {
120133
ch.confirmSelect();
@@ -249,4 +262,28 @@ public static int randomNetworkPort() throws IOException {
249262
socket.close();
250263
return port;
251264
}
265+
266+
private static class BrokerVersionTestRule implements TestRule {
267+
268+
private final String version;
269+
270+
public BrokerVersionTestRule(String version) {
271+
this.version = version;
272+
}
273+
274+
@Override
275+
public Statement apply(Statement base, Description description) {
276+
return new Statement() {
277+
@Override
278+
public void evaluate() throws Throwable {
279+
try (Connection c = TestUtils.connectionFactory().newConnection()) {
280+
if (!TestUtils.atLeastVersion(version, c)) {
281+
throw new AssumptionViolatedException("Broker version < " + version + ", skipping.");
282+
}
283+
}
284+
base.evaluate();
285+
}
286+
};
287+
}
288+
}
252289
}

src/test/java/com/rabbitmq/client/test/TestUtilsTest.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,23 @@ public void isVersion37orLater() {
4343
serverProperties.put("version", "3.7.1-alpha.40");
4444
assertThat(TestUtils.isVersion37orLater(connection), is(true));
4545
}
46+
47+
@Test
48+
public void isVersion38orLater() {
49+
Map<String, Object> serverProperties = new HashMap<>();
50+
Connection connection = mock(Connection.class);
51+
when(connection.getServerProperties()).thenReturn(serverProperties);
52+
53+
serverProperties.put("version", "3.7.0+rc.1.4.gedc5d96");
54+
assertThat(TestUtils.isVersion38orLater(connection), is(false));
55+
56+
serverProperties.put("version", "3.7.0~alpha.449-1");
57+
assertThat(TestUtils.isVersion38orLater(connection), is(false));
58+
59+
serverProperties.put("version", "3.7.1-alpha.40");
60+
assertThat(TestUtils.isVersion38orLater(connection), is(false));
61+
62+
serverProperties.put("version", "3.8.0+beta.4.38.g33a7f97");
63+
assertThat(TestUtils.isVersion38orLater(connection), is(true));
64+
}
4665
}

0 commit comments

Comments
 (0)