Skip to content

Commit 49e2d01

Browse files
committed
Support token renewal
For RabbitMQ 4.1. See rabbitmq/rabbitmq-server#12599
1 parent 9b7ea0a commit 49e2d01

File tree

4 files changed

+95
-5
lines changed

4 files changed

+95
-5
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27+
env:
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:amqp-token-renew'
2729
- name: Start toxiproxy
2830
run: ci/start-toxiproxy.sh
2931
- name: Display Java version

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.rabbitmq.client.amqp.AmqpException;
2424
import com.rabbitmq.client.amqp.Management;
25+
import java.nio.charset.StandardCharsets;
2526
import java.time.Duration;
2627
import java.util.*;
2728
import java.util.concurrent.*;
@@ -38,6 +39,7 @@
3839
import org.apache.qpid.protonj2.client.exceptions.ClientException;
3940
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
4041
import org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException;
42+
import org.apache.qpid.protonj2.types.Binary;
4143
import org.slf4j.Logger;
4244
import org.slf4j.LoggerFactory;
4345

@@ -169,6 +171,24 @@ public UnbindSpecification unbind() {
169171
return new AmqpBindingManagement.AmqpUnbindSpecification(this);
170172
}
171173

174+
void setToken(String token) {
175+
checkAvailable();
176+
UUID requestId = messageId();
177+
try {
178+
Message<?> request =
179+
Message.create(new Binary(token.getBytes(StandardCharsets.UTF_8)))
180+
.to("/auth/tokens")
181+
.subject("PUT");
182+
183+
OutstandingRequest outstandingRequest = this.request(request, requestId);
184+
outstandingRequest.block();
185+
186+
checkResponse(outstandingRequest, requestId, 204);
187+
} catch (ClientException e) {
188+
throw new AmqpException("Error on set-token operation", e);
189+
}
190+
}
191+
172192
@Override
173193
public void close() {
174194
if (this.initializing) {

src/test/java/com/rabbitmq/client/amqp/impl/Cli.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,12 +182,11 @@ public static void addVhost(String vhost) {
182182
rabbitmqctl("add_vhost " + vhost);
183183
}
184184

185-
public static void addUser(String username, String password) throws IOException {
185+
public static void addUser(String username, String password) {
186186
rabbitmqctl(format("add_user %s %s", username, password));
187187
}
188188

189-
public static void setPermissions(String username, String vhost, String permission)
190-
throws IOException {
189+
public static void setPermissions(String username, String vhost, String permission) {
191190
setPermissions(username, vhost, asList(permission, permission, permission));
192191
}
193192

src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,26 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
21+
import static com.rabbitmq.client.amqp.impl.Cli.*;
22+
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
23+
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
2124
import static org.assertj.core.api.Assertions.assertThat;
2225
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2326

24-
import com.rabbitmq.client.amqp.AmqpException;
25-
import com.rabbitmq.client.amqp.Management;
27+
import com.rabbitmq.client.amqp.*;
28+
import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast;
29+
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
2630
import java.time.Duration;
2731
import java.util.concurrent.atomic.AtomicInteger;
2832
import java.util.function.Supplier;
2933
import org.junit.jupiter.api.*;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.ValueSource;
3036

3137
@AmqpTestInfrastructure
3238
public class ManagementTest {
3339

40+
Environment environment;
3441
AmqpConnection connection;
3542
AmqpManagement management;
3643

@@ -88,4 +95,66 @@ void receiveLoopShouldStopAfterBeingIdle() {
8895
assertThat(management.queueInfo(info1.name())).hasName(info1.name());
8996
assertThat(management.queueInfo(info2.name())).hasName(info2.name());
9097
}
98+
99+
@ParameterizedTest
100+
@ValueSource(booleans = {true, false})
101+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
102+
void setToken(boolean isolateResources, TestInfo info) {
103+
String username = "foo";
104+
String password = "bar";
105+
String vh = "/";
106+
String q = TestUtils.name(info);
107+
108+
Connection c = null;
109+
try {
110+
addUser(username, password);
111+
setPermissions(username, vh, ".*");
112+
this.connection.management().queue(q).declare();
113+
114+
c =
115+
((AmqpConnectionBuilder) environment.connectionBuilder())
116+
.isolateResources(isolateResources)
117+
.username(username)
118+
.password(password)
119+
.build();
120+
Sync consumeSync = sync();
121+
Sync publisherClosedSync = sync();
122+
Sync consumerClosedSync = sync();
123+
Publisher p =
124+
c.publisherBuilder().queue(q).listeners(closedListener(publisherClosedSync)).build();
125+
c.consumerBuilder()
126+
.queue(q)
127+
.messageHandler(
128+
(ctx, msg) -> {
129+
ctx.accept();
130+
consumeSync.down();
131+
})
132+
.listeners(closedListener(consumerClosedSync))
133+
.build();
134+
135+
p.publish(p.message(), ctx -> {});
136+
assertThat(consumeSync).completes();
137+
138+
setPermissions(username, vh, "foobar");
139+
AmqpManagement m = (AmqpManagement) c.management();
140+
m.setToken(password);
141+
assertThat(publisherClosedSync).completes();
142+
assertThat(consumerClosedSync).completes();
143+
} finally {
144+
if (c != null) {
145+
c.close();
146+
}
147+
this.connection.management().queueDeletion().delete(q);
148+
deleteUser(username);
149+
}
150+
}
151+
152+
private static Resource.StateListener closedListener(Sync sync) {
153+
return context -> {
154+
if (context.currentState() == Resource.State.CLOSED
155+
&& context.failureCause() instanceof AmqpException.AmqpSecurityException) {
156+
sync.down();
157+
}
158+
};
159+
}
91160
}

0 commit comments

Comments
 (0)