Skip to content

Commit 09b6857

Browse files
committed
Ensure we're not missing any records
1 parent 8a1d153 commit 09b6857

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ public List<SourceRecord> poll() throws InterruptedException {
7474
}
7575

7676
requests = requestGenerator.requests()
77+
.sequential()
7778
.filter(RestRequest::isStillValid)
7879
.peek(r -> {
7980
logger.info("Requesting {}", r.getRequest().url());
@@ -84,7 +85,7 @@ public List<SourceRecord> poll() throws InterruptedException {
8485
.filter(Objects::nonNull)
8586
.map(s -> s.collect(Collectors.toList()))
8687
.filter(l -> !l.isEmpty())
87-
.findAny()
88+
.findFirst()
8889
.orElse(Collections.emptyList());
8990
} while (requests.isEmpty());
9091

0 commit comments

Comments
 (0)