Skip to content

Commit f7b151f

Browse files
authored
implement queue-based load leveling pattern
1 parent b8b924a commit f7b151f

File tree

1 file changed

+49
-16
lines changed

1 file changed

+49
-16
lines changed

docs/aac/java-mwa-guide.md

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -117,33 +117,66 @@ Use the [Strangler fig](/azure/architecture/patterns/strangler-fig) pattern to g
117117

118118
Implement the [Queue-Based Load Leveling pattern](/azure/architecture/patterns/queue-based-load-leveling) on producer portion of the decoupled service to asynchronously handle tasks that don't need immediate responses. This pattern enhances overall system responsiveness and scalability by using a queue to manage workload distribution. It allows the decoupled service to process requests at a consistent rate. To implement this pattern effectively, follow these recommendations:
119119

120-
- *Use nonblocking message queuing.* Ensure the process that sends messages to the queue doesn't block other processes while waiting for the decoupled service to handle messages in the queue. If the process requires the result of the decoupled-service operation, have an alternative way to handle the situation while waiting for the queued operation to complete. For example, the reference implementation uses Azure Service Bus and the `await` keyword with `messageSender.PublishAsync()` to asynchronously publish messages to the queue without blocking the thread that executes this code (*see example code*):
120+
- *Use non-blocking message queuing.* Ensure the process that sends messages to the queue doesn't block other processes while waiting for the decoupled service to handle messages in the queue. If the process requires the result of the decoupled-service operation, implement an alternative way to handle the situation while waiting for the queued operation to complete. For example, in Spring Boot, you can use the `StreamBridge` class to asynchronously publish messages to the queue without blocking the calling thread (*see example code*):
121+
122+
```java
123+
private final StreamBridge streamBridge;
124+
125+
public SupportGuideQueueSender(StreamBridge streamBridge) {
126+
this.streamBridge = streamBridge;
127+
}
121128

122-
```csharp
123129
// Asynchronously publish a message without blocking the calling thread
124-
await messageSender.PublishAsync(new TicketRenderRequestMessage(Guid.NewGuid(), ticket, null, DateTime.Now), CancellationToken.None);
130+
@Override
131+
public void send(String to, String guideUrl, Long requestId) {
132+
EmailRequest emailRequest = EmailRequest.newBuilder()
133+
.setRequestId(requestId)
134+
.setEmailAddress(to)
135+
.setUrlToManual(guideUrl)
136+
.build();
137+
138+
log.info("EmailRequest: {}", emailRequest);
139+
140+
var message = emailRequest.toByteArray();
141+
streamBridge.send(EMAIL_REQUEST_QUEUE, message);
142+
143+
log.info("Message sent to the queue");
144+
}
125145
```
126146

147+
This Java example uses `StreamBridge` to send messages asynchronously
148+
127149
This approach ensures that the main application remains responsive and can handle other tasks concurrently, while the decoupled service processes the queued requests at a manageable rate.
128150

129151
- *Implement message retry and removal.* Implement a mechanism to retry processing of queued messages that can't be processed successfully. If failures persist, these messages should be removed from the queue. For example, Azure Service Bus has built-in retry and dead letter queue features.
130152
131-
- *Configure idempotent message processing.* The logic that processes messages from the queue must be idempotent to handle cases where a message might be processed more than once. For example, the reference implementation uses `ServiceBusClient.CreateProcessor` with `AutoCompleteMessages = true` and `ReceiveMode = ServiceBusReceiveMode.PeekLock` to ensure messages are only processed once and can be reprocessed on failure (*see following code*).
153+
- *Configure idempotent message processing.* The logic that processes messages from the queue must be idempotent to handle cases where a message might be processed more than once. In Spring Boot, you can use `@StreamListener` or `@KafkaListener` with a unique message identifier to prevent duplicate processing. Or you can organize the business process o operate in a functional approach with Spring Cloud Stream, where the `consume` method is defined in a way that produces the same result when it is executed repeatedly (*see example code*):
132154
133-
```csharp
134-
// Create a processor for idempotent message processing
135-
var processor = serviceBusClient.CreateProcessor(path, new ServiceBusProcessorOptions
136-
{
137-
// Allow the messages to be auto-completed if processing finishes without failure
138-
AutoCompleteMessages = true,
155+
```java
156+
Function<byte[], byte[]> consume() {
157+
return message -> {
139158
140-
// PeekLock mode provides reliability in that unsettled messages will be redelivered on failure
141-
ReceiveMode = ServiceBusReceiveMode.PeekLock,
159+
log.info("New message received");
142160
143-
// Containerized processors can scale at the container level and need not scale via the processor options
144-
MaxConcurrentCalls = 1,
145-
PrefetchCount = 0
146-
});
161+
try {
162+
EmailRequest emailRequest = EmailRequest.parseFrom(message);
163+
log.info("EmailRequest: {}", emailRequest);
164+
165+
EmailResponse emailResponse = EmailResponse.newBuilder()
166+
.setEmailAddress(emailRequest.getEmailAddress())
167+
.setUrlToManual(emailRequest.getUrlToManual())
168+
.setRequestId(emailRequest.getRequestId())
169+
.setMessage("Email sent to " + emailRequest.getEmailAddress() + " with URL to manual " + emailRequest.getUrlToManual())
170+
.setStatus(Status.SUCCESS)
171+
.build();
172+
173+
return emailResponse.toByteArray();
174+
175+
} catch (InvalidProtocolBufferException e) {
176+
throw new RuntimeException("Error parsing email request message", e);
177+
}
178+
};
179+
}
147180
```
148181
149182
- *Manage changes to the experience.* Asynchronous processing can lead to tasks not being immediately completed. Users should be made aware when their task is still being processed to set correct expectations and avoid confusion. Use visual cues or messages to indicate that a task is in progress. Give users the option to receive notifications when their task is done, such as an email or push notification.

0 commit comments

Comments
 (0)