Skip to content

Commit ada9824

Browse files
authored
Adding unit test for updateDisposition closes link on timeout (Azure#28081)
1 parent a136f4f commit ada9824

File tree

1 file changed

+30
-0
lines changed

1 file changed

+30
-0
lines changed

sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReceiveLinkProcessorTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,4 +695,34 @@ void updateDispositionDoesNotAddCredit() {
695695
verify(link1).addCredits(eq(PREFETCH));
696696
verify(link1).updateDisposition(eq(lockToken), eq(deliveryState));
697697
}
698+
699+
@Test
700+
void updateDispositionClosesLinkOnTimeout() {
701+
// Arrange
702+
final ServiceBusReceiveLinkProcessor processor = Flux.<ServiceBusReceiveLink>create(sink -> sink.next(link1))
703+
.subscribeWith(linkProcessor);
704+
705+
final AmqpException amqpException = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR,
706+
"Test-timeout-error", new AmqpErrorContext("test-namespace"));
707+
when(retryPolicy.calculateRetryDelay(amqpException, 1)).thenReturn(Duration.ofSeconds(1));
708+
709+
final String lockToken = "lockToken";
710+
final DeliveryState deliveryState = mock(DeliveryState.class);
711+
712+
when(link1.updateDisposition(eq(lockToken), eq(deliveryState))).thenReturn(Mono.error(amqpException));
713+
when(link1.closeAsync()).thenReturn(Mono.empty());
714+
715+
// Act & Assert
716+
StepVerifier.create(processor.updateDisposition(lockToken, deliveryState))
717+
.expectErrorSatisfies(error -> assertSame(amqpException, error))
718+
.verify();
719+
processor.cancel();
720+
721+
verify(link1).updateDisposition(eq(lockToken), eq(deliveryState));
722+
verify(link1, times(1)).closeAsync();
723+
724+
assertTrue(processor.isTerminated());
725+
assertFalse(processor.hasError());
726+
assertNull(processor.getError());
727+
}
698728
}

0 commit comments

Comments
 (0)