Skip to content

Commit c83171e

Browse files
committed
Add dedup/idempotency test
1 parent 56b2b52 commit c83171e

File tree

1 file changed

+261
-0
lines changed

1 file changed

+261
-0
lines changed
Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
// Copyright (c) 2025 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
5+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
6+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
7+
// please see LICENSE-APACHE2.
8+
//
9+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
10+
// either express or implied. See the LICENSE file for specific language governing
11+
// rights and limitations of this software.
12+
//
13+
// If you have any questions regarding licensing, please contact us at
14+
15+
package com.rabbitmq.stream.impl;
16+
17+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
18+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
19+
import static com.rabbitmq.stream.impl.TestUtils.waitUntilStable;
20+
import static java.util.stream.Collectors.toList;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
23+
import com.rabbitmq.stream.ConfirmationHandler;
24+
import com.rabbitmq.stream.Consumer;
25+
import com.rabbitmq.stream.Environment;
26+
import com.rabbitmq.stream.EnvironmentBuilder;
27+
import com.rabbitmq.stream.Message;
28+
import com.rabbitmq.stream.MessageHandler;
29+
import com.rabbitmq.stream.OffsetSpecification;
30+
import com.rabbitmq.stream.Producer;
31+
import com.rabbitmq.stream.SubscriptionListener;
32+
import io.netty.channel.EventLoopGroup;
33+
import java.time.Duration;
34+
import java.util.ArrayList;
35+
import java.util.Collections;
36+
import java.util.List;
37+
import java.util.Set;
38+
import java.util.concurrent.ConcurrentHashMap;
39+
import java.util.concurrent.CountDownLatch;
40+
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.concurrent.atomic.AtomicLong;
43+
import java.util.function.Supplier;
44+
import java.util.stream.Collectors;
45+
import java.util.stream.IntStream;
46+
import org.junit.jupiter.api.BeforeEach;
47+
import org.junit.jupiter.api.Test;
48+
49+
@StreamTestInfrastructure
50+
public class DedupIdempotencyTest {
51+
52+
String stream;
53+
Environment environment;
54+
EventLoopGroup eventLoopGroup;
55+
56+
@BeforeEach
57+
void init() {
58+
EnvironmentBuilder environmentBuilder =
59+
Environment.builder().netty().eventLoopGroup(eventLoopGroup).environmentBuilder();
60+
environment = environmentBuilder.build();
61+
}
62+
63+
@Test
64+
void publishBatchWithDedup() {
65+
Producer producer =
66+
environment.producerBuilder().name("app1").confirmTimeout(Duration.ZERO).stream(stream)
67+
.build();
68+
69+
// message ID, must be a strictly increasing sequence
70+
AtomicLong sequence = new AtomicLong();
71+
int batchSize = 50;
72+
// batch of messages, publishing ID and message ID set
73+
// publishing ID is mandatory, the message ID can be set
74+
// anywhere in the message (e.g. in application properties)
75+
List<Message> batch =
76+
IntStream.range(0, batchSize)
77+
.mapToObj(
78+
ignored -> {
79+
long id = sequence.getAndIncrement();
80+
return producer
81+
.messageBuilder()
82+
.publishingId(id)
83+
.properties()
84+
.messageId(id)
85+
.messageBuilder()
86+
.build();
87+
})
88+
.collect(toList());
89+
90+
boolean batchSentAndConfirmed = false;
91+
int attempts = 0;
92+
// publishing the batch of messages
93+
// we can retry as many times as we want,
94+
// the broker takes care of deduplicating messages
95+
// it is not suboptimal to retry the whole batch: failures are rare
96+
while (!batchSentAndConfirmed && attempts < 5) {
97+
attempts++;
98+
try {
99+
batchSentAndConfirmed = publishBatch(producer, batch);
100+
} catch (Exception e) {
101+
// keep retrying...
102+
}
103+
}
104+
assertThat(batchSentAndConfirmed).isTrue();
105+
106+
assertThat(streamMessageCount(stream, environment)).isEqualTo(batchSize);
107+
108+
// just checking dedup
109+
batchSentAndConfirmed = publishBatch(producer, batch);
110+
assertThat(batchSentAndConfirmed).isTrue();
111+
assertThat(streamMessageCount(stream, environment)).isEqualTo(batchSize);
112+
}
113+
114+
@Test
115+
void externalOffsetTrackingIdempotentProcessing() throws Exception {
116+
AtomicLong sequence = new AtomicLong();
117+
Producer producer = environment.producerBuilder().stream(stream).build();
118+
int messageCount = 50;
119+
// a batch of messages to publish
120+
// we don't use dedup to demonstrate the consumer idempotency
121+
// (we'll publish the batch several times)
122+
List<Message> messages =
123+
IntStream.range(0, messageCount)
124+
.mapToObj(
125+
ignored ->
126+
producer
127+
.messageBuilder()
128+
.properties()
129+
.messageId(sequence.getAndIncrement())
130+
.messageBuilder()
131+
.build())
132+
.collect(Collectors.toList());
133+
Runnable publish =
134+
() -> {
135+
CountDownLatch latch = new CountDownLatch(messageCount);
136+
messages.forEach(msg -> producer.send(msg, s -> latch.countDown()));
137+
assertThat(latchAssert(latch)).completes();
138+
};
139+
publish.run();
140+
141+
// data structures for storage
142+
// we assume they can be updated atomically, like tables in a relational database
143+
// this tracks processed messages
144+
Set<Long> processed = ConcurrentHashMap.newKeySet(messageCount);
145+
// this simulates the actual business processing
146+
List<Long> database = Collections.synchronizedList(new ArrayList<>(messageCount));
147+
// this is for offset tracking
148+
AtomicLong lastProcessedOffset = new AtomicLong(-1);
149+
150+
// tracks the number of received messages
151+
AtomicInteger messageReceivedCount = new AtomicInteger();
152+
153+
MessageHandler handler =
154+
(ctx, msg) -> {
155+
long id = msg.getProperties().getMessageIdAsLong();
156+
// this is where a database transaction should start
157+
// checking whether the message has been processed
158+
if (processed.add(id)) {
159+
// not processed yet, processing
160+
database.add(id);
161+
}
162+
lastProcessedOffset.set(ctx.offset());
163+
// this is where the database transaction should commit
164+
// increments the counter, just for the test assertions
165+
messageReceivedCount.incrementAndGet();
166+
};
167+
168+
// for test assertions
169+
AtomicLong subscriptionStart = new AtomicLong();
170+
// for the external offset tracking:
171+
SubscriptionListener subscriptionListener =
172+
ctx -> {
173+
// we hit the data structure that tracks the last processed
174+
// to restart where we left off
175+
long offset = lastProcessedOffset.get() == -1 ? 0 : lastProcessedOffset.get() + 1;
176+
ctx.offsetSpecification(OffsetSpecification.offset(offset));
177+
subscriptionStart.set(offset);
178+
};
179+
180+
Supplier<Consumer> consumerSupplier =
181+
() ->
182+
environment.consumerBuilder().stream(stream)
183+
.messageHandler(handler)
184+
.subscriptionListener(subscriptionListener)
185+
.build();
186+
187+
// we start the consumer, there is already a batch of messages in the stream
188+
Consumer consumer = consumerSupplier.get();
189+
190+
// make sure we started at the beginning of the stream
191+
assertThat(subscriptionStart).hasValue(0);
192+
193+
// we got all the messages
194+
waitAtMost(() -> messageReceivedCount.get() == messageCount);
195+
assertThat(processed).hasSize(messageCount);
196+
assertThat(database).hasSize(messageCount);
197+
198+
// we publish the batch again, this simulates duplicates
199+
publish.run();
200+
201+
// the counter counts all the messages
202+
waitAtMost(() -> messageReceivedCount.get() == 2 * messageCount);
203+
// no duplicated processing
204+
assertThat(processed).hasSize(messageCount);
205+
assertThat(database).hasSize(messageCount);
206+
207+
consumer.close();
208+
209+
// we restart the consumer
210+
consumer = consumerSupplier.get();
211+
// make sure we started where we left off
212+
assertThat(subscriptionStart).hasPositiveValue().hasValue(lastProcessedOffset.get() + 1);
213+
214+
// position before we publish the batch a 3rd time
215+
long previousProcessedOffset = lastProcessedOffset.get();
216+
217+
// publish the batch again, simulating duplicates
218+
publish.run();
219+
220+
// the counter counted the 3 batches
221+
waitAtMost(() -> messageReceivedCount.get() == 3 * messageCount);
222+
// no duplicated processing though
223+
assertThat(processed).hasSize(messageCount);
224+
assertThat(database).hasSize(messageCount);
225+
// the offset position moved forward
226+
assertThat(lastProcessedOffset).hasValueGreaterThan(previousProcessedOffset);
227+
228+
consumer.close();
229+
}
230+
231+
private static boolean publishBatch(Producer producer, List<Message> batch) {
232+
CountDownLatch batchLatch = new CountDownLatch(batch.size());
233+
ConfirmationHandler confirmationHandler = status -> batchLatch.countDown();
234+
235+
for (Message message : batch) {
236+
producer.send(message, confirmationHandler);
237+
}
238+
239+
try {
240+
return batchLatch.await(10, TimeUnit.SECONDS);
241+
} catch (InterruptedException e) {
242+
return false;
243+
}
244+
}
245+
246+
private static int streamMessageCount(String stream, Environment env) {
247+
AtomicInteger messageReceived = new AtomicInteger();
248+
Consumer consumer =
249+
env.consumerBuilder().stream(stream)
250+
.offset(OffsetSpecification.first())
251+
.messageHandler(
252+
(ctx, msg) -> {
253+
messageReceived.incrementAndGet();
254+
})
255+
.build();
256+
257+
waitUntilStable(messageReceived::longValue);
258+
consumer.close();
259+
return messageReceived.get();
260+
}
261+
}

0 commit comments

Comments
 (0)