1- // Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
1+ // Copyright (c) 2007-2020 Pivotal Software, Inc. All rights reserved.
22//
33// This software, the RabbitMQ Java client library, is triple-licensed under the
44// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
@@ -84,9 +84,16 @@ public class RpcClient {
8484
8585 /** Map from request correlation ID to continuation BlockingCell */
8686 private final Map <String , BlockingCell <Object >> _continuationMap = new HashMap <String , BlockingCell <Object >>();
87- /** Contains the most recently-used request correlation ID */
87+
88+ /**
89+ * Generates correlation ID for each request.
90+ *
91+ * @since 5.9.0
92+ */
8893 private final Supplier <String > _correlationIdGenerator ;
8994
95+ private String lastCorrelationId = "0" ;
96+
9097 /** Consumer attached to our reply queue */
9198 private DefaultConsumer _consumer ;
9299
@@ -110,7 +117,7 @@ public RpcClient(RpcClientParams params) throws
110117 _timeout = params .getTimeout ();
111118 _useMandatory = params .shouldUseMandatory ();
112119 _replyHandler = params .getReplyHandler ();
113- _correlationIdGenerator = params .getCorrelationIdGenerator ();
120+ _correlationIdGenerator = params .getCorrelationIdSupplier ();
114121
115122 _consumer = setupConsumer ();
116123 if (_useMandatory ) {
@@ -295,6 +302,7 @@ public Response doCall(AMQP.BasicProperties props, byte[] message, int timeout)
295302 String replyId ;
296303 synchronized (_continuationMap ) {
297304 replyId = _correlationIdGenerator .get ();
305+ lastCorrelationId = replyId ;
298306 props = ((props ==null ) ? new AMQP .BasicProperties .Builder () : props .builder ())
299307 .correlationId (replyId ).replyTo (_replyTo ).build ();
300308 _continuationMap .put (replyId , k );
@@ -475,16 +483,21 @@ public Map<String, BlockingCell<Object>> getContinuationMap() {
475483 }
476484
477485 /**
478- * Retrieve the correlation id.
486+ * Retrieve the last correlation id used.
487+ * <p>
488+ * Note as of 5.9.0, correlation IDs may not always be integers
489+ * (by default, they are).
490+ * This method will try to parse the last correlation ID string
491+ * as an integer, so this may result in {@link NumberFormatException}
492+ * if the correlation ID supplier provided by
493+ * {@link RpcClientParams#correlationIdSupplier(Supplier)}
494+ * does not generate appropriate IDs.
495+ *
479496 * @return the most recently used correlation id
480- * @deprecated Only works for {@link IncrementingCorrelationIdGenerator}
497+ * @see RpcClientParams#correlationIdSupplier(Supplier)
481498 */
482499 public int getCorrelationId () {
483- if (_correlationIdGenerator instanceof IncrementingCorrelationIdGenerator ) {
484- return ((IncrementingCorrelationIdGenerator ) _correlationIdGenerator ).getCorrelationId ();
485- } else {
486- throw new UnsupportedOperationException ();
487- }
500+ return Integer .valueOf (this .lastCorrelationId );
488501 }
489502
490503 /**
@@ -532,5 +545,47 @@ public byte[] getBody() {
532545 return body ;
533546 }
534547 }
548+
549+ /**
550+ * Creates generation IDs as a sequence of integers.
551+ *
552+ * @return
553+ * @see RpcClientParams#correlationIdSupplier(Supplier)
554+ * @since 5.9.0
555+ */
556+ public static Supplier <String > incrementingCorrelationIdSupplier () {
557+ return incrementingCorrelationIdSupplier ("" );
558+ }
559+
560+ /**
561+ * Creates generation IDs as a sequence of integers, with the provided prefix.
562+ *
563+ * @param prefix
564+ * @return
565+ * @see RpcClientParams#correlationIdSupplier(Supplier)
566+ * @since 5.9.0
567+ */
568+ public static Supplier <String > incrementingCorrelationIdSupplier (String prefix ) {
569+ return new IncrementingCorrelationIdSupplier (prefix );
570+ }
571+
572+ /**
573+ * @since 5.9.0
574+ */
575+ private static class IncrementingCorrelationIdSupplier implements Supplier <String > {
576+
577+ private final String prefix ;
578+ private int correlationId ;
579+
580+ public IncrementingCorrelationIdSupplier (String prefix ) {
581+ this .prefix = prefix ;
582+ }
583+
584+ @ Override
585+ public String get () {
586+ return prefix + ++correlationId ;
587+ }
588+
589+ }
535590}
536591
0 commit comments