@@ -2017,11 +2017,14 @@ Create a Quarkus Test using the test resource created above:
2017
2017
2018
2018
[source, java]
2019
2019
----
2020
+ import static org.awaitility.Awaitility.await;
2021
+
2020
2022
@QuarkusTest
2021
2023
@QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
2022
2024
class BaristaTest {
2023
2025
2024
2026
@Inject
2027
+ @Connector("smallrye-in-memory")
2025
2028
InMemoryConnector connector; // <1>
2026
2029
2027
2030
@Test
@@ -2054,6 +2057,74 @@ class BaristaTest {
2054
2057
The application will process this message and send a message to `beverages` channel.
2055
2058
<5> Use the `received` method on `beverages` channel to check the messages produced by the application.
2056
2059
2060
+ If your Kafka consumer is batch based, you will need to send a batch of messages to the channel as by creating them manually.
2061
+
2062
+ For instance:
2063
+
2064
+ [source, java]
2065
+ ----
2066
+ @ApplicationScoped
2067
+ public class BeverageProcessor {
2068
+
2069
+ @Incoming("orders")
2070
+ CompletionStage<Void> process(KafkaRecordBatch<String, Order> orders) {
2071
+ System.out.println("Order received " + orders.getPayload().size());
2072
+ return orders.ack();
2073
+ }
2074
+ }
2075
+ ----
2076
+
2077
+ [source, java]
2078
+ ----
2079
+ import static org.awaitility.Awaitility.await;
2080
+
2081
+ @QuarkusTest
2082
+ @QuarkusTestResource(KafkaTestResourceLifecycleManager.class)
2083
+ class BaristaTest {
2084
+
2085
+ @Inject
2086
+ @Connector("smallrye-in-memory")
2087
+
2088
+ InMemoryConnector connector;
2089
+
2090
+ @Test
2091
+ void testProcessOrder() {
2092
+ InMemorySource<IncomingKafkaRecordBatch<String, Order>> ordersIn = connector.source("orders");
2093
+ var committed = new AtomicBoolean(false); // <1>
2094
+ var commitHandler = new KafkaCommitHandler() {
2095
+ @Override
2096
+ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record) {
2097
+ committed.set(true); // <2>
2098
+ return null;
2099
+ }
2100
+ };
2101
+ var failureHandler = new KafkaFailureHandler() {
2102
+ @Override
2103
+ public <K, V> Uni<Void> handle(IncomingKafkaRecord<K, V> record, Throwable reason, Metadata metadata) {
2104
+ return null;
2105
+ }
2106
+ };
2107
+
2108
+ Order order = new Order();
2109
+ order.setProduct("coffee");
2110
+ order.setName("Coffee lover");
2111
+ order.setOrderId("1234");
2112
+ var record = new ConsumerRecord<>("topic", 0, 0, "key", order);
2113
+ var records = new ConsumerRecords<>(Map.of(new TopicPartition("topic", 1), List.of(record)));
2114
+ var batch = new IncomingKafkaRecordBatch<>(
2115
+ records, "kafka", 0, commitHandler, failureHandler, false, false); // <3>
2116
+
2117
+ ordersIn.send(batch);
2118
+
2119
+ await().until(committed::get); // <4>
2120
+ }
2121
+ }
2122
+ ----
2123
+ <1> Create an `AtomicBoolean` to track if the batch has been committed.
2124
+ <2> Update `committed` when the batch is committed.
2125
+ <3> Create a `IncomingKafkaRecordBatch` with a single record.
2126
+ <4> Wait until the batch is committed.
2127
+
2057
2128
[IMPORTANT]
2058
2129
====
2059
2130
With in-memory channels we were able to test application code processing messages without starting a Kafka broker.
0 commit comments