Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
Expand All @@ -31,17 +32,20 @@
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceBusConsumer extends DefaultConsumer {
public class ServiceBusConsumer extends DefaultConsumer implements ShutdownAware {

private static final Logger LOG = LoggerFactory.getLogger(ServiceBusConsumer.class);
private ServiceBusProcessorClient client;
private final AtomicInteger pendingExchanges = new AtomicInteger();

public ServiceBusConsumer(final ServiceBusEndpoint endpoint, final Processor processor) {
super(endpoint, processor);
Expand Down Expand Up @@ -75,6 +79,7 @@ protected void doStart() throws Exception {
}

private void processMessage(ServiceBusReceivedMessageContext messageContext) {
pendingExchanges.incrementAndGet();
final ServiceBusReceivedMessage message = messageContext.getMessage();
final Exchange exchange = createServiceBusExchange(message);
final ConsumerOnCompletion onCompletion = new ConsumerOnCompletion(messageContext);
Expand All @@ -98,14 +103,45 @@ private void processError(ServiceBusErrorContext errorContext) {
@Override
protected void doStop() throws Exception {
if (client != null) {
// shutdown the client
client.close();
// stop accepting new messages but keep the connection open
// so that in-flight exchanges can still complete/abandon messages
client.stop();
}

// shutdown camel consumer
super.doStop();
}

@Override
protected void doShutdown() throws Exception {
if (client != null) {
// close the client after all in-flight exchanges have completed
client.close();
}

super.doShutdown();
}

@Override
public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
// stop accepting new messages but keep the connection open
// so that in-flight exchanges can still complete/abandon messages
if (client != null) {
client.stop();
}
return true;
}

@Override
public int getPendingExchangesSize() {
return pendingExchanges.get();
}

@Override
public void prepareShutdown(boolean suspendOnly, boolean forced) {
// noop
}

public ServiceBusConfiguration getConfiguration() {
return getEndpoint().getConfiguration();
}
Expand Down Expand Up @@ -171,36 +207,45 @@ private ConsumerOnCompletion(ServiceBusReceivedMessageContext messageContext) {

@Override
public void onComplete(Exchange exchange) {
super.onComplete(exchange);
if (getConfiguration().getServiceBusReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
messageContext.complete();
try {
super.onComplete(exchange);
if (getConfiguration().getServiceBusReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
messageContext.complete();
}
} finally {
pendingExchanges.decrementAndGet();
}
}

@Override
public void onFailure(Exchange exchange) {
final Exception cause = exchange.getException();
if (cause != null) {
getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
}
try {
final Exception cause = exchange.getException();
if (cause != null) {
getExceptionHandler().handleException("Error during processing exchange.", exchange, cause);
}

if (getConfiguration().getServiceBusReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
if (getConfiguration().isEnableDeadLettering() && (ObjectHelper.isEmpty(getConfiguration().getSubQueue())
|| ObjectHelper.equal(getConfiguration().getSubQueue(), SubQueue.NONE))) {
DeadLetterOptions deadLetterOptions = new DeadLetterOptions();
if (cause != null) {
deadLetterOptions
.setDeadLetterReason(String.format("%s: %s", cause.getClass().getName(), cause.getMessage()));
deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n")));
messageContext.deadLetter(deadLetterOptions);
if (getConfiguration().getServiceBusReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
if (getConfiguration().isEnableDeadLettering() && (ObjectHelper.isEmpty(getConfiguration().getSubQueue())
|| ObjectHelper.equal(getConfiguration().getSubQueue(), SubQueue.NONE))) {
DeadLetterOptions deadLetterOptions = new DeadLetterOptions();
if (cause != null) {
deadLetterOptions
.setDeadLetterReason(
String.format("%s: %s", cause.getClass().getName(), cause.getMessage()));
deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n")));
messageContext.deadLetter(deadLetterOptions);
} else {
messageContext.deadLetter();
}
} else {
messageContext.deadLetter();
messageContext.abandon();
}
} else {
messageContext.abandon();
}
} finally {
pendingExchanges.decrementAndGet();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,79 @@ void customProcessorClient() throws Exception {
}
}

@Test
void deferShutdownStopsClientAndReturnsTrue() throws Exception {
try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, processor)) {
consumer.doStart();

boolean deferred = consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);

assertThat(deferred).isTrue();
verify(client).stop();
}
}

@Test
void pendingExchangesSizeTracksInflightMessages() throws Exception {
try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, processor)) {
when(configuration.getServiceBusReceiveMode()).thenReturn(ServiceBusReceiveMode.PEEK_LOCK);
consumer.doStart();

assertThat(consumer.getPendingExchangesSize()).isZero();

when(messageContext.getMessage()).thenReturn(message);
processMessageCaptor.getValue().accept(messageContext);

assertThat(consumer.getPendingExchangesSize()).isEqualTo(1);

Exchange exchange = exchangeCaptor.getValue();
Synchronization synchronization = exchange.getExchangeExtension().handoverCompletions().get(0);
synchronization.onComplete(exchange);

assertThat(consumer.getPendingExchangesSize()).isZero();
}
}

@Test
void pendingExchangesSizeDecrementsOnFailure() throws Exception {
try (ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, processor)) {
when(configuration.getServiceBusReceiveMode()).thenReturn(ServiceBusReceiveMode.PEEK_LOCK);
consumer.doStart();

when(messageContext.getMessage()).thenReturn(message);
processMessageCaptor.getValue().accept(messageContext);

assertThat(consumer.getPendingExchangesSize()).isEqualTo(1);

Exchange exchange = exchangeCaptor.getValue();
Synchronization synchronization = exchange.getExchangeExtension().handoverCompletions().get(0);
synchronization.onFailure(exchange);

assertThat(consumer.getPendingExchangesSize()).isZero();
}
}

@Test
void doStopStopsClientWithoutClosing() throws Exception {
ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, processor);
consumer.doStart();

consumer.doStop();

verify(client).stop();
verify(client, never()).close();
}

@Test
void doShutdownClosesClient() throws Exception {
ServiceBusConsumer consumer = new ServiceBusConsumer(endpoint, processor);
consumer.doStart();

consumer.doShutdown();

verify(client).close();
}

private void configureMockMessage() {
when(message.getApplicationProperties()).thenReturn(new HashMap<>());
when(message.getBody()).thenReturn(BinaryData.fromBytes(MESSAGE_BODY.getBytes()));
Expand Down
Loading