Skip to content

Commit 0cf696f

Browse files
committed
Small fixes
1 parent cb11170 commit 0cf696f

File tree

5 files changed

+46
-17
lines changed

5 files changed

+46
-17
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
7070
private static final boolean FITBIT_API_INTRADAY_ACCESS_DEFAULT = false;
7171
private static final String FITBIT_API_INTRADAY_ACCESS_DISPLAY = "Is Fitbit Intraday API available?";
7272

73+
public static final String FITBIT_USER_POLL_INTERVAL = "fitbit.user.poll.interval";
74+
private static final String FITBIT_USER_POLL_INTERVAL_DOC = "Polling interval per Fitbit user per request route in seconds.";
75+
// 150 requests per hour -> 2.5 per minute. There are currently 5 paths, that limits us to 1
76+
// call per route per 2 minutes.
77+
private static final int FITBIT_USER_POLL_INTERVAL_DEFAULT = 150;
78+
private static final String FITBIT_USER_POLL_INTERVAL_DISPLAY = "Per-user per-route polling interval.";
79+
80+
7381
public static final String FITBIT_USER_CREDENTIALS_DIR_CONFIG = "fitbit.user.dir";
7482
private static final String FITBIT_USER_CREDENTIALS_DIR_DOC = "Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.";
7583
private static final String FITBIT_USER_CREDENTIALS_DIR_DISPLAY = "User directory";
@@ -178,9 +186,19 @@ public String toString() {
178186
Width.SHORT,
179187
FITBIT_API_SECRET_DISPLAY)
180188

189+
.define(FITBIT_USER_POLL_INTERVAL,
190+
Type.INT,
191+
FITBIT_USER_POLL_INTERVAL_DEFAULT,
192+
Importance.MEDIUM,
193+
FITBIT_USER_POLL_INTERVAL_DOC,
194+
group,
195+
++orderInGroup,
196+
Width.SHORT,
197+
FITBIT_USER_POLL_INTERVAL_DISPLAY)
198+
181199
.define(FITBIT_API_INTRADAY_ACCESS_CONFIG,
182200
Type.BOOLEAN,
183-
true,
201+
FITBIT_API_INTRADAY_ACCESS_DEFAULT,
184202
Importance.MEDIUM,
185203
FITBIT_API_INTRADAY_ACCESS_DOC,
186204
group,
@@ -352,4 +370,12 @@ public String getActivityLogTopic() {
352370
public boolean hasIntradayAccess() {
353371
return getBoolean(FITBIT_API_INTRADAY_ACCESS_CONFIG);
354372
}
373+
374+
public Duration getPollIntervalPerUser() {
375+
return Duration.ofSeconds(getInt(FITBIT_USER_POLL_INTERVAL));
376+
}
377+
378+
public Duration getTooManyRequestsCooldownInterval() {
379+
return Duration.ofHours(1);
380+
}
355381
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,15 @@ public void initialize(RestSourceConnectorConfig config) {
140140

141141
@Override
142142
public void requestSucceeded(RestRequest request, SourceRecord record) {
143+
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
143144
String userKey = ((FitbitRestRequest) request).getUser().getId();
144145
Instant offset = Instant.ofEpochMilli((Long) record.sourceOffset().get(TIMESTAMP_OFFSET_KEY));
145146
offsets.put(userKey, offset);
146147
}
147148

148149
@Override
149150
public void requestEmpty(RestRequest request) {
151+
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
150152
FitbitRestRequest fitbitRequest = (FitbitRestRequest) request;
151153
Instant endOffset = fitbitRequest.getDateRange().end().toInstant();
152154
if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) {
@@ -160,7 +162,16 @@ public void requestFailed(RestRequest request, Response response) {
160162
if (response != null && response.code() == 429) {
161163
User user = ((FitbitRestRequest)request).getUser();
162164
tooManyRequestsForUser.add(user);
163-
Instant backOff = lastPoll.plus(getTooManyRequestsCooldown());
165+
String cooldownString = response.header("Retry-After");
166+
Duration cooldown = getTooManyRequestsCooldown();
167+
if (cooldownString != null) {
168+
try {
169+
cooldown = Duration.ofSeconds(Long.parseLong(cooldownString));
170+
} catch (NumberFormatException ex) {
171+
cooldown = getTooManyRequestsCooldown();
172+
}
173+
}
174+
Instant backOff = lastPoll.plus(cooldown);
164175
lastPollPerUser.put(user.getId(), backOff);
165176
logger.info("Too many requests for user {}. Backing off until {}",
166177
user, backOff.plus(getPollIntervalPerUser()));
@@ -185,10 +196,7 @@ public Stream<FitbitRestRequest> requests() {
185196
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
186197
.filter(u -> lastPoll.isAfter(u.getValue()))
187198
.sorted(Comparator.comparing(Map.Entry::getValue))
188-
.flatMap(u -> {
189-
lastPollPerUser.put(u.getKey().getId(), lastPoll);
190-
return this.createRequests(u.getKey());
191-
})
199+
.flatMap(u -> this.createRequests(u.getKey()))
192200
.filter(Objects::nonNull);
193201
} catch (IOException e) {
194202
logger.warn("Cannot read users");

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,4 @@ public RequestGenerator getRequestGenerator() {
214214
requestGenerator.initialize(this);
215215
return requestGenerator;
216216
}
217-
218-
public Duration getPollIntervalPerUser() {
219-
return Duration.ofMinutes(30);
220-
}
221-
222-
public Duration getTooManyRequestsCooldownInterval() {
223-
return Duration.ofHours(1);
224-
}
225217
}

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,13 +76,16 @@ public List<SourceRecord> poll() throws InterruptedException {
7676
}
7777

7878
Iterator<? extends RestRequest> requestIterator = requestGenerator.requests()
79-
.filter(RestRequest::isStillValid)
8079
.iterator();
8180

8281

8382
while (requests.isEmpty() && requestIterator.hasNext()) {
8483
RestRequest request = requestIterator.next();
8584

85+
if (!request.isStillValid()) {
86+
continue;
87+
}
88+
8689
logger.info("Requesting {}", request.getRequest().url());
8790
requestsGenerated++;
8891

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public boolean isStillValid() {
7272

7373
/**
7474
* Handle the request using the internal client, using the request route converter.
75-
* @return stream of resulting source records, or {@code null} if the response was not successful.
75+
* @return stream of resulting source records.
7676
* @throws IOException if making or parsing the request failed.
7777
*/
7878
public Stream<SourceRecord> handleRequest() throws IOException {
@@ -83,7 +83,7 @@ public Stream<SourceRecord> handleRequest() throws IOException {
8383
try (Response response = client.newCall(request).execute()) {
8484
if (!response.isSuccessful()) {
8585
route.requestFailed(this, response);
86-
return null;
86+
return Stream.empty();
8787
}
8888

8989
Collection<SourceRecord> records = route.converter().convert(this, response);

0 commit comments

Comments
 (0)