Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
package io.smallrye.reactive.messaging.kafka.health;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;

import java.time.Duration;

import io.smallrye.mutiny.TimeoutException;
import io.smallrye.reactive.messaging.ChannelRegistry;
import io.smallrye.reactive.messaging.PausableChannel;
import io.smallrye.reactive.messaging.kafka.KafkaConsumerRebalanceListener;
import io.smallrye.reactive.messaging.providers.impl.InternalChannelRegistry;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

Expand All @@ -30,6 +36,73 @@ private KafkaMapBasedConfig getKafkaSourceConfig(String topic) {
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

@Test
public void testWithInitiallyPausedChannel() {
KafkaMapBasedConfig config = getKafkaSourceConfig(topic);
config
.put("pausable", "true")
.put("initially-paused", "true");
LazyConsumingBean bean = runApplication(config, LazyConsumingBean.class);

ProducerTask produced = companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, "key", i), 10);

produced.awaitCompletion(Duration.ofMinutes(1));

Multi<Integer> channel = bean.getChannel();
try {
channel.select().first().collect().asList().await().atMost(Duration.ofSeconds(5));
fail("We should not have consumed anything since the channel is paused");
} catch (TimeoutException e) {
// normal
}

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);
}

@Test
public void testWithPausedChannel() {
KafkaMapBasedConfig config = getKafkaSourceConfig(topic);
config
.put("pausable", "true");
LazyConsumingBean bean = runApplication(config, LazyConsumingBean.class);

ProducerTask produced = companion.produceIntegers().usingGenerator(i -> new ProducerRecord<>(topic, "key", i), 10);

await().until(() -> isStarted() && isReady() && isAlive());

produced.awaitCompletion(Duration.ofMinutes(1));

Multi<Integer> channel = bean.getChannel();
channel
.select().first(10)
.collect().asList()
.await().atMost(Duration.ofSeconds(10));

ChannelRegistry channelRegistry = getBeanManager().createInstance().select(ChannelRegistry.class).get();
PausableChannel pausableChannel = channelRegistry.getPausable("input");
pausableChannel.pause();

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);
}

@Test
public void testWithIncomingChannel() {
KafkaMapBasedConfig config = getKafkaSourceConfig(topic);
Expand Down Expand Up @@ -91,7 +164,7 @@ public void testWithReadinessDisabled() {

@Test
public void testWithMultipleSubscribedTopic() {
String[] topics = { topic, topic + "-1", topic + "-2" };
String[] topics = {topic, topic + "-1", topic + "-2"};
MapBasedConfig config = getKafkaSourceConfig(topic)
.with("topics", String.join(",", topics))
.without("mp.messaging.incoming.input.topic");
Expand Down Expand Up @@ -158,7 +231,7 @@ public void testWithTopicVerification() {

@Test
public void testWithTopicVerificationMultipleTopics() {
String[] topics = { topic, topic + "-1", topic + "-2" };
String[] topics = {topic, topic + "-1", topic + "-2"};
MapBasedConfig config = getKafkaSourceConfig(topic)
.with("health-topic-verification-enabled", true)
.with("topics", String.join(",", topics))
Expand Down Expand Up @@ -195,7 +268,7 @@ public void testWithTopicVerificationMultipleTopics() {

@Test
public void testWithTopicVerificationTopicsPattern() {
String[] topics = { topic, topic + "-1", topic + "-2" };
String[] topics = {topic, topic + "-1", topic + "-2"};

MapBasedConfig config = getKafkaSourceConfig(topic)
.with("health-topic-verification-enabled", true)
Expand Down
Loading