Skip to content

Commit 353ce9e

Browse files
authored
GH-2673: Add Mock Consumer and Producer Factories
Resolves #2673 Also remove an unnecessary call to `offsetsForTimes` which is not supported by `MockConsumer`. * Doc polishing.
1 parent 6f58505 commit 353ce9e

File tree

8 files changed

+477
-34
lines changed

8 files changed

+477
-34
lines changed

spring-kafka-docs/src/main/asciidoc/testing.adoc

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,3 +624,71 @@ received = records.poll(10, TimeUnit.SECONDS);
624624
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
625625
----
626626
====
627+
628+
[[mock-cons-prod]]
629+
==== Mock Consumer and Producer
630+
631+
The `kafka-clients` library provides `MockConsumer` and `MockProducer` classes for testing purposes.
632+
633+
If you wish to use these classes in some of your tests with listener containers or `KafkaTemplate` respectively, starting with version 3.0.7, the framework now provides `MockConsumerFactory` and `MockProducerFactory` implementations.
634+
635+
These factories can be used in the listener container and template instead of the default factories, which require a running (or embedded) broker.
636+
637+
Here is an example of a simple implementation returning a single consumer:
638+
639+
====
640+
[source, java]
641+
----
642+
@Bean
643+
ConsumerFactory<String, String> consumerFactory() {
644+
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
645+
TopicPartition topicPartition0 = new TopicPartition("topic", 0);
646+
List<TopicPartition> topicPartitions = Arrays.asList(topicPartition0);
647+
Map<TopicPartition, Long> beginningOffsets = topicPartitions.stream().collect(Collectors
648+
.toMap(Function.identity(), tp -> 0L));
649+
consumer.updateBeginningOffsets(beginningOffsets);
650+
consumer.schedulePollTask(() -> {
651+
consumer.addRecord(
652+
new ConsumerRecord<>("topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test1",
653+
new RecordHeaders(), Optional.empty()));
654+
consumer.addRecord(
655+
new ConsumerRecord<>("topic", 0, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "test2",
656+
new RecordHeaders(), Optional.empty()));
657+
});
658+
return new MockConsumerFactory(() -> consumer);
659+
}
660+
----
661+
====
662+
663+
If you wish to test with concurrency, the `Supplier` lambda in the factory's constructor would need create a new instance each time.
664+
665+
With the `MockProducerFactory`, there are two constructors; one to create a simple factory, and one to create factory that supports transactions.
666+
667+
Here are examples:
668+
669+
====
670+
[source, java]
671+
----
672+
@Bean
673+
ProducerFactory<String, String> nonTransFactory() {
674+
return new MockProducerFactory<>(() ->
675+
new MockProducer<>(true, new StringSerializer(), new StringSerializer()));
676+
}
677+
678+
@Bean
679+
ProducerFactory<String, String> transFactory() {
680+
MockProducer<String, String> mockProducer =
681+
new MockProducer<>(true, new StringSerializer(), new StringSerializer());
682+
mockProducer.initTransactions();
683+
return new MockProducerFactory<String, String>((tx, id) -> mockProducer, "defaultTxId");
684+
}
685+
----
686+
====
687+
688+
Notice in the second case, the lambda is a `BiFunction<Boolean, String>` where the first parameter is true if the caller wants a transactional producer; the optional second parameter contains the transactional id.
689+
This can be the default (as provided in the constructor), or can be overridden by the `KafkaTransactionManager` (or `KafkaTemplate` for local transactions), if so configured.
690+
The transactional id is provided in case you wish to use a different `MockProducer` based on this value.
691+
692+
If you are using producers in a multi-threaded environment, the `BiFunction` should return multiple producers (perhaps thread-bound using a `ThreadLocal`).
693+
694+
IMPORTANT: Transactional `MockProducer` s must be initialized for transactions by calling `initTransaction()`.

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,9 @@ Four constants in `KafkaHeaders` that were deprecated in 2.9.x have now been rem
9898
* Instead of `PARTITION_ID`, use `PARTITION`
9999

100100
Similarly, `RECEIVED_MESSAGE_KEY` is replaced by `RECEIVED_KEY` and `RECEIVED_PARTITION_ID` is replaced by `RECEIVED_PARTITION`.
101+
102+
[[x30-testing]]
103+
==== Testing Changes
104+
105+
Version 3.0.7 introduced a `MockConsumerFactory` and `MockProducerFactory`.
106+
See <<mock-cons-prod>> for more information.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3225,15 +3225,17 @@ private void initPartitionsIfNeeded() {
32253225
Map<TopicPartition, Long> times = partitions.entrySet().stream()
32263226
.filter(e -> SeekPosition.TIMESTAMP.equals(e.getValue().seekPosition))
32273227
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().offset));
3228-
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(times);
3229-
offsetsForTimes.forEach((tp, off) -> {
3230-
if (off == null) {
3231-
ends.add(tp);
3232-
}
3233-
else {
3234-
partitions.put(tp, new OffsetMetadata(off.offset(), false, SeekPosition.TIMESTAMP));
3235-
}
3236-
});
3228+
if (!times.isEmpty()) {
3229+
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(times);
3230+
offsetsForTimes.forEach((tp, off) -> {
3231+
if (off == null) {
3232+
ends.add(tp);
3233+
}
3234+
else {
3235+
partitions.put(tp, new OffsetMetadata(off.offset(), false, SeekPosition.TIMESTAMP));
3236+
}
3237+
});
3238+
}
32373239
doInitialSeeks(partitions, beginnings, ends);
32383240
if (this.consumerSeekAwareListener != null) {
32393241
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.mock;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.Properties;
22+
import java.util.function.Supplier;
23+
24+
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.clients.consumer.MockConsumer;
26+
27+
import org.springframework.kafka.core.ConsumerFactory;
28+
import org.springframework.lang.Nullable;
29+
30+
/**
31+
* Support the use of {@link MockConsumer} in tests.
32+
*
33+
* @param <K> the key type.
34+
* @param <V> the value type.
35+
*
36+
* @author Gary Russell
37+
* @since 3.0.7
38+
*
39+
*/
40+
public class MockConsumerFactory<K, V> implements ConsumerFactory<K, V> {
41+
42+
private final Supplier<MockConsumer> consumerProvider;
43+
44+
/**
45+
* Create an instance with the supplied consumer provicer.
46+
* @param consumerProvider the consumer provider.
47+
*/
48+
public MockConsumerFactory(Supplier<MockConsumer> consumerProvider) {
49+
this.consumerProvider = consumerProvider;
50+
}
51+
52+
@Override
53+
public Map<String, Object> getConfigurationProperties() {
54+
return Collections.emptyMap();
55+
}
56+
57+
@Override
58+
public Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
59+
@Nullable String clientIdSuffix) {
60+
61+
return this.consumerProvider.get();
62+
}
63+
64+
@Override
65+
public Consumer<K, V> createConsumer(@Nullable String groupId, @Nullable String clientIdPrefix,
66+
@Nullable String clientIdSuffix, @Nullable Properties properties) {
67+
68+
return this.consumerProvider.get();
69+
}
70+
71+
@Override
72+
public boolean isAutoCommit() {
73+
return false;
74+
}
75+
76+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.mock;
18+
19+
import java.util.function.BiFunction;
20+
import java.util.function.Supplier;
21+
22+
import org.apache.kafka.clients.producer.MockProducer;
23+
import org.apache.kafka.clients.producer.Producer;
24+
25+
import org.springframework.kafka.core.ProducerFactory;
26+
import org.springframework.lang.Nullable;
27+
28+
/**
29+
* Support the use of {@link MockProducer} in tests.
30+
*
31+
* @param <K> the key type.
32+
* @param <V> the value type.
33+
*
34+
* @author Gary Russell
35+
* @since 3.0.7
36+
*
37+
*/
38+
public class MockProducerFactory<K, V> implements ProducerFactory<K, V> {
39+
40+
private final BiFunction<Boolean, String, MockProducer<K, V>> producerProvider;
41+
42+
@Nullable
43+
private final String defaultTxId;
44+
45+
private final boolean transactional;
46+
47+
/**
48+
* Create an instance that does not support transactional producers.
49+
* @param producerProvider a {@link Supplier} for a {@link MockProducer}.
50+
*/
51+
public MockProducerFactory(Supplier<MockProducer> producerProvider) {
52+
this.producerProvider = (tx, id) -> producerProvider.get();
53+
this.defaultTxId = null;
54+
this.transactional = false;
55+
}
56+
57+
/**
58+
* Create an instance that supports transactions, with the supplied producer provider {@link BiFunction}. The
59+
* function has two parameters, a boolean indicating whether a transactional producer
60+
* is being requested and, if true, the transaction id prefix for that producer.
61+
* @param producerProvider the provider function.
62+
* @param defaultTxId the default transactional id.
63+
*/
64+
public MockProducerFactory(BiFunction<Boolean, String, MockProducer<K, V>> producerProvider,
65+
@Nullable String defaultTxId) {
66+
67+
this.producerProvider = producerProvider;
68+
this.defaultTxId = defaultTxId;
69+
this.transactional = true;
70+
}
71+
72+
@Override
73+
public boolean transactionCapable() {
74+
return this.transactional;
75+
}
76+
77+
@Override
78+
public Producer<K, V> createProducer() {
79+
return createProducer(this.defaultTxId);
80+
}
81+
82+
@Override
83+
public Producer<K, V> createProducer(@Nullable String txIdPrefix) {
84+
return txIdPrefix == null && this.defaultTxId == null
85+
? this.producerProvider.apply(false, null)
86+
: this.producerProvider.apply(true, txIdPrefix == null ? this.defaultTxId : txIdPrefix);
87+
}
88+
89+
@Override
90+
public Producer<K, V> createNonTransactionalProducer() {
91+
return this.producerProvider.apply(false, null);
92+
}
93+
94+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/**
2+
* Provides classes to support the use of MockConsumer and MockProducer.
3+
*/
4+
package org.springframework.kafka.mock;

0 commit comments

Comments
 (0)