1919
2020import static com .rabbitmq .client .amqp .ConnectionSettings .SASL_MECHANISM_PLAIN ;
2121import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
22+ import static com .rabbitmq .client .amqp .impl .HttpTestUtils .startHttpServer ;
23+ import static com .rabbitmq .client .amqp .impl .JwtTestUtils .*;
2224import static com .rabbitmq .client .amqp .impl .TestConditions .BrokerVersion .RABBITMQ_4_1_0 ;
2325import static com .rabbitmq .client .amqp .impl .TestUtils .*;
2426import static java .lang .System .currentTimeMillis ;
3234import com .rabbitmq .client .amqp .impl .TestUtils .DisabledIfOauth2AuthBackendNotEnabled ;
3335import com .rabbitmq .client .amqp .impl .TestUtils .Sync ;
3436import com .rabbitmq .client .amqp .oauth .HttpTokenRequester ;
35- import com .rabbitmq .client .amqp .oauth .Token ;
3637import com .rabbitmq .client .amqp .oauth .TokenParser ;
38+ import com .rabbitmq .client .amqp .oauth .TokenRequester ;
3739import com .sun .net .httpserver .Headers ;
40+ import com .sun .net .httpserver .HttpHandler ;
3841import com .sun .net .httpserver .HttpServer ;
3942import java .io .OutputStream ;
4043import java .time .Duration ;
41- import java .util .Base64 ;
4244import java .util .Collections ;
43- import java .util .List ;
44- import org .jose4j .jws .AlgorithmIdentifiers ;
45- import org .jose4j .jws .JsonWebSignature ;
46- import org .jose4j .jwt .JwtClaims ;
47- import org .jose4j .jwt .NumericDate ;
48- import org .jose4j .jwt .consumer .JwtConsumer ;
49- import org .jose4j .jwt .consumer .JwtConsumerBuilder ;
50- import org .jose4j .keys .HmacKey ;
45+ import java .util .function .LongSupplier ;
5146import org .jose4j .lang .JoseException ;
5247import org .junit .jupiter .api .AfterEach ;
5348import org .junit .jupiter .api .Test ;
5752@ DisabledIfOauth2AuthBackendNotEnabled
5853public class Oauth2Test {
5954
60- private static final String BASE64_KEY = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGH" ;
61- private static final HmacKey KEY = new HmacKey (Base64 .getDecoder ().decode (BASE64_KEY ));
6255 Environment environment ;
6356 HttpServer server ;
6457
65- private static String token (long expirationTime ) {
66- JwtClaims claims = new JwtClaims ();
67- claims .setIssuer ("unit_test" );
68- claims .setAudience ("rabbitmq" );
69- claims .setExpirationTime (NumericDate .fromMilliseconds (expirationTime ));
70- claims .setStringListClaim (
71- "scope" , List .of ("rabbitmq.configure:*/*" , "rabbitmq.write:*/*" , "rabbitmq.read:*/*" ));
72-
73- JsonWebSignature signature = new JsonWebSignature ();
74-
75- signature .setKeyIdHeaderValue ("token-key" );
76- signature .setAlgorithmHeaderValue (AlgorithmIdentifiers .HMAC_SHA256 );
77- signature .setKey (KEY );
78- signature .setPayload (claims .toJson ());
79- try {
80- return signature .getCompactSerialization ();
81- } catch (JoseException e ) {
82- throw new RuntimeException (e );
83- }
84- }
85-
8658 @ AfterEach
8759 void tearDown () {
8860 if (this .server != null ) {
@@ -108,7 +80,7 @@ void validTokenShouldSucceed() {
10880 @ Test
10981 @ BrokerVersionAtLeast (RABBITMQ_4_1_0 )
11082 void connectionShouldBeClosedWhenTokenExpires (TestInfo info ) throws JoseException {
111- String q = TestUtils . name (info );
83+ String q = name (info );
11284 long expiry = currentTimeMillis () + ofSeconds (2 ).toMillis ();
11385 String token = token (expiry );
11486 Sync connectionClosedSync = sync ();
@@ -156,63 +128,19 @@ void connectionShouldBeClosedWhenTokenExpires(TestInfo info) throws JoseExceptio
156128
157129 @ Test
158130 @ BrokerVersionAtLeast (RABBITMQ_4_1_0 )
159- void httpClientProviderShouldGetToken (TestInfo info ) throws Exception {
160- String q = TestUtils .name (info );
161-
131+ void openingConnectionWithHttpValidTokenShouldWork (TestInfo info ) throws Exception {
132+ String q = name (info );
162133 int port = randomNetworkPort ();
163134 String contextPath = "/uaa/oauth/token" ;
164135
165136 this .server =
166- HttpTestUtils .startHttpServer (
167- port ,
168- contextPath ,
169- exchange -> {
170- long expirationTime = currentTimeMillis () + 60_000 ;
171- byte [] data = token (expirationTime ).getBytes (UTF_8 );
172- Headers responseHeaders = exchange .getResponseHeaders ();
173- responseHeaders .set ("content-type" , "application/json" );
174- exchange .sendResponseHeaders (200 , data .length );
175- OutputStream responseBody = exchange .getResponseBody ();
176- responseBody .write (data );
177- responseBody .close ();
178- });
179-
180- HttpTokenRequester tokenRequester =
181- new HttpTokenRequester (
182- "http://localhost:" + port + contextPath ,
183- "" ,
184- "" ,
185- "" ,
186- Collections .emptyMap (),
187- null ,
188- null ,
189- null ,
190- null );
137+ startHttpServer (port , contextPath , tokenHttpHandler (() -> currentTimeMillis () + 60_000 ));
138+
139+ TokenRequester tokenRequester = httpTokenRequester ("http://localhost:" + port + contextPath );
191140 TokenParser tokenParser =
192- tokenAsString -> {
193- long expirationTime ;
194- try {
195- JwtConsumer consumer =
196- new JwtConsumerBuilder ()
197- .setExpectedAudience ("rabbitmq" )
198- .setVerificationKey (KEY )
199- .build ();
200- JwtClaims claims = consumer .processToClaims (tokenAsString );
201- expirationTime = claims .getExpirationTime ().getValueInMillis ();
202- } catch (Exception e ) {
203- throw new RuntimeException (e );
204- }
205- return new Token () {
206- @ Override
207- public String value () {
208- return tokenAsString ;
209- }
210-
211- @ Override
212- public long expirationTime () {
213- return expirationTime ;
214- }
215- };
141+ json -> {
142+ String compact = parse (json ).get ("value" ).toString ();
143+ return parseToken (compact );
216144 };
217145
218146 CredentialsProvider credentialsProvider =
@@ -224,5 +152,63 @@ public long expirationTime() {
224152 .credentialsProvider (credentialsProvider )
225153 .build ();
226154 c .management ().queue (q ).exclusive (true ).declare ();
155+ Publisher publisher = c .publisherBuilder ().queue (q ).build ();
156+ Sync consumeSync = TestUtils .sync ();
157+ c .consumerBuilder ()
158+ .queue (q )
159+ .messageHandler (
160+ (ctx , msg ) -> {
161+ ctx .accept ();
162+ consumeSync .down ();
163+ })
164+ .build ();
165+ publisher .publish (publisher .message (), ctx -> {});
166+ assertThat (consumeSync ).completes ();
167+ }
168+
169+ @ Test
170+ @ BrokerVersionAtLeast (RABBITMQ_4_1_0 )
171+ void openingConnectionWithHttpExpiredTokenShouldFail () throws Exception {
172+ int port = randomNetworkPort ();
173+ String contextPath = "/uaa/oauth/token" ;
174+
175+ this .server =
176+ startHttpServer (port , contextPath , tokenHttpHandler (() -> currentTimeMillis () - 60_000 ));
177+
178+ TokenRequester tokenRequester = httpTokenRequester ("http://localhost:" + port + contextPath );
179+ TokenParser tokenParser =
180+ json -> {
181+ String compact = parse (json ).get ("value" ).toString ();
182+ return parseToken (compact );
183+ };
184+
185+ CredentialsProvider credentialsProvider =
186+ new OAuthCredentialsProvider (tokenRequester , tokenParser );
187+ assertThatThrownBy (
188+ () ->
189+ environment
190+ .connectionBuilder ()
191+ .saslMechanism (SASL_MECHANISM_PLAIN )
192+ .credentialsProvider (credentialsProvider )
193+ .build ())
194+ .isInstanceOf (AmqpException .AmqpSecurityException .class );
195+ }
196+
197+ private static HttpHandler tokenHttpHandler (LongSupplier expirationTimeSupplier ) {
198+ return exchange -> {
199+ long expirationTime = expirationTimeSupplier .getAsLong ();
200+ String token = token (expirationTime );
201+ byte [] data = String .format ("{ 'value': '%s' }" , token ).replace ('\'' , '"' ).getBytes (UTF_8 );
202+ Headers responseHeaders = exchange .getResponseHeaders ();
203+ responseHeaders .set ("content-type" , "application/json" );
204+ exchange .sendResponseHeaders (200 , data .length );
205+ OutputStream responseBody = exchange .getResponseBody ();
206+ responseBody .write (data );
207+ responseBody .close ();
208+ };
209+ }
210+
211+ private static TokenRequester httpTokenRequester (String uri ) {
212+ return new HttpTokenRequester (uri , "" , "" , "" , Collections .emptyMap (), null , null , null , null );
227213 }
228214}
0 commit comments