Skip to content

Commit 1f36a79

Browse files
authored
fix batch consuming issue (Azure#25664)
1 parent 7264c48 commit 1f36a79

File tree

3 files changed

+146
-23
lines changed

3 files changed

+146
-23
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.spring.test.eventhubs.stream.binder;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
11+
import org.springframework.boot.test.context.SpringBootTest;
12+
import org.springframework.context.annotation.Bean;
13+
import org.springframework.messaging.Message;
14+
import org.springframework.messaging.support.GenericMessage;
15+
import org.springframework.test.context.TestPropertySource;
16+
import reactor.core.publisher.Flux;
17+
import reactor.core.publisher.Sinks;
18+
19+
import java.util.List;
20+
import java.util.UUID;
21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Consumer;
24+
import java.util.function.Supplier;
25+
26+
import static org.assertj.core.api.Assertions.assertThat;
27+
28+
@SpringBootTest(classes = EventHubBinderConsumingBatchModeIT.TestConfig.class)
29+
@TestPropertySource(properties =
30+
{
31+
"spring.cloud.stream.eventhub.bindings.consume-in-0.consumer.checkpoint-mode=BATCH",
32+
"spring.cloud.stream.eventhub.bindings.consume-in-0.consumer.max-batch-size=10",
33+
"spring.cloud.stream.eventhub.bindings.consume-in-0.consumer.max-wait-time=2s",
34+
"spring.cloud.stream.bindings.consume-in-0.destination=test-eventhub-consuming-batch",
35+
"spring.cloud.stream.bindings.supply-out-0.destination=test-eventhub-consuming-batch",
36+
"spring.cloud.azure.eventhub.checkpoint-container=test-eventhub-consuming-batch",
37+
"spring.cloud.stream.bindings.consume-in-0.consumer.batch-mode=true"
38+
})
39+
public class EventHubBinderConsumingBatchModeIT {
40+
41+
private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderConsumingBatchModeIT.class);
42+
43+
private static final String MESSAGE = UUID.randomUUID().toString();
44+
45+
private static final CountDownLatch LATCH = new CountDownLatch(1);
46+
47+
@Autowired
48+
private Sinks.Many<Message<String>> many;
49+
50+
@EnableAutoConfiguration
51+
public static class TestConfig {
52+
53+
@Bean
54+
public Sinks.Many<Message<String>> many() {
55+
return Sinks.many().unicast().onBackpressureBuffer();
56+
}
57+
58+
@Bean
59+
public Supplier<Flux<Message<String>>> supply(Sinks.Many<Message<String>> many) {
60+
return () -> many.asFlux()
61+
.doOnNext(m -> LOGGER.info("Manually sending message {}", m.getPayload()))
62+
.doOnError(t -> LOGGER.error("Error encountered", t));
63+
}
64+
65+
@Bean
66+
public Consumer<Message<List<String>>> consume() {
67+
return message -> {
68+
List<String> payload = message.getPayload();
69+
LOGGER.info("EventHubBinderBatchModeIT: New message received: '{}'", payload);
70+
if (payload.contains(EventHubBinderConsumingBatchModeIT.MESSAGE)) {
71+
LATCH.countDown();
72+
}
73+
};
74+
}
75+
}
76+
77+
@Test
78+
public void testSendAndReceiveMessage() throws InterruptedException {
79+
LOGGER.info("EventHubBinderBatchModeIT begin.");
80+
EventHubBinderConsumingBatchModeIT.LATCH.await(15, TimeUnit.SECONDS);
81+
LOGGER.info("Send a message:" + MESSAGE + ".");
82+
many.emitNext(new GenericMessage<>("\"" + MESSAGE + "\""), Sinks.EmitFailureHandler.FAIL_FAST);
83+
assertThat(EventHubBinderConsumingBatchModeIT.LATCH.await(600, TimeUnit.SECONDS)).isTrue();
84+
LOGGER.info("EventHubBinderBatchModeIT end.");
85+
}
86+
}

sdk/spring/azure-spring-cloud-test-eventhubs/test-resources.json

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,20 @@
9494
"status": "Active"
9595
}
9696
},
97+
{
98+
"type": "Microsoft.EventHub/namespaces/eventhubs",
99+
"apiVersion": "2017-04-01",
100+
"name": "[concat(variables('eventHubsNamespaceName'), '/test-eventhub-consuming-batch')]",
101+
"location": "[variables('location')]",
102+
"dependsOn": [
103+
"[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespaceName'))]"
104+
],
105+
"properties": {
106+
"messageRetentionInDays": 1,
107+
"partitionCount": 1,
108+
"status": "Active"
109+
}
110+
},
97111
{
98112
"type": "Microsoft.EventHub/namespaces/eventhubs",
99113
"apiVersion": "2017-04-01",
@@ -172,6 +186,17 @@
172186
],
173187
"properties": {}
174188
},
189+
{
190+
"type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups",
191+
"apiVersion": "2017-04-01",
192+
"name": "[concat(variables('eventHubsNamespaceName'), '/test-eventhub-consuming-batch/$Default')]",
193+
"location": "[variables('location')]",
194+
"dependsOn": [
195+
"[resourceId('Microsoft.EventHub/namespaces/eventhubs', variables('eventHubsNamespaceName'), 'test-eventhub-consuming-batch')]",
196+
"[resourceId('Microsoft.EventHub/namespaces', variables('eventHubsNamespaceName'))]"
197+
],
198+
"properties": {}
199+
},
175200
{
176201
"type": "Microsoft.EventHub/namespaces/eventhubs/consumergroups",
177202
"apiVersion": "2017-04-01",
@@ -311,6 +336,18 @@
311336
"publicAccess": "None"
312337
}
313338
},
339+
{
340+
"type": "Microsoft.Storage/storageAccounts/blobServices/containers",
341+
"apiVersion": "2019-06-01",
342+
"name": "[concat(variables('storageAccountName'), '/default/test-eventhub-consuming-batch')]",
343+
"dependsOn": [
344+
"[resourceId('Microsoft.Storage/storageAccounts/blobServices', variables('storageAccountName'), 'default')]",
345+
"[resourceId('Microsoft.Storage/storageAccounts', variables('storageAccountName'))]"
346+
],
347+
"properties": {
348+
"publicAccess": "None"
349+
}
350+
},
314351
{
315352
"type": "Microsoft.Storage/storageAccounts/blobServices/containers",
316353
"apiVersion": "2019-06-01",
@@ -374,4 +411,4 @@
374411
"value": "[listKeys(resourceId('Microsoft.EventHub/namespaces/authorizationRules', variables('eventHubsNamespaceName'), variables('eventHubsNamespaceKeyName')), '2017-04-01').primaryConnectionString]"
375412
}
376413
}
377-
}
414+
}

sdk/spring/azure-spring-integration-eventhubs/src/main/java/com/azure/spring/integration/eventhub/checkpoint/BatchCheckpointManager.java

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313
import org.springframework.util.Assert;
14+
import org.springframework.util.CollectionUtils;
1415

1516
import java.util.List;
1617
import java.util.concurrent.ConcurrentHashMap;
@@ -23,9 +24,9 @@
2324
public class BatchCheckpointManager extends CheckpointManager {
2425
private static final Logger LOG = LoggerFactory.getLogger(BatchCheckpointManager.class);
2526
private static final String CHECKPOINT_FAIL_MSG = "Consumer group '%s' failed to checkpoint offset %s of message "
26-
+ "%s on partition %s, last checkpointed message is %s";
27+
+ "on partition %s in batch mode";
2728
private static final String CHECKPOINT_SUCCESS_MSG =
28-
"Consumer group '%s' checkpointed offset %s of message %s on partition %s in %s " + "mode";
29+
"Consumer group '%s' succeed to checkpoint offset %s of message on partition %s in batch mode";
2930

3031
private final ConcurrentHashMap<String, EventData> lastEventByPartition = new ConcurrentHashMap<>();
3132

@@ -51,15 +52,16 @@ public void completeBatch(EventContext context) {
5152
}
5253

5354
public void onMessages(EventBatchContext context) {
54-
EventData lastEvent = getLastEnqueuedEvent(context);
55+
EventData lastEvent = getLastEventFromBatch(context);
56+
if (lastEvent == null) {
57+
return;
58+
}
5559
Long offset = lastEvent.getOffset();
60+
String partitionId = context.getPartitionContext().getPartitionId();
61+
String consumerGroup = context.getPartitionContext().getConsumerGroup();
5662
context.updateCheckpointAsync()
57-
.doOnError(t -> logCheckpointFail(context, offset, lastEvent,
58-
lastEventByPartition.get(context.getPartitionContext().getPartitionId()), t))
59-
.doOnSuccess(v -> {
60-
this.lastEventByPartition.put(context.getPartitionContext().getPartitionId(), lastEvent);
61-
logCheckpointSuccess(context, offset, lastEvent);
62-
})
63+
.doOnError(t -> logCheckpointFail(consumerGroup, partitionId, offset, t))
64+
.doOnSuccess(v -> logCheckpointSuccess(consumerGroup, partitionId, offset))
6365
.subscribe();
6466
}
6567

@@ -68,27 +70,25 @@ protected Logger getLogger() {
6870
return LOG;
6971
}
7072

71-
void logCheckpointFail(EventBatchContext context, Long offset, EventData lastEnqueuedEvent,
72-
EventData lastCheckpointedEvent, Throwable t) {
73-
if (getLogger().isWarnEnabled()) {
74-
getLogger().warn(String
75-
.format(CHECKPOINT_FAIL_MSG, context.getPartitionContext().getConsumerGroup(), offset,
76-
lastEnqueuedEvent, lastCheckpointedEvent, context.getPartitionContext().getPartitionId()), t);
77-
}
73+
void logCheckpointFail(String consumerGroup, String partitionId, Long offset, Throwable t) {
74+
getLogger().warn(String
75+
.format(CHECKPOINT_FAIL_MSG, consumerGroup, offset, partitionId), t);
7876
}
7977

80-
void logCheckpointSuccess(EventBatchContext context, Long offset, EventData lastEnqueuedEvent) {
78+
void logCheckpointSuccess(String consumerGroup, String partitionId, Long offset) {
8179
if (getLogger().isDebugEnabled()) {
8280
getLogger().debug(String
83-
.format(CHECKPOINT_SUCCESS_MSG, context.getPartitionContext().getConsumerGroup(), offset,
84-
lastEnqueuedEvent, context.getPartitionContext().getPartitionId(),
85-
this.checkpointConfig.getCheckpointMode()));
81+
.format(CHECKPOINT_SUCCESS_MSG, consumerGroup, offset, partitionId));
8682
}
8783
}
8884

89-
EventData getLastEnqueuedEvent(EventBatchContext context) {
85+
private EventData getLastEventFromBatch(EventBatchContext context) {
9086
List<EventData> events = context.getEvents();
91-
return events.get(events.size() - 1);
87+
if (CollectionUtils.isEmpty(events)) {
88+
return null;
89+
}
90+
EventData lastEvent = events.get(events.size() - 1);
91+
return lastEvent;
9292

9393
}
9494
}

0 commit comments

Comments
 (0)