Skip to content

Commit c8cf57c

Browse files
authored
Seek Improvements
- add convenience methods to `AbstractConsumerSeekAware` - compile tests with Java 11
1 parent 8ae9a8d commit c8cf57c

File tree

6 files changed

+239
-6
lines changed

6 files changed

+239
-6
lines changed

.travis.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
dist: trusty
1+
dist: bionic
22
language: java
3-
jdk: oraclejdk8
3+
jdk: openjdk11
44
install: true
55
before_cache:
66
- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ subprojects { subproject ->
133133
targetCompatibility = 1.8
134134
}
135135

136+
compileTestJava {
137+
sourceCompatibility = 11
138+
}
139+
136140
compileTestKotlin {
137141
kotlinOptions {
138142
jvmTarget = '1.8'

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import java.util.Collection;
2020
import java.util.Collections;
21+
import java.util.LinkedList;
22+
import java.util.List;
2123
import java.util.Map;
2224
import java.util.concurrent.ConcurrentHashMap;
2325

@@ -40,6 +42,8 @@ public abstract class AbstractConsumerSeekAware implements ConsumerSeekAware {
4042

4143
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
4244

45+
private final Map<ConsumerSeekCallback, List<TopicPartition>> callbacksToTopic = new ConcurrentHashMap<>();
46+
4347
@Override
4448
public void registerSeekCallback(ConsumerSeekCallback callback) {
4549
this.callbackForThread.set(callback);
@@ -49,13 +53,27 @@ public void registerSeekCallback(ConsumerSeekCallback callback) {
4953
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
5054
ConsumerSeekCallback threadCallback = this.callbackForThread.get();
5155
if (threadCallback != null) {
52-
assignments.keySet().forEach(tp -> this.callbacks.put(tp, threadCallback));
56+
assignments.keySet().forEach(tp -> {
57+
this.callbacks.put(tp, threadCallback);
58+
this.callbacksToTopic.computeIfAbsent(threadCallback, key -> new LinkedList<>()).add(tp);
59+
});
5360
}
5461
}
5562

5663
@Override
5764
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
58-
partitions.forEach(tp -> this.callbacks.remove(tp));
65+
partitions.forEach(tp -> {
66+
ConsumerSeekCallback removed = this.callbacks.remove(tp);
67+
if (removed != null) {
68+
List<TopicPartition> topics = this.callbacksToTopic.get(removed);
69+
if (topics != null) {
70+
topics.remove(tp);
71+
if (topics.size() == 0) {
72+
this.callbacksToTopic.remove(removed);
73+
}
74+
}
75+
}
76+
});
5977
}
6078

6179
@Override
@@ -81,4 +99,38 @@ protected Map<TopicPartition, ConsumerSeekCallback> getSeekCallbacks() {
8199
return Collections.unmodifiableMap(this.callbacks);
82100
}
83101

102+
/**
103+
* Return the currently registered callbacks and their associated {@link TopicPartition}(s).
104+
* @return the map of callbacks and partitions.
105+
* @since 2.6
106+
*/
107+
protected Map<ConsumerSeekCallback, List<TopicPartition>> getCallbacksAndTopics() {
108+
return Collections.unmodifiableMap(this.callbacksToTopic);
109+
}
110+
111+
/**
112+
* Seek all assigned partitions to the beginning.
113+
* @since 2.6
114+
*/
115+
public void seekToBeginning() {
116+
getCallbacksAndTopics().forEach((cb, topics) -> cb.seekToBeginning(topics));
117+
}
118+
119+
/**
120+
* Seek all assigned partitions to the end.
121+
* @since 2.6
122+
*/
123+
public void seekToEnd() {
124+
getCallbacksAndTopics().forEach((cb, topics) -> cb.seekToEnd(topics));
125+
}
126+
127+
/**
128+
* Seek all assigned partitions to the offset represented by the timestamp.
129+
* @param time the time to seek to.
130+
* @since 2.6
131+
*/
132+
public void seekToTimestamp(long time) {
133+
getCallbacksAndTopics().forEach((cb, topcis) -> cb.seekToTimestamp(topcis, time));
134+
}
135+
84136
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.verify;
22+
23+
import java.util.Collections;
24+
import java.util.LinkedHashMap;
25+
import java.util.LinkedList;
26+
import java.util.Map;
27+
import java.util.concurrent.Callable;
28+
import java.util.concurrent.Executors;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
import org.apache.kafka.common.TopicPartition;
32+
import org.junit.jupiter.api.Test;
33+
34+
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
35+
import org.springframework.kafka.test.utils.KafkaTestUtils;
36+
37+
/**
38+
* @author Gary Russell
39+
* @since 2.6
40+
*
41+
*/
42+
public class ConsumerSeekAwareTests {
43+
44+
@SuppressWarnings("unchecked")
45+
@Test
46+
void beginningEndAndBulkSeekToTimestamp() throws Exception {
47+
class CSA extends AbstractConsumerSeekAware {
48+
}
49+
AbstractConsumerSeekAware csa = new CSA();
50+
var exec1 = Executors.newSingleThreadExecutor();
51+
var exec2 = Executors.newSingleThreadExecutor();
52+
var cb1 = mock(ConsumerSeekCallback.class);
53+
var cb2 = mock(ConsumerSeekCallback.class);
54+
var first = new AtomicBoolean(true);
55+
var map1 = new LinkedHashMap<>(Map.of(new TopicPartition("foo", 0), 0L, new TopicPartition("foo", 1), 0L));
56+
var map2 = new LinkedHashMap<>(Map.of(new TopicPartition("foo", 2), 0L, new TopicPartition("foo", 3), 0L));
57+
var register = new Callable<Void>() {
58+
59+
@Override
60+
public Void call() {
61+
if (first.getAndSet(false)) {
62+
csa.registerSeekCallback(cb1);
63+
csa.onPartitionsAssigned(map1, null);
64+
}
65+
else {
66+
csa.registerSeekCallback(cb2);
67+
csa.onPartitionsAssigned(map2, null);
68+
}
69+
return null;
70+
}
71+
72+
};
73+
exec1.submit(register).get();
74+
exec2.submit(register).get();
75+
csa.seekToBeginning();
76+
verify(cb1).seekToBeginning(new LinkedList<>(map1.keySet()));
77+
verify(cb2).seekToBeginning(new LinkedList<>(map2.keySet()));
78+
csa.seekToEnd();
79+
verify(cb1).seekToEnd(new LinkedList<>(map1.keySet()));
80+
verify(cb2).seekToEnd(new LinkedList<>(map2.keySet()));
81+
csa.seekToTimestamp(42L);
82+
verify(cb1).seekToTimestamp(new LinkedList<>(map1.keySet()), 42L);
83+
verify(cb2).seekToTimestamp(new LinkedList<>(map2.keySet()), 42L);
84+
var revoke1 = new Callable<Void>() {
85+
86+
@Override
87+
public Void call() {
88+
if (!first.getAndSet(true)) {
89+
csa.onPartitionsRevoked(Collections.singletonList(map1.keySet().iterator().next()));
90+
}
91+
else {
92+
csa.onPartitionsRevoked(Collections.singletonList(map2.keySet().iterator().next()));
93+
}
94+
return null;
95+
}
96+
97+
};
98+
exec1.submit(revoke1).get();
99+
exec2.submit(revoke1).get();
100+
map1.remove(map1.keySet().iterator().next());
101+
map2.remove(map2.keySet().iterator().next());
102+
csa.seekToTimestamp(43L);
103+
verify(cb1).seekToTimestamp(new LinkedList<>(map1.keySet()), 43L);
104+
verify(cb2).seekToTimestamp(new LinkedList<>(map2.keySet()), 43L);
105+
var revoke2 = new Callable<Void>() {
106+
107+
@Override
108+
public Void call() {
109+
if (first.getAndSet(false)) {
110+
csa.onPartitionsRevoked(Collections.singletonList(map1.keySet().iterator().next()));
111+
}
112+
else {
113+
csa.onPartitionsRevoked(Collections.singletonList(map2.keySet().iterator().next()));
114+
}
115+
return null;
116+
}
117+
118+
};
119+
exec1.submit(revoke2).get();
120+
exec2.submit(revoke2).get();
121+
assertThat(KafkaTestUtils.getPropertyValue(csa, "callbacks", Map.class)).isEmpty();
122+
assertThat(KafkaTestUtils.getPropertyValue(csa, "callbacksToTopic", Map.class)).isEmpty();
123+
var checkTL = new Callable<Void>() {
124+
125+
@Override
126+
public Void call() throws Exception {
127+
csa.unregisterSeekCallback();
128+
assertThat(KafkaTestUtils.getPropertyValue(csa, "callbackForThread", ThreadLocal.class).get()).isNull();
129+
return null;
130+
}
131+
132+
};
133+
exec1.submit(checkTL).get();
134+
exec2.submit(checkTL).get();
135+
exec1.shutdown();
136+
exec2.shutdown();
137+
}
138+
139+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2721,7 +2721,7 @@ NOTE: The `seekToBeginning` method that accepts a collection is useful, for exam
27212721
====
27222722
[source, java]
27232723
----
2724-
public class MyListener extends AbstractConsumerSeekAware {
2724+
public class MyListener implements ConsumerSeekAware {
27252725
27262726
...
27272727
@@ -2851,6 +2851,41 @@ public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
28512851
----
28522852
====
28532853

2854+
Version 2.6 added convenience methods to the abstract class:
2855+
2856+
* `seekToBeginning()` - seeks all assigned partitions to the beginning
2857+
* `seekToEnd()` - seeks all assigned partitions to the end
2858+
* `seekToTimestamp(long time)` - seeks all assigned partitions to the offset represented by that timestamp.
2859+
2860+
Example:
2861+
2862+
====
2863+
[source, java]
2864+
----
2865+
public class MyListener extends AbstractConsumerSeekAware {
2866+
2867+
@KafkaListener(...)
2868+
void listn(...) {
2869+
...
2870+
}
2871+
}
2872+
2873+
public class SomeOtherBean {
2874+
2875+
MyListener listener;
2876+
2877+
...
2878+
2879+
void someMethod() {
2880+
this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
2881+
}
2882+
2883+
}
2884+
2885+
----
2886+
====
2887+
2888+
28542889
[[container-factory]]
28552890
==== Container factory
28562891

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,6 @@ When using manual partition assignment, you can now specify a wildcard for deter
2626
In addition, if the listener implements `ConsumerSeekAware`, `onPartitionsAssigned()` is called after the manual assignment.
2727
(Also added in version 2.5.5).
2828
See <<manual-assignment>> for more information.
29+
30+
Convenience methods have been added to `AbstractConsumerSeekAware` to make seeking easier.
31+
See <<seek>> for more information.

0 commit comments

Comments
 (0)