diff --git a/api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java b/api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java index 82bada939..79f5db25f 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java +++ b/api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java @@ -44,7 +44,7 @@ protected TreeMap nextPollingRange(TreeMap result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition)); readToOffsets.forEach((tp, toOffset) -> { long tpStartOffset = seekOperations.getBeginOffsets().get(tp); diff --git a/api/src/main/java/io/kafbat/ui/emitter/ForwardEmitter.java b/api/src/main/java/io/kafbat/ui/emitter/ForwardEmitter.java index b111170e5..1f14bb47a 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/ForwardEmitter.java +++ b/api/src/main/java/io/kafbat/ui/emitter/ForwardEmitter.java @@ -44,7 +44,7 @@ protected TreeMap nextPollingRange(TreeMap result = new TreeMap<>(Comparator.comparingInt(TopicPartition::partition)); readFromOffsets.forEach((tp, fromOffset) -> { long tpEndOffset = seekOperations.getEndOffsets().get(tp); diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index 507404873..87d8c2a0b 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -242,7 +242,7 @@ public Flux loadMessages(KafkaCluster cluster, String topi cursor.deserializer(), cursor.consumerPosition(), cursor.filter(), - cursor.limit() + fixPageSize(cursor.limit()) ); } diff --git a/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java b/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java index 85ab75976..aaa2121ce 100644 --- a/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/RecordEmitterTest.java @@ -323,6 +323,48 @@ void backwardEmitterSeekToBegin() { ); } + @Test + void forwardEmitterCompletesWithZeroPageSize() { + var emitter = new ForwardEmitter( + this::createConsumer, + new ConsumerPosition(EARLIEST, TOPIC, List.of(), null, null), + 0, + RECORD_DESERIALIZER, + NOOP_FILTER, + PollingSettings.createDefault(), + CURSOR_MOCK + ); + + StepVerifier.create(Flux.create(emitter)) + .expectNextMatches(m -> TopicMessageEventDTO.TypeEnum.PHASE.equals(m.getType())) + .thenConsumeWhile(m -> TopicMessageEventDTO.TypeEnum.PHASE.equals(m.getType()) + || TopicMessageEventDTO.TypeEnum.MESSAGE.equals(m.getType())) + .expectNextMatches(m -> TopicMessageEventDTO.TypeEnum.DONE.equals(m.getType())) + .expectComplete() + .verify(); + } + + @Test + void backwardEmitterCompletesWithZeroPageSize() { + var emitter = new BackwardEmitter( + this::createConsumer, + new ConsumerPosition(LATEST, TOPIC, List.of(), null, null), + 0, + RECORD_DESERIALIZER, + NOOP_FILTER, + PollingSettings.createDefault(), + CURSOR_MOCK + ); + + StepVerifier.create(Flux.create(emitter)) + .expectNextMatches(m -> TopicMessageEventDTO.TypeEnum.PHASE.equals(m.getType())) + .thenConsumeWhile(m -> TopicMessageEventDTO.TypeEnum.PHASE.equals(m.getType()) + || TopicMessageEventDTO.TypeEnum.MESSAGE.equals(m.getType())) + .expectNextMatches(m -> TopicMessageEventDTO.TypeEnum.DONE.equals(m.getType())) + .expectComplete() + .verify(); + } + private void expectEmitter(Consumer> emitter, List expectedValues) { expectEmitter(emitter, expectedValues.size(),