Kafka & Testing #40941
-
Hi, I'm trying to implement an API that publishes messages to Kafka. I use this as a reference: https://quarkus.io/guides/kafka#testing-using-a-kafka-broker. So, my API looks like this: import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.jboss.resteasy.reactive.ResponseStatus;
import org.jboss.resteasy.reactive.RestHeader;
import org.jboss.resteasy.reactive.RestQuery;
import java.time.Instant;
import java.util.Base64;
import java.util.Optional;
import java.util.UUID;
@Path("/foo")
public class IngestResource {
@Inject
@Channel("ingestion-raw")
Emitter<Msg> emitter;
record Msg(
Instant time,
String userId,
String userMessageId,
byte[] payload
) {
}
@POST
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
@ResponseStatus(200)
public void upload(
@RestHeader("userid") String userId,
@RestQuery("userMessageId") Optional<String> userMessageId,
byte[] incoming) {
emitter.send(new Msg(
Instant.now(),
userId,
userMessageId.orElseGet(() -> UUID.randomUUID().toString()),
Base64.getEncoder().encode(incoming))).thenApply(ignored -> null);
}
} My test: import io.quarkus.logging.Log;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.kafka.InjectKafkaCompanion;
import io.quarkus.test.kafka.KafkaCompanionResource;
import io.restassured.http.ContentType;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import jakarta.ws.rs.core.MediaType;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.junit.jupiter.api.Test;
import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
@QuarkusTest
@QuarkusTestResource(KafkaCompanionResource.class)
public class IngestResourceTests {
@InjectKafkaCompanion
KafkaCompanion kafkaCompanion;
@Test
void testIngestion() {
var response = given().when()
.contentType(ContentType.fromContentType(MediaType.APPLICATION_OCTET_STREAM))
.body("test".getBytes())
.post("/foo");
response.then().statusCode(200);
Log.infof("Topics: {%s}", kafkaCompanion.topics().list());
var result = kafkaCompanion.consumeStrings()
.withOffsetReset(OffsetResetStrategy.EARLIEST)
.fromTopics("whatever")
.awaitCompletion();
assertThat(result.count()).isOne();
}
} Unfortunately, this approach fails, as the test never consumes the message I expected. |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 1 reply
-
/cc @alesj (kafka), @cescoffier (kafka), @ozangunalp (kafka) |
Beta Was this translation helpful? Give feedback.
-
When using Kafka companion consumer you either need to pass how many records you want to receive with In your consuming example the Hope this helps. |
Beta Was this translation helpful? Give feedback.
When using Kafka companion consumer you either need to pass how many records you want to receive with
.fromTopics("whatever", 1)
or if you'd like to keep the consumer polling, need to await records accordingly using methods like.awaitNextRecord()
.In your consuming example the
awaitCompletion
would timeout after 10 seconds (the default timeout).You of course need to check that you are consuming from the same topic you are producing to.
Hope this helps.