Skip to content

Commit 9fb778c

Browse files
committed
Add ConsumerSeekAware Example
1 parent c7a7829 commit 9fb778c

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

src/reference/asciidoc/kafka.adoc

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2000,6 +2000,75 @@ See <<idle-containers>> for how to enable idle container detection.
20002000

20012001
To arbitrarily seek at runtime, use the callback reference from the `registerSeekCallback` for the appropriate thread.
20022002

2003+
Here is a trivial Spring Boot application that demonstrates how to use the callback; it sends 10 records to the topic; hitting `<Enter>` in the console causes all partitions to seek to the beginning.
2004+
2005+
====
2006+
[source, java]
2007+
----
2008+
@SpringBootApplication
2009+
public class SeekExampleApplication {
2010+
2011+
public static void main(String[] args) {
2012+
SpringApplication.run(SeekExampleApplication.class, args);
2013+
}
2014+
2015+
@Bean
2016+
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
2017+
return args -> {
2018+
IntStream.range(0, 10).forEach(i -> template.send(
2019+
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
2020+
while (true) {
2021+
System.in.read();
2022+
listener.seekToStart();
2023+
}
2024+
};
2025+
}
2026+
2027+
@Bean
2028+
public NewTopic topic() {
2029+
return new NewTopic("seekExample", 3, (short) 1);
2030+
}
2031+
2032+
}
2033+
2034+
@Component
2035+
class Listener implements ConsumerSeekAware {
2036+
2037+
2038+
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
2039+
2040+
2041+
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
2042+
2043+
private static final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
2044+
2045+
@Override
2046+
public void registerSeekCallback(ConsumerSeekCallback callback) {
2047+
callbackForThread.set(callback);
2048+
}
2049+
2050+
@Override
2051+
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
2052+
assignments.keySet().forEach(tp -> this.callbacks.put(tp, callbackForThread.get()));
2053+
}
2054+
2055+
@Override
2056+
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
2057+
}
2058+
2059+
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
2060+
public void listen(ConsumerRecord<String, String> in) {
2061+
logger.info(in.toString());
2062+
}
2063+
2064+
public void seekToStart() {
2065+
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
2066+
}
2067+
2068+
}
2069+
----
2070+
====
2071+
20032072
[[container-factory]]
20042073
===== Container factory
20052074

0 commit comments

Comments
 (0)