Skip to content

Commit cb11170

Browse files
committed
Use iterator instead of stream to ensure once-only requests
1 parent 85d6c56 commit cb11170

File tree

2 files changed

+23
-18
lines changed

2 files changed

+23
-18
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,11 @@
2929
import org.radarbase.connect.rest.fitbit.user.UserRepository;
3030

3131
public class FitbitIntradayHeartRateRoute extends FitbitPollingRoute {
32-
private static final String ROUTE_NAME = "heart_rate";
3332
private final FitbitIntradayHeartRateAvroConverter converter;
3433

3534
public FitbitIntradayHeartRateRoute(FitbitRequestGenerator generator,
3635
UserRepository userRepository, AvroData avroData) {
37-
super(generator, userRepository, ROUTE_NAME);
36+
super(generator, userRepository, "heart_rate");
3837
this.converter = new FitbitIntradayHeartRateAvroConverter(avroData);
3938
}
4039

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import static java.time.temporal.ChronoUnit.MILLIS;
2121
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
2222

23+
import java.io.IOException;
2324
import java.lang.reflect.InvocationTargetException;
2425
import java.time.Instant;
2526
import java.util.ArrayList;
2627
import java.util.Collections;
28+
import java.util.Iterator;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.Objects;
@@ -63,8 +65,8 @@ public void start(Map<String, String> map) {
6365

6466
@Override
6567
public List<SourceRecord> poll() throws InterruptedException {
66-
LongAdder requestsGenerated = new LongAdder();
67-
List<SourceRecord> requests;
68+
long requestsGenerated = 0;
69+
List<SourceRecord> requests = Collections.emptyList();
6870

6971
do {
7072
long timeout = MILLIS.between(Instant.now(), requestGenerator.getTimeOfNextRequest());
@@ -73,23 +75,27 @@ public List<SourceRecord> poll() throws InterruptedException {
7375
Thread.sleep(timeout);
7476
}
7577

76-
requests = requestGenerator.requests()
77-
.sequential()
78+
Iterator<? extends RestRequest> requestIterator = requestGenerator.requests()
7879
.filter(RestRequest::isStillValid)
79-
.peek(r -> {
80-
logger.info("Requesting {}", r.getRequest().url());
81-
requestsGenerated.increment();
82-
})
83-
.map(tryOrNull(RestRequest::handleRequest,
84-
(r, ex) -> logger.warn("Failed to make request: {}", ex.toString())))
85-
.filter(Objects::nonNull)
86-
.map(s -> s.collect(Collectors.toList()))
87-
.filter(l -> !l.isEmpty())
88-
.findFirst()
89-
.orElse(Collections.emptyList());
80+
.iterator();
81+
82+
83+
while (requests.isEmpty() && requestIterator.hasNext()) {
84+
RestRequest request = requestIterator.next();
85+
86+
logger.info("Requesting {}", request.getRequest().url());
87+
requestsGenerated++;
88+
89+
try {
90+
requests = request.handleRequest()
91+
.collect(Collectors.toList());
92+
} catch (IOException ex) {
93+
logger.warn("Failed to make request: {}", ex.toString());
94+
}
95+
}
9096
} while (requests.isEmpty());
9197

92-
logger.info("Processed {} records from {} URLs", requests.size(), requestsGenerated.sum());
98+
logger.info("Processed {} records from {} URLs", requests.size(), requestsGenerated);
9399

94100
return requests;
95101
}

0 commit comments

Comments
 (0)