Skip to content

Commit 25b3498

Browse files
authored
Merge pull request #42 from RADAR-base/alternative_poll
Alternative poll algorithm
2 parents dfd3e70 + b2a4ea8 commit 25b3498

File tree

5 files changed

+35
-44
lines changed

5 files changed

+35
-44
lines changed

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ your Fitbit App client ID and client secret. The following tables shows the poss
6161
<tr>
6262
<td>fitbit.user.repository.url</td></td><td>URL for webservice containing user credentials. Only used if a webservice-based user repository is configured.</td></td><td>string</td></td><td>""</td></td><td></td></td><td>low</td></td></tr>
6363
<tr>
64-
<td>fitbit.max.users.per.poll</td></td><td>Maximum number of users to query in a single poll operation. Decrease this if memory constrains are pressing.</td></td><td>int</td></td><td>100</td></td><td>[1,...]</td></td><td>low</td></td></tr>
65-
<tr>
6664
<td>fitbit.intraday.steps.topic</td></td><td>Topic for Fitbit intraday steps</td></td><td>string</td></td><td>connect_fitbit_intraday_steps</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
6765
<tr>
6866
<td>fitbit.intraday.heart.rate.topic</td></td><td>Topic for Fitbit intraday heart_rate</td></td><td>string</td></td><td>connect_fitbit_intraday_heart_rate</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>

docker/source-fitbit.properties.template

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,5 @@ rest.source.poll.interval.ms=5000
66
rest.source.request.generator.class=org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator
77
fitbit.api.client=?
88
fitbit.api.secret=?
9-
fitbit.max.users.per.poll=10
109
fitbit.user.repository.class=org.radarbase.connect.rest.fitbit.user.ServiceUserRepository
11-
fitbit.user.repository.url=http://radar-device-auth-backend:8080/
10+
fitbit.user.repository.url=http://localhost:8080/

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,6 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
105105
private static final String FITBIT_TIME_ZONE_TOPIC_DEFAULT = "connect_fitbit_time_zone";
106106
private static final String FITBIT_TIME_ZONE_TOPIC_DISPLAY = "Time zone topic";
107107

108-
private static final String FITBIT_MAX_USERS_PER_POLL_CONFIG = "fitbit.max.users.per.poll";
109-
private static final String FITBIT_MAX_USERS_PER_POLL_DOC = "Maximum number of users to query in a single poll operation. Decrease this if memory constrains are pressing.";
110-
private static final int FITBIT_MAX_USERS_PER_POLL_DEFAULT = 100;
111-
private static final String FITBIT_MAX_USERS_PER_POLL_DISPLAY = "Maximum users per poll";
112-
113108
private static final String FITBIT_ACTIVITY_LOG_TOPIC_CONFIG = "fitbit.activity.log.topic";
114109
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DOC = "Topic for Fitbit activity log.";
115110
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DEFAULT = "connect_fitbit_activity_log";
@@ -152,7 +147,6 @@ public String toString() {
152147
};
153148

154149
return RestSourceConnectorConfig.conf()
155-
156150
.define(FITBIT_USERS_CONFIG,
157151
Type.LIST,
158152
Collections.emptyList(),
@@ -225,17 +219,6 @@ public String toString() {
225219
Width.SHORT,
226220
FITBIT_USER_REPOSITORY_URL_DISPLAY)
227221

228-
.define(FITBIT_MAX_USERS_PER_POLL_CONFIG,
229-
Type.INT,
230-
FITBIT_MAX_USERS_PER_POLL_DEFAULT,
231-
Range.atLeast(1),
232-
Importance.LOW,
233-
FITBIT_MAX_USERS_PER_POLL_DOC,
234-
group,
235-
++orderInGroup,
236-
Width.SHORT,
237-
FITBIT_MAX_USERS_PER_POLL_DISPLAY)
238-
239222
.define(FITBIT_INTRADAY_STEPS_TOPIC_CONFIG,
240223
Type.STRING,
241224
FITBIT_INTRADAY_STEPS_TOPIC_DEFAULT,
@@ -362,10 +345,6 @@ public Headers getClientCredentials() {
362345
return clientCredentials;
363346
}
364347

365-
public long getMaxUsersPerPoll() {
366-
return getInt(FITBIT_MAX_USERS_PER_POLL_CONFIG);
367-
}
368-
369348
public String getActivityLogTopic() {
370349
return getString(FITBIT_ACTIVITY_LOG_TOPIC_CONFIG);
371350
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
109109
private Duration pollInterval;
110110
private Instant lastPoll;
111111
private String baseUrl;
112-
private long maxUsersPerPoll;
113112
private Duration pollIntervalPerUser;
114113
private final Set<User> tooManyRequestsForUser;
115114
private Duration tooManyRequestsCooldown;
@@ -133,7 +132,6 @@ public void initialize(RestSourceConnectorConfig config) {
133132
FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config;
134133
this.pollInterval = fitbitConfig.getPollInterval();
135134
this.baseUrl = fitbitConfig.getUrl();
136-
this.maxUsersPerPoll = fitbitConfig.getMaxUsersPerPoll();
137135
this.pollIntervalPerUser = fitbitConfig.getPollIntervalPerUser();
138136
this.tooManyRequestsCooldown = fitbitConfig.getTooManyRequestsCooldownInterval()
139137
.minus(getPollIntervalPerUser());
@@ -187,7 +185,6 @@ public Stream<FitbitRestRequest> requests() {
187185
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
188186
.filter(u -> lastPoll.isAfter(u.getValue()))
189187
.sorted(Comparator.comparing(Map.Entry::getValue))
190-
.limit(maxUsersPerPoll)
191188
.flatMap(u -> {
192189
lastPollPerUser.put(u.getKey().getId(), lastPoll);
193190
return this.createRequests(u.getKey());
@@ -199,6 +196,15 @@ public Stream<FitbitRestRequest> requests() {
199196
}
200197
}
201198

199+
200+
/** Get the time that this route should be polled again. */
201+
@Override
202+
public Instant getTimeOfNextRequest() {
203+
return nextPolls()
204+
.min(Comparator.naturalOrder())
205+
.orElse(nearFuture());
206+
}
207+
202208
private Map<String, Object> getPartition(User user) {
203209
return partitions.computeIfAbsent(user.getId(),
204210
k -> generator.getPartition(routeName, user));

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@
2222

2323
import java.lang.reflect.InvocationTargetException;
2424
import java.time.Instant;
25+
import java.util.ArrayList;
26+
import java.util.Collections;
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Objects;
2830
import java.util.concurrent.atomic.LongAdder;
31+
import java.util.function.Function;
2932
import java.util.stream.Collectors;
3033
import org.apache.kafka.connect.errors.ConnectException;
3134
import org.apache.kafka.connect.source.SourceRecord;
@@ -60,24 +63,30 @@ public void start(Map<String, String> map) {
6063

6164
@Override
6265
public List<SourceRecord> poll() throws InterruptedException {
63-
long timeout = MILLIS.between(Instant.now(), requestGenerator.getTimeOfNextRequest());
64-
if (timeout > 0) {
65-
logger.info("Waiting {} milliseconds for next available request", timeout);
66-
Thread.sleep(timeout);
67-
}
68-
6966
LongAdder requestsGenerated = new LongAdder();
67+
List<SourceRecord> requests;
68+
69+
do {
70+
long timeout = MILLIS.between(Instant.now(), requestGenerator.getTimeOfNextRequest());
71+
if (timeout > 0) {
72+
logger.info("Waiting {} milliseconds for next available request", timeout);
73+
Thread.sleep(timeout);
74+
}
7075

71-
List<SourceRecord> requests = requestGenerator.requests()
72-
.filter(RestRequest::isStillValid)
73-
.peek(r -> {
74-
logger.info("Requesting {}", r.getRequest().url());
75-
requestsGenerated.increment();
76-
})
77-
.flatMap(tryOrNull(RestRequest::handleRequest,
78-
(r, ex) -> logger.warn("Failed to make request: {}", ex.toString())))
79-
.filter(Objects::nonNull)
80-
.collect(Collectors.toList());
76+
requests = requestGenerator.requests()
77+
.filter(RestRequest::isStillValid)
78+
.peek(r -> {
79+
logger.info("Requesting {}", r.getRequest().url());
80+
requestsGenerated.increment();
81+
})
82+
.map(tryOrNull(RestRequest::handleRequest,
83+
(r, ex) -> logger.warn("Failed to make request: {}", ex.toString())))
84+
.filter(Objects::nonNull)
85+
.map(s -> s.collect(Collectors.toList()))
86+
.filter(l -> !l.isEmpty())
87+
.findAny()
88+
.orElse(Collections.emptyList());
89+
} while (requests.isEmpty());
8190

8291
logger.info("Processed {} records from {} URLs", requests.size(), requestsGenerated.sum());
8392

0 commit comments

Comments
 (0)