Skip to content

Commit df6365f

Browse files
committed
Test connection is closed when OAuth token expires
1 parent 6ab0976 commit df6365f

File tree

9 files changed

+96
-21
lines changed

9 files changed

+96
-21
lines changed

.github/workflows/test-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
- name: Start broker
2626
run: ci/start-broker.sh
2727
env:
28-
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-token-renew'
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:main'
2929
- name: Start toxiproxy
3030
run: ci/start-toxiproxy.sh
3131
- name: Display Java version

ci/start-broker.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env bash
22

3-
RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:amqp-token-renew}
3+
RABBITMQ_IMAGE=${RABBITMQ_IMAGE:-pivotalrabbitmq/rabbitmq:main}
44

55
wait_for_message() {
66
while ! docker logs "$1" | grep -q "$2";

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
331331
"Not recovering connection '{}' for error {}",
332332
this.name(),
333333
event.failureCause().getMessage());
334+
close(ExceptionUtils.convert(ioex));
334335
}
335336
};
336337

@@ -730,10 +731,10 @@ private void close(Throwable cause) {
730731
rpcServer.close();
731732
}
732733
for (AmqpPublisher publisher : this.publishers) {
733-
publisher.close();
734+
publisher.close(cause);
734735
}
735736
for (AmqpConsumer consumer : this.consumers) {
736-
consumer.close();
737+
consumer.close(cause);
737738
}
738739
try {
739740
this.dispatchingExecutorService.shutdownNow();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ void recoverAfterConnectionFailure() {
328328
}
329329
}
330330

331-
private void close(Throwable cause) {
331+
void close(Throwable cause) {
332332
if (this.closed.compareAndSet(false, true)) {
333333
this.state(CLOSING, cause);
334334
this.connection.removeConsumer(this);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ private Sender createSender(
205205
}
206206
}
207207

208-
private void close(Throwable cause) {
208+
void close(Throwable cause) {
209209
if (this.closed.compareAndSet(false, true)) {
210210
this.state(State.CLOSING, cause);
211211
this.connection.removePublisher(this);

src/test/java/com/rabbitmq/client/amqp/impl/AuthorizationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class AuthorizationTest {
4141
String name;
4242

4343
@BeforeAll
44-
static void initAll() throws Exception {
44+
static void initAll() {
4545
addVhost(VH);
4646
addUser(USERNAME, PASSWORD);
4747
setPermissions(USERNAME, VH, "^amqp.*$");

src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
2121
import static com.rabbitmq.client.amqp.impl.Cli.*;
2222
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
23+
import static com.rabbitmq.client.amqp.impl.TestUtils.closedOnSecurityExceptionListener;
2324
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
2425
import static org.assertj.core.api.Assertions.assertThat;
2526
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -152,13 +153,4 @@ void sessionShouldGetClosedAfterPermissionsChangedAndSetTokenCalled(
152153
deleteUser(username);
153154
}
154155
}
155-
156-
private static Resource.StateListener closedOnSecurityExceptionListener(Sync sync) {
157-
return context -> {
158-
if (context.currentState() == Resource.State.CLOSED
159-
&& context.failureCause() instanceof AmqpException.AmqpSecurityException) {
160-
sync.down();
161-
}
162-
};
163-
}
164156
}

src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
21+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
22+
import static java.lang.System.currentTimeMillis;
23+
import static java.time.Duration.ofMillis;
24+
import static java.time.Duration.ofSeconds;
2025
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2126

22-
import com.rabbitmq.client.amqp.AmqpException;
23-
import com.rabbitmq.client.amqp.Connection;
24-
import com.rabbitmq.client.amqp.Environment;
27+
import com.rabbitmq.client.amqp.*;
28+
import com.rabbitmq.client.amqp.impl.TestUtils.DisabledIfOauth2AuthBackendNotEnabled;
29+
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
2530
import java.time.Duration;
2631
import java.util.List;
2732
import org.jose4j.base64url.Base64;
@@ -32,8 +37,10 @@
3237
import org.jose4j.keys.HmacKey;
3338
import org.jose4j.lang.JoseException;
3439
import org.junit.jupiter.api.Test;
40+
import org.junit.jupiter.api.TestInfo;
3541

3642
@AmqpTestInfrastructure
43+
@DisabledIfOauth2AuthBackendNotEnabled
3744
public class Oauth2Test {
3845

3946
private static final String BASE64_KEY = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH";
@@ -42,19 +49,67 @@ public class Oauth2Test {
4249

4350
@Test
4451
void expiredTokenShouldFail() throws Exception {
45-
String expiredToken = token(System.currentTimeMillis() - 1000);
52+
String expiredToken = token(currentTimeMillis() - 1000);
4653
assertThatThrownBy(
4754
() -> environment.connectionBuilder().username("").password(expiredToken).build())
4855
.isInstanceOf(AmqpException.AmqpSecurityException.class);
4956
}
5057

5158
@Test
5259
void validTokenShouldSucceed() throws Exception {
53-
String validToken = token(System.currentTimeMillis() + Duration.ofMinutes(10).toMillis());
60+
String validToken = token(currentTimeMillis() + Duration.ofMinutes(10).toMillis());
5461
try (Connection ignored =
5562
environment.connectionBuilder().username("").password(validToken).build()) {}
5663
}
5764

65+
@Test
66+
void connectionShouldBeClosedWhenTokenExpires(TestInfo info) throws Exception {
67+
String q = TestUtils.name(info);
68+
long expiry = currentTimeMillis() + ofSeconds(2).toMillis();
69+
String token = token(expiry);
70+
Sync connectionClosedSync = sync();
71+
Connection c =
72+
environment
73+
.connectionBuilder()
74+
.username("")
75+
.password(token)
76+
.listeners(closedOnSecurityExceptionListener(connectionClosedSync))
77+
.build();
78+
c.management().queue(q).exclusive(true).declare();
79+
Sync publisherClosedSync = sync();
80+
Publisher p =
81+
c.publisherBuilder()
82+
.queue(q)
83+
.listeners(closedOnSecurityExceptionListener(publisherClosedSync))
84+
.build();
85+
Sync consumeSync = sync();
86+
Sync consumerClosedSync = sync();
87+
c.consumerBuilder()
88+
.queue(q)
89+
.messageHandler(
90+
(ctx, msg) -> {
91+
ctx.accept();
92+
consumeSync.down();
93+
})
94+
.listeners(closedOnSecurityExceptionListener(consumerClosedSync))
95+
.build();
96+
p.publish(p.message(), ctx -> {});
97+
assertThat(consumeSync).completes();
98+
long newExpiry = currentTimeMillis() + ofSeconds(3).toMillis();
99+
token = token(newExpiry);
100+
((AmqpManagement) c.management()).setToken(token);
101+
// wait for the first token to expire
102+
waitAtMost(() -> currentTimeMillis() > expiry + ofMillis(500).toMillis());
103+
// the connection should still work thanks to the new token
104+
consumeSync.reset();
105+
p.publish(p.message(), ctx -> {});
106+
assertThat(consumeSync).completes();
107+
waitAtMost(() -> currentTimeMillis() > newExpiry);
108+
assertThat(connectionClosedSync).completes();
109+
assertThat(publisherClosedSync).completes();
110+
assertThat(consumerClosedSync).completes();
111+
}
112+
58113
private static String token(long expirationTime) throws JoseException {
59114
JwtClaims claims = new JwtClaims();
60115
claims.setIssuer("unit_test");

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static java.util.Collections.singletonMap;
2222
import static org.assertj.core.api.Assertions.fail;
2323

24+
import com.rabbitmq.client.amqp.AmqpException;
25+
import com.rabbitmq.client.amqp.Resource;
2426
import eu.rekawek.toxiproxy.Proxy;
2527
import eu.rekawek.toxiproxy.ToxiproxyClient;
2628
import java.io.IOException;
@@ -83,6 +85,15 @@ static <T> T waitUntilStable(Supplier<T> call, Duration waitTime) {
8385
return null;
8486
}
8587

88+
static Resource.StateListener closedOnSecurityExceptionListener(Sync sync) {
89+
return context -> {
90+
if (context.currentState() == Resource.State.CLOSED
91+
&& context.failureCause() instanceof AmqpException.AmqpSecurityException) {
92+
sync.down();
93+
}
94+
};
95+
}
96+
8697
@FunctionalInterface
8798
public interface CallableBooleanSupplier {
8899
boolean getAsBoolean() throws Exception;
@@ -404,6 +415,16 @@ private static class DisabledIfAuthMechanismSslNotEnabledCondition
404415
}
405416
}
406417

418+
private static class DisabledIfOauth2AuthBackendNotEnabledCondition
419+
extends DisabledIfPluginNotEnabledCondition {
420+
421+
DisabledIfOauth2AuthBackendNotEnabledCondition() {
422+
super(
423+
"OAuth2 authentication backend",
424+
output -> output.contains("rabbitmq_auth_backend_oauth2"));
425+
}
426+
}
427+
407428
static class DisabledIfNotClusterCondition implements ExecutionCondition {
408429

409430
private static final String KEY = "isCluster";
@@ -463,6 +484,12 @@ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext con
463484
@ExtendWith(DisabledIfAuthMechanismSslNotEnabledCondition.class)
464485
@interface DisabledIfAuthMechanismSslNotEnabled {}
465486

487+
@Target({ElementType.TYPE, ElementType.METHOD})
488+
@Retention(RetentionPolicy.RUNTIME)
489+
@Documented
490+
@ExtendWith(DisabledIfOauth2AuthBackendNotEnabledCondition.class)
491+
@interface DisabledIfOauth2AuthBackendNotEnabled {}
492+
466493
@Target({ElementType.TYPE, ElementType.METHOD})
467494
@Retention(RetentionPolicy.RUNTIME)
468495
@Documented

0 commit comments

Comments
 (0)