@@ -236,6 +236,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
236
236
private final MessageSerializer messageSerializer ;
237
237
private final Runnable onClientClose ;
238
238
private final ServiceBusSessionManager sessionManager ;
239
+ private final boolean isSessionEnabled ;
239
240
private final Semaphore completionLock = new Semaphore (1 );
240
241
private final String identifier ;
241
242
@@ -268,6 +269,12 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
268
269
this .instrumentation = Objects .requireNonNull (instrumentation , "'tracer' cannot be null" );
269
270
this .messageSerializer = Objects .requireNonNull (messageSerializer , "'messageSerializer' cannot be null." );
270
271
this .onClientClose = Objects .requireNonNull (onClientClose , "'onClientClose' cannot be null." );
272
+ this .sessionManager = null ;
273
+ if (receiverOptions .getSessionId () != null || receiverOptions .getMaxConcurrentSessions () != null ) {
274
+ // Assert the internal invariant for above 'sessionManager = null' i.e, session-unaware call-sites should not set these options.
275
+ throw new IllegalStateException ("Session-specific options are not expected to be present on a client for session unaware entity." );
276
+ }
277
+ this .isSessionEnabled = false ;
271
278
272
279
this .managementNodeLocks = new LockContainer <>(cleanupInterval );
273
280
this .renewalContainer = new LockContainer <>(Duration .ofMinutes (2 ), renewal -> {
@@ -278,7 +285,6 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
278
285
renewal .close ();
279
286
});
280
287
281
- this .sessionManager = null ;
282
288
this .identifier = identifier ;
283
289
this .tracer = instrumentation .getTracer ();
284
290
this .trackSettlementSequenceNumber = instrumentation .startTrackingSettlementSequenceNumber ();
@@ -298,6 +304,7 @@ public final class ServiceBusReceiverAsyncClient implements AutoCloseable {
298
304
this .messageSerializer = Objects .requireNonNull (messageSerializer , "'messageSerializer' cannot be null." );
299
305
this .onClientClose = Objects .requireNonNull (onClientClose , "'onClientClose' cannot be null." );
300
306
this .sessionManager = Objects .requireNonNull (sessionManager , "'sessionManager' cannot be null." );
307
+ this .isSessionEnabled = true ;
301
308
302
309
this .managementNodeLocks = new LockContainer <>(cleanupInterval );
303
310
this .renewalContainer = new LockContainer <>(Duration .ofMinutes (2 ), renewal -> {
@@ -865,7 +872,7 @@ Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
865
872
final Flux <ServiceBusMessageContext > messageFluxWithTracing = new FluxTrace (messageFlux , instrumentation );
866
873
final Flux <ServiceBusMessageContext > withAutoLockRenewal ;
867
874
868
- if (!receiverOptions . isSessionReceiver () && receiverOptions .isAutoLockRenewEnabled ()) {
875
+ if (!isSessionEnabled && receiverOptions .isAutoLockRenewEnabled ()) {
869
876
withAutoLockRenewal = new FluxAutoLockRenew (messageFluxWithTracing , receiverOptions ,
870
877
renewalContainer , this ::renewMessageLock );
871
878
} else {
@@ -1032,15 +1039,15 @@ public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message)
1032
1039
if (isDisposed .get ()) {
1033
1040
return monoError (LOGGER , new IllegalStateException (
1034
1041
String .format (INVALID_OPERATION_DISPOSED_RECEIVER , "renewMessageLock" )));
1042
+ } else if (isSessionEnabled ) {
1043
+ final String errorMessage = "Renewing message lock is an invalid operation when working with sessions." ;
1044
+ return monoError (LOGGER , new IllegalStateException (errorMessage ));
1035
1045
} else if (Objects .isNull (message )) {
1036
1046
return monoError (LOGGER , new NullPointerException ("'message' cannot be null." ));
1037
1047
} else if (Objects .isNull (message .getLockToken ())) {
1038
1048
return monoError (LOGGER , new NullPointerException ("'message.getLockToken()' cannot be null." ));
1039
1049
} else if (message .getLockToken ().isEmpty ()) {
1040
1050
return monoError (LOGGER , new IllegalArgumentException ("'message.getLockToken()' cannot be empty." ));
1041
- } else if (receiverOptions .isSessionReceiver ()) {
1042
- final String errorMessage = "Renewing message lock is an invalid operation when working with sessions." ;
1043
- return monoError (LOGGER , new IllegalStateException (errorMessage ));
1044
1051
}
1045
1052
1046
1053
return tracer .traceMonoWithLink ("ServiceBus.renewMessageLock" , renewMessageLock (message .getLockToken ()), message , message .getContext ())
@@ -1088,15 +1095,15 @@ public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration m
1088
1095
if (isDisposed .get ()) {
1089
1096
return monoError (LOGGER , new IllegalStateException (
1090
1097
String .format (INVALID_OPERATION_DISPOSED_RECEIVER , "getAutoRenewMessageLock" )));
1098
+ } else if (isSessionEnabled ) {
1099
+ final String errorMessage = "Renewing message lock is an invalid operation when working with sessions." ;
1100
+ return monoError (LOGGER , new IllegalStateException (errorMessage ));
1091
1101
} else if (Objects .isNull (message )) {
1092
1102
return monoError (LOGGER , new NullPointerException ("'message' cannot be null." ));
1093
1103
} else if (Objects .isNull (message .getLockToken ())) {
1094
1104
return monoError (LOGGER , new NullPointerException ("'message.getLockToken()' cannot be null." ));
1095
1105
} else if (message .getLockToken ().isEmpty ()) {
1096
1106
return monoError (LOGGER , new IllegalArgumentException ("'message.getLockToken()' cannot be empty." ));
1097
- } else if (receiverOptions .isSessionReceiver ()) {
1098
- return monoError (LOGGER , new IllegalStateException (
1099
- String .format ("Cannot renew message lock [%s] for a session receiver." , message .getLockToken ())));
1100
1107
} else if (maxLockRenewalDuration == null ) {
1101
1108
return monoError (LOGGER , new NullPointerException ("'maxLockRenewalDuration' cannot be null." ));
1102
1109
} else if (maxLockRenewalDuration .isNegative ()) {
@@ -1485,6 +1492,9 @@ deadLetterErrorDescription, propertiesToModify, sessionId, getLinkName(sessionId
1485
1492
}
1486
1493
1487
1494
private ServiceBusAsyncConsumer getOrCreateConsumer () {
1495
+ if (isSessionEnabled ) {
1496
+ throw LOGGER .logExceptionAsError (new IllegalStateException ("The ServiceBusAsyncConsumer is expected to work only with session unaware entity." ));
1497
+ }
1488
1498
final ServiceBusAsyncConsumer existing = consumer .get ();
1489
1499
if (existing != null ) {
1490
1500
return existing ;
@@ -1499,19 +1509,14 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
1499
1509
// The Mono, when subscribed, creates a ServiceBusReceiveLink in the ServiceBusAmqpConnection emitted by the connectionProcessor
1500
1510
//
1501
1511
final Mono <ServiceBusReceiveLink > receiveLinkMono = connectionProcessor .flatMap (connection -> {
1502
- if (receiverOptions .isSessionReceiver ()) {
1503
- return connection .createReceiveLink (linkName , entityPath , receiverOptions .getReceiveMode (),
1504
- null , entityType , identifier , receiverOptions .getSessionId ());
1505
- } else {
1506
- return connection .createReceiveLink (linkName , entityPath , receiverOptions .getReceiveMode (),
1507
- null , entityType , identifier );
1508
- }
1512
+ return connection .createReceiveLink (linkName , entityPath , receiverOptions .getReceiveMode (),
1513
+ null , entityType , identifier );
1509
1514
}).doOnNext (next -> {
1510
1515
LOGGER .atVerbose ()
1511
1516
.addKeyValue (LINK_NAME_KEY , linkName )
1512
1517
.addKeyValue (ENTITY_PATH_KEY , next .getEntityPath ())
1513
1518
.addKeyValue ("mode" , receiverOptions .getReceiveMode ())
1514
- .addKeyValue ("isSessionEnabled" , receiverOptions . isSessionReceiver () )
1519
+ .addKeyValue ("isSessionEnabled" , false )
1515
1520
.addKeyValue (ENTITY_TYPE_KEY , entityType )
1516
1521
.log ("Created consumer for Service Bus resource." );
1517
1522
});
@@ -1576,10 +1581,8 @@ private ServiceBusAsyncConsumer getOrCreateConsumer() {
1576
1581
* @return The name of the receive link, or null of it has not connected via a receive link.
1577
1582
*/
1578
1583
private String getLinkName (String sessionId ) {
1579
- if (sessionManager != null && !CoreUtils .isNullOrEmpty (sessionId )) {
1580
- return sessionManager .getLinkName (sessionId );
1581
- } else if (!CoreUtils .isNullOrEmpty (sessionId ) && !receiverOptions .isSessionReceiver ()) {
1582
- return null ;
1584
+ if (!CoreUtils .isNullOrEmpty (sessionId )) {
1585
+ return isSessionEnabled ? sessionManager .getLinkName (sessionId ) : null ;
1583
1586
} else {
1584
1587
final ServiceBusAsyncConsumer existing = consumer .get ();
1585
1588
return existing != null ? existing .getLinkName () : null ;
@@ -1590,12 +1593,10 @@ Mono<OffsetDateTime> renewSessionLock(String sessionId) {
1590
1593
if (isDisposed .get ()) {
1591
1594
return monoError (LOGGER , new IllegalStateException (
1592
1595
String .format (INVALID_OPERATION_DISPOSED_RECEIVER , "renewSessionLock" )));
1593
- } else if (!receiverOptions . isSessionReceiver () ) {
1596
+ } else if (!isSessionEnabled ) {
1594
1597
return monoError (LOGGER , new IllegalStateException ("Cannot renew session lock on a non-session receiver." ));
1595
1598
}
1596
- final String linkName = sessionManager != null
1597
- ? sessionManager .getLinkName (sessionId )
1598
- : null ;
1599
+ final String linkName = sessionManager .getLinkName (sessionId );
1599
1600
1600
1601
return tracer .traceMono ("ServiceBus.renewSessionLock" , connectionProcessor
1601
1602
.flatMap (connection -> connection .getManagementNode (entityPath , entityType ))
@@ -1607,7 +1608,7 @@ Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
1607
1608
if (isDisposed .get ()) {
1608
1609
return monoError (LOGGER , new IllegalStateException (
1609
1610
String .format (INVALID_OPERATION_DISPOSED_RECEIVER , "renewSessionLock" )));
1610
- } else if (!receiverOptions . isSessionReceiver () ) {
1611
+ } else if (!isSessionEnabled ) {
1611
1612
return monoError (LOGGER , new IllegalStateException (
1612
1613
"Cannot renew session lock on a non-session receiver." ));
1613
1614
} else if (maxLockRenewalDuration == null ) {
@@ -1632,7 +1633,7 @@ Mono<Void> setSessionState(String sessionId, byte[] sessionState) {
1632
1633
if (isDisposed .get ()) {
1633
1634
return monoError (LOGGER , new IllegalStateException (
1634
1635
String .format (INVALID_OPERATION_DISPOSED_RECEIVER , "setSessionState" )));
1635
- } else if (!receiverOptions . isSessionReceiver () ) {
1636
+ } else if (!isSessionEnabled ) {
1636
1637
return monoError (LOGGER , new IllegalStateException ("Cannot set session state on a non-session receiver." ));
1637
1638
}
1638
1639
final String linkName = sessionManager != null
@@ -1649,7 +1650,7 @@ Mono<byte[]> getSessionState(String sessionId) {
1649
1650
if (isDisposed .get ()) {
1650
1651
return monoError (LOGGER , new IllegalStateException (
1651
1652
String .format (INVALID_OPERATION_DISPOSED_RECEIVER , "getSessionState" )));
1652
- } else if (!receiverOptions . isSessionReceiver () ) {
1653
+ } else if (!isSessionEnabled ) {
1653
1654
return monoError (LOGGER , new IllegalStateException ("Cannot get session state on a non-session receiver." ));
1654
1655
}
1655
1656
0 commit comments