Skip to content

Commit 83c7e39

Browse files
committed
Fix MessageDrivenAdapterTests for fetch.min.bytes=46
The real content received by the Kafka consumer is: Record key bytes - The serialized bytes of each record's key Record value bytes - The serialized bytes of each record's value (the actual message payload) Record headers bytes - The serialized bytes of any headers attached to the record Record metadata overhead - Internal Kafka metadata for each record (offset, timestamp, etc.) Batch/compression overhead - Additional bytes from batch structures and compression metadata All of those are contributing to the number of bytes received. Since we don't have compression and headers, we calculate: `value + key + timestamp + offset`. In our test-case it is `23` per record.
1 parent 17beaef commit 83c7e39

File tree

1 file changed

+1
-1
lines changed

1 file changed

+1
-1
lines changed

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/inbound/MessageDrivenAdapterTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ protected boolean doSend(Message<?> message, long timeout) {
398398
void testInboundBatch(EmbeddedKafkaBroker embeddedKafka) throws Exception {
399399
Map<String, Object> props = KafkaTestUtils.consumerProps(embeddedKafka, "test2", true);
400400
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
401-
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 24);
401+
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 46);
402402
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 2000);
403403

404404
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);

0 commit comments

Comments
 (0)