Skip to content

Commit 906caaf

Browse files
bradskusesmcvb
authored andcommitted
Address PR comments
(cherry picked from commit 1748681)
1 parent 01be04f commit 906caaf

File tree

4 files changed

+66
-19
lines changed

4 files changed

+66
-19
lines changed

kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/AsyncFetcher.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2022. Axon Framework
2+
* Copyright (c) 2010-2025. Axon Framework
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -94,26 +94,26 @@ protected AsyncFetcher(Builder<K, V, E> builder) {
9494
*/
9595
@Override
9696
public Registration poll(Consumer<K, V> consumer,
97-
RecordConverter<K, V, E> recordConverter,
98-
EventConsumer<E> eventConsumer) {
97+
RecordConverter<K, V, E> recordConverter,
98+
EventConsumer<E> eventConsumer) {
9999
return poll(consumer, recordConverter, eventConsumer,
100-
e -> logger.warn("Error from fetching thread, should be handled properly", e));
100+
e -> logger.warn("Error from fetching thread, should be handled properly", e));
101101
}
102102

103103
/**
104104
* {@inheritDoc}
105105
*/
106106
@Override
107107
public Registration poll(Consumer<K, V> consumer, RecordConverter<K, V, E> recordConverter,
108-
EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
108+
EventConsumer<E> eventConsumer, RuntimeErrorHandler runtimeErrorHandler) {
109109
FetchEventsTask<K, V, E> fetcherTask =
110110
new FetchEventsTask<>(consumer,
111-
pollTimeout,
112-
recordConverter,
113-
eventConsumer,
114-
activeFetchers::remove,
115-
runtimeErrorHandler,
116-
offsetCommitType);
111+
pollTimeout,
112+
recordConverter,
113+
eventConsumer,
114+
activeFetchers::remove,
115+
runtimeErrorHandler,
116+
offsetCommitType);
117117

118118
activeFetchers.add(fetcherTask);
119119
executorService.execute(fetcherTask);
@@ -162,22 +162,32 @@ public static final class Builder<K, V, E> {
162162
*/
163163
public Builder<K, V, E> pollTimeout(long timeoutMillis) {
164164
assertThat(timeoutMillis, timeout -> timeout > 0,
165-
"The poll timeout may not be negative [" + timeoutMillis + "]");
165+
"The poll timeout may not be negative [" + timeoutMillis + "]");
166166
this.pollTimeout = Duration.ofMillis(timeoutMillis);
167167
return this;
168168
}
169169

170170
/**
171-
* Set the {@code offsetCommitType}, options are:
172-
* {@link OffsetCommitType#AUTO} let the Kafka consumer commit offsets automatically in background
173-
* {@link OffsetCommitType#COMMIT_SYNC} let the Kafka consumer commit offsets synchronously after processing
174-
* {@link OffsetCommitType#COMMIT_ASYNC} let the Kafka consumer commit offsets asynchronously after processing
175-
* Defaults to {@code OffsetCommitType#AUTO}
171+
* Sets the {@code offsetCommitType} defining how the {@link FetchEventsTask} will commit offsets during
172+
* processing of events.
173+
* <p>
174+
* Options are:
175+
* <ul>
176+
* <li>{@link OffsetCommitType#AUTO} - let the Kafka consumer commit offsets automatically in background.
177+
* </li>
178+
* <li>{@link OffsetCommitType#COMMIT_SYNC} - let the Kafka consumer commit offsets synchronously after
179+
* processing.</li>
180+
* <li>{@link OffsetCommitType#COMMIT_ASYNC} - let the Kafka consumer commit offsets asynchronously after
181+
* processing.</li>
182+
* </ul>
183+
* <p>
184+
* Defaults to {@code OffsetCommitType#AUTO}, meaning the offset commit task happens in the background.
176185
*
177186
* @param offsetCommitType {@link OffsetCommitType} enum to specify the offset commit type
178187
* @return the current Builder instance, for fluent interfacing
179188
*/
180189
public AsyncFetcher.Builder<K, V, E> offsetCommitType(OffsetCommitType offsetCommitType) {
190+
assertNonNull(offsetCommitType, "OffsetCommitType may not be null");
181191
this.offsetCommitType = offsetCommitType;
182192
return this;
183193
}

kafka/src/main/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2022. Axon Framework
2+
* Copyright (c) 2010-2025. Axon Framework
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,47 @@
1+
/*
2+
* Copyright (c) 2010-2025. Axon Framework
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
117
package org.axonframework.extensions.kafka.eventhandling.consumer;
218

19+
import org.apache.kafka.clients.consumer.KafkaConsumer;
20+
321
/**
422
* Enum to define how the consumer will handle committing offsets.
23+
*
24+
* @author Bradley Skuse
25+
* @since 4.11.1
526
*/
627
public enum OffsetCommitType {
28+
29+
/**
30+
* Kafka consumer will commit offsets automatically in the background.
31+
*/
732
AUTO,
33+
34+
/**
35+
* Kafka consumer will commit offsets asynchronously after processing
36+
*
37+
* @see KafkaConsumer#commitAsync()
38+
*/
839
COMMIT_ASYNC,
40+
41+
/**
42+
* Kafka consumer will commit offsets synchronously after processing
43+
*
44+
* @see KafkaConsumer#commitSync()
45+
*/
946
COMMIT_SYNC
1047
}

kafka/src/test/java/org/axonframework/extensions/kafka/eventhandling/consumer/FetchEventsTaskTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010-2022. Axon Framework
2+
* Copyright (c) 2010-2025. Axon Framework
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.

0 commit comments

Comments
 (0)