Skip to content

Commit 9e2a3bf

Browse files
committed
Separate out route rotation in a different method
1 parent 9d6a34b commit 9e2a3bf

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,19 @@ public Stream<RestRequest> requests() {
118118
if (this.routes == null || this.routes.isEmpty()) {
119119
return Stream.empty();
120120
}
121+
122+
// Rotate routes so that all routes are requested in a round-robin manner
123+
List<Route> rotatedRoutes = getRotatedRoutes();
124+
return rotatedRoutes.stream()
125+
.flatMap((Route r) -> StreamsKt.asStream(ouraRequestGenerator.requests(r, 100)));
126+
}
121127

122-
// Rotate routes
128+
private List<Route> getRotatedRoutes() {
123129
int routeStart = routeStartIndex % this.routes.size();
124-
List<Route> rotatedRoutes = new ArrayList<>(this.routes.size());
125-
rotatedRoutes.addAll(this.routes.subList(routeStart, this.routes.size()));
126-
rotatedRoutes.addAll(this.routes.subList(0, routeStart));
130+
List<Route> rotatedRoutes = new ArrayList<>(this.routes);
131+
Collections.rotate(rotatedRoutes, routeStart);
127132
routeStartIndex = (routeStartIndex + 1) % this.routes.size();
128-
129-
// Generate requests per rotated route across all users (user iteration handled by generator)
130-
return rotatedRoutes.stream()
131-
.flatMap((Route r) -> StreamsKt.asStream(ouraRequestGenerator.requests(r, 100)));
133+
return rotatedRoutes;
132134
}
133135

134136
public Stream<SourceRecord> handleRequest(RestRequest req) throws IOException {

0 commit comments

Comments
 (0)