-
Notifications
You must be signed in to change notification settings - Fork 16
Closed
Description
Hi folks,
I'm working on the rabbit-stream client in Rust, specifically on feature message batching. I have this behavior in my branch where when automatic dedup is enabled (by setting the producer name) it happens that some messages are not delivered to the consumer even though the confirm status is confirmed. I was able to reproduce the same behavior with the Java client.
My question is in case of message deduplication does the producer needs to be single-thread or manually protected in a multi-thread environment?
I'm attaching for reference the test that i did for reproducing it. Thanks
@Test
void sendAndConsume() throws Exception {
int batchSize = 10;
int messageCount = batchSize;
CountDownLatch publishLatch = new CountDownLatch(messageCount);
Producer producer = environment.producerBuilder().stream(stream).name("sendAndConsume").batchSize(batchSize)
.build();
AtomicLong count = new AtomicLong(0);
ExecutorService service = Executors.newFixedThreadPool(batchSize);
IntStream.range(0, messageCount).forEach(i -> {
service.execute(() -> {
producer.send(producer.messageBuilder().addData("".getBytes()).build(), confirmationStatus -> {
if (confirmationStatus.isConfirmed()) {
count.incrementAndGet();
publishLatch.countDown();
}
});
});
});
boolean completed = publishLatch.await(10, TimeUnit.SECONDS);
assertThat(completed).isTrue();
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
AtomicLong chunkTimestamp = new AtomicLong();
Consumer consumer = environment.consumerBuilder().stream(stream).offset(OffsetSpecification.first())
.messageHandler((context, message) -> {
chunkTimestamp.set(context.timestamp());
consumeLatch.countDown();
}).build();
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(chunkTimestamp.get()).isNotZero();
consumer.close();
}Metadata
Metadata
Assignees
Labels
No labels