Skip to content

Commit ed6ed6c

Browse files
authored
competing consumers pattern
1 parent f7b151f commit ed6ed6c

File tree

1 file changed

+40
-70
lines changed

1 file changed

+40
-70
lines changed

docs/aac/java-mwa-guide.md

Lines changed: 40 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -150,34 +150,7 @@ This approach ensures that the main application remains responsive and can handl
150150

151151
- *Implement message retry and removal.* Implement a mechanism to retry processing of queued messages that can't be processed successfully. If failures persist, these messages should be removed from the queue. For example, Azure Service Bus has built-in retry and dead letter queue features.
152152
153-
- *Configure idempotent message processing.* The logic that processes messages from the queue must be idempotent to handle cases where a message might be processed more than once. In Spring Boot, you can use `@StreamListener` or `@KafkaListener` with a unique message identifier to prevent duplicate processing. Or you can organize the business process o operate in a functional approach with Spring Cloud Stream, where the `consume` method is defined in a way that produces the same result when it is executed repeatedly (*see example code*):
154-
155-
```java
156-
Function<byte[], byte[]> consume() {
157-
return message -> {
158-
159-
log.info("New message received");
160-
161-
try {
162-
EmailRequest emailRequest = EmailRequest.parseFrom(message);
163-
log.info("EmailRequest: {}", emailRequest);
164-
165-
EmailResponse emailResponse = EmailResponse.newBuilder()
166-
.setEmailAddress(emailRequest.getEmailAddress())
167-
.setUrlToManual(emailRequest.getUrlToManual())
168-
.setRequestId(emailRequest.getRequestId())
169-
.setMessage("Email sent to " + emailRequest.getEmailAddress() + " with URL to manual " + emailRequest.getUrlToManual())
170-
.setStatus(Status.SUCCESS)
171-
.build();
172-
173-
return emailResponse.toByteArray();
174-
175-
} catch (InvalidProtocolBufferException e) {
176-
throw new RuntimeException("Error parsing email request message", e);
177-
}
178-
};
179-
}
180-
```
153+
- *Configure idempotent message processing.* The logic that processes messages from the queue must be idempotent to handle cases where a message might be processed more than once. In Spring Boot, you can use `@StreamListener` or `@KafkaListener` with a unique message identifier to prevent duplicate processing. Or you can organize the business process o operate in a functional approach with Spring Cloud Stream, where the `consume` method is defined in a way that produces the same result when it is executed repeatedly.
181154
182155
- *Manage changes to the experience.* Asynchronous processing can lead to tasks not being immediately completed. Users should be made aware when their task is still being processed to set correct expectations and avoid confusion. Use visual cues or messages to indicate that a task is in progress. Give users the option to receive notifications when their task is done, such as an email or push notification.
183156
@@ -197,13 +170,13 @@ Implement the [Competing Consumers pattern](/azure/architecture/patterns/competi
197170
198171
To implement the Competing Consumers pattern, follow these recommendations:
199172
200-
- *Handle concurrent messages.* When receiving messages from a queue, ensure that your system is designed to handle multiple messages concurrently. Set the maximum concurrent calls to 1 so a separate consumer handles each message.
173+
- *Handle concurrent messages.* When receiving messages from a queue, ensure that your system scales predictably by configuring the concurrency to match your system design. Your load test results will help you decide the appropriate number of concerrent messages to handle and you can start from 1 to measure the impact how the system will perform.
201174
202-
- *Disable prefetching.* Disable message prefetching of messages so consumers fetching messages only when they're ready.
175+
- *Disable prefetching.* Disable prefetching of messages so consumers only fetch messages when they're ready.
203176

204-
- *Use reliable message processing modes.* Use a reliable processing mode, such as PeekLock (or its equivalent), that automatically retry messages that fail processing. This mode enhances reliability over deletion-first methods. If one worker fails to handle a message, another must be able to process it without errors, even if the message is processed multiple times.
177+
- *Use reliable message processing modes.* Use a reliable processing mode, such as PeekLock (or its equivalent), that automatically retries messages that fail processing. This mode enhances reliability over deletion-first methods. If one worker fails to handle a message, another must be able to process it without errors, even if the message is processed multiple times.
205178

206-
- *Implement error handling.* Route malformed or unprocessable messages to a separate, dead-letter queue. This design prevents repetitive processing. For example, you can catch exceptions during message processing and move the problematic message to the separate queue.
179+
- *Implement error handling.* Route malformed or unprocessable messages to a separate, dead-letter queue. This design prevents repetitive processing. For example, you can catch exceptions during message processing and move the problematic message to the separate queue. For Azure Service Bus, messages are moved to the dead-leter queue after a specified number of delivery attempts or on explicit rejection by the application.
207180

208181
- *Handle out-of-order messages.* Design consumers to process messages that arrive out of sequence. Multiple parallel consumers means they might process messages out of order.
209182

@@ -215,45 +188,42 @@ To implement the Competing Consumers pattern, follow these recommendations:
215188

216189
- *Configure logging.* Integrate logging and specific exception handling within the message processing workflow. Focus on capturing serialization errors and directing these problematic messages to a dead letter mechanism. These logs provide valuable insights for troubleshooting.
217190

218-
For example, the reference implementation uses the Competing Consumers pattern a stateless service on Azure Container App to process ticket-rendering requests from an Azure Service Bus queue. It configures a queue processor with:
219-
220-
- *AutoCompleteMessages*: Automatically completes messages if processed without failure.
221-
- *ReceiveMode*: Uses PeekLock mode and redelivers messages if they aren't settled.
222-
- *MaxConcurrentCalls*: Set to 1 to handle one message at a time.
223-
- *PrefetchCount*: Set to 0 to avoid prefetching messages.
224-
225-
The processor logs message processing details, aiding in troubleshooting and monitoring. It captures deserialization errors and routes invalid messages to a dead-letter queue, preventing repetitive processing of faulty messages. The service scales at the container level, allowing for efficient handling of message spikes based on queue length.
226-
227-
```C#
228-
// Create a processor for the given queue that will process incoming messages
229-
var processor = serviceBusClient.CreateProcessor(path, new ServiceBusProcessorOptions
230-
{
231-
// Allow the messages to be auto-completed if processing finishes without failure
232-
AutoCompleteMessages = true,
233-
// PeekLock mode provides reliability in that unsettled messages will be redelivered on failure
234-
ReceiveMode = ServiceBusReceiveMode.PeekLock,
235-
// Containerized processors can scale at the container level and need not scale via the processor options
236-
MaxConcurrentCalls = 1,
237-
PrefetchCount = 0
238-
});
239-
240-
// Called for each message received by the processor
241-
processor.ProcessMessageAsync += async args =>
242-
{
243-
logger.LogInformation("Processing message {MessageId} from {ServiceBusNamespace}/{Path}", args.Message.MessageId, args.FullyQualifiedNamespace, args.EntityPath);
244-
// Unhandled exceptions in the handler will be caught by the processor and result in abandoning and dead-lettering the message
245-
try
246-
{
247-
var message = args.Message.Body.ToObjectFromJson<T>();
248-
await messageHandler(message, args.CancellationToken);
249-
logger.LogInformation("Successfully processed message {MessageId} from {ServiceBusNamespace}/{Path}", args.Message.MessageId, args.FullyQualifiedNamespace, args.EntityPath);
250-
}
251-
catch (JsonException)
252-
{
253-
logger.LogError("Invalid message body; could not be deserialized to {Type}", typeof(T));
254-
await args.DeadLetterMessageAsync(args.Message, $"Invalid message body; could not be deserialized to {typeof(T)}", cancellationToken: args.CancellationToken);
191+
For example, the reference implementation uses the Competing Consumers pattern on a stateless service running in Azure Container App to process the email delivery requests from an Azure Service Bus queue.
192+
193+
The processor logs message processing details, aiding in troubleshooting and monitoring. It captures deserialization errors and provides insights needed when debugging the process. The service scales at the container level, allowing for efficient handling of message spikes based on queue length.
194+
195+
```java
196+
@Configuration
197+
public class EmailProcessor {
198+
199+
private static final Logger log = LoggerFactory.getLogger(EmailProcessor.class);
200+
201+
@Bean
202+
Function<byte[], byte[]> consume() {
203+
return message -> {
204+
205+
log.info("New message received");
206+
207+
try {
208+
EmailRequest emailRequest = EmailRequest.parseFrom(message);
209+
log.info("EmailRequest: {}", emailRequest);
210+
211+
EmailResponse emailResponse = EmailResponse.newBuilder()
212+
.setEmailAddress(emailRequest.getEmailAddress())
213+
.setUrlToManual(emailRequest.getUrlToManual())
214+
.setRequestId(emailRequest.getRequestId())
215+
.setMessage("Email sent to " + emailRequest.getEmailAddress() + " with URL to manual " + emailRequest.getUrlToManual())
216+
.setStatus(Status.SUCCESS)
217+
.build();
218+
219+
return emailResponse.toByteArray();
220+
221+
} catch (InvalidProtocolBufferException e) {
222+
throw new RuntimeException("Error parsing email request message", e);
223+
}
224+
};
255225
}
256-
};
226+
}
257227
```
258228

259229
### Implement the Health Endpoint Monitoring pattern

0 commit comments

Comments
 (0)