Skip to content

Commit b71a521

Browse files
authored
Merge pull request #44 from RADAR-base/release-0.2.3
Release 0.2.3
2 parents 6842fa5 + bbc32da commit b71a521

File tree

12 files changed

+81
-45
lines changed

12 files changed

+81
-45
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ your Fitbit App client ID and client secret. The following tables shows the poss
5353
<tr>
5454
<td>fitbit.api.secret</td></td><td>Secret for the Fitbit API client set in fitbit.api.client.</td></td><td>password</td></td><td></td></td><td></td></td><td>high</td></td></tr>
5555
<tr>
56-
<td>fitbit.api.intraday</td></td><td>Set to true if the client has permissions to Fitbit Intraday API, false otherwise.</td></td><td>boolean</td></td><td>true</td></td><td></td></td><td>medium</td></td></tr>
56+
<td>fitbit.user.poll.interval</td></td><td>Polling interval per Fitbit user per request route in seconds.</td></td><td>int</td></td><td>150</td></td><td></td></td><td>medium</td></td></tr>
57+
<tr>
58+
<td>fitbit.api.intraday</td></td><td>Set to true if the client has permissions to Fitbit Intraday API, false otherwise.</td></td><td>boolean</td></td><td>false</td></td><td></td></td><td>medium</td></td></tr>
5759
<tr>
5860
<td>fitbit.user.repository.class</td></td><td>Class for managing users and authentication.</td></td><td>class</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlUserRepository</td></td><td>Class extending org.radarbase.connect.rest.fitbit.user.UserRepository</td></td><td>medium</td></td></tr>
5961
<tr>

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ subprojects {
1010
apply plugin: 'java'
1111
apply plugin: 'java-library'
1212

13-
group = 'org.radarcns'
14-
version = '0.2.2'
13+
group = 'org.radarbase'
14+
version = '0.2.3'
1515

1616
sourceCompatibility = 1.8
1717
targetCompatibility = 1.8

kafka-connect-fitbit-source/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
dependencies {
22
api project(':kafka-connect-rest-source')
33
api group: 'io.confluent', name: 'kafka-connect-avro-converter', version: confluentVersion
4-
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.0'
4+
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.1'
55

66
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
77
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
7070
private static final boolean FITBIT_API_INTRADAY_ACCESS_DEFAULT = false;
7171
private static final String FITBIT_API_INTRADAY_ACCESS_DISPLAY = "Is Fitbit Intraday API available?";
7272

73+
public static final String FITBIT_USER_POLL_INTERVAL = "fitbit.user.poll.interval";
74+
private static final String FITBIT_USER_POLL_INTERVAL_DOC = "Polling interval per Fitbit user per request route in seconds.";
75+
// 150 requests per hour -> 2.5 per minute. There are currently 5 paths, that limits us to 1
76+
// call per route per 2 minutes.
77+
private static final int FITBIT_USER_POLL_INTERVAL_DEFAULT = 150;
78+
private static final String FITBIT_USER_POLL_INTERVAL_DISPLAY = "Per-user per-route polling interval.";
79+
80+
7381
public static final String FITBIT_USER_CREDENTIALS_DIR_CONFIG = "fitbit.user.dir";
7482
private static final String FITBIT_USER_CREDENTIALS_DIR_DOC = "Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.";
7583
private static final String FITBIT_USER_CREDENTIALS_DIR_DISPLAY = "User directory";
@@ -178,9 +186,19 @@ public String toString() {
178186
Width.SHORT,
179187
FITBIT_API_SECRET_DISPLAY)
180188

189+
.define(FITBIT_USER_POLL_INTERVAL,
190+
Type.INT,
191+
FITBIT_USER_POLL_INTERVAL_DEFAULT,
192+
Importance.MEDIUM,
193+
FITBIT_USER_POLL_INTERVAL_DOC,
194+
group,
195+
++orderInGroup,
196+
Width.SHORT,
197+
FITBIT_USER_POLL_INTERVAL_DISPLAY)
198+
181199
.define(FITBIT_API_INTRADAY_ACCESS_CONFIG,
182200
Type.BOOLEAN,
183-
true,
201+
FITBIT_API_INTRADAY_ACCESS_DEFAULT,
184202
Importance.MEDIUM,
185203
FITBIT_API_INTRADAY_ACCESS_DOC,
186204
group,
@@ -352,4 +370,12 @@ public String getActivityLogTopic() {
352370
public boolean hasIntradayAccess() {
353371
return getBoolean(FITBIT_API_INTRADAY_ACCESS_CONFIG);
354372
}
373+
374+
public Duration getPollIntervalPerUser() {
375+
return Duration.ofSeconds(getInt(FITBIT_USER_POLL_INTERVAL));
376+
}
377+
378+
public Duration getTooManyRequestsCooldownInterval() {
379+
return Duration.ofHours(1);
380+
}
355381
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateRoute.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,11 @@
2929
import org.radarbase.connect.rest.fitbit.user.UserRepository;
3030

3131
public class FitbitIntradayHeartRateRoute extends FitbitPollingRoute {
32-
private static final String ROUTE_NAME = "heart_rate";
3332
private final FitbitIntradayHeartRateAvroConverter converter;
3433

3534
public FitbitIntradayHeartRateRoute(FitbitRequestGenerator generator,
3635
UserRepository userRepository, AvroData avroData) {
37-
super(generator, userRepository, ROUTE_NAME);
36+
super(generator, userRepository, "heart_rate");
3837
this.converter = new FitbitIntradayHeartRateAvroConverter(avroData);
3938
}
4039

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,15 @@ public void initialize(RestSourceConnectorConfig config) {
140140

141141
@Override
142142
public void requestSucceeded(RestRequest request, SourceRecord record) {
143+
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
143144
String userKey = ((FitbitRestRequest) request).getUser().getId();
144145
Instant offset = Instant.ofEpochMilli((Long) record.sourceOffset().get(TIMESTAMP_OFFSET_KEY));
145146
offsets.put(userKey, offset);
146147
}
147148

148149
@Override
149150
public void requestEmpty(RestRequest request) {
151+
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
150152
FitbitRestRequest fitbitRequest = (FitbitRestRequest) request;
151153
Instant endOffset = fitbitRequest.getDateRange().end().toInstant();
152154
if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) {
@@ -160,7 +162,16 @@ public void requestFailed(RestRequest request, Response response) {
160162
if (response != null && response.code() == 429) {
161163
User user = ((FitbitRestRequest)request).getUser();
162164
tooManyRequestsForUser.add(user);
163-
Instant backOff = lastPoll.plus(getTooManyRequestsCooldown());
165+
String cooldownString = response.header("Retry-After");
166+
Duration cooldown = getTooManyRequestsCooldown();
167+
if (cooldownString != null) {
168+
try {
169+
cooldown = Duration.ofSeconds(Long.parseLong(cooldownString));
170+
} catch (NumberFormatException ex) {
171+
cooldown = getTooManyRequestsCooldown();
172+
}
173+
}
174+
Instant backOff = lastPoll.plus(cooldown);
164175
lastPollPerUser.put(user.getId(), backOff);
165176
logger.info("Too many requests for user {}. Backing off until {}",
166177
user, backOff.plus(getPollIntervalPerUser()));
@@ -185,10 +196,7 @@ public Stream<FitbitRestRequest> requests() {
185196
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
186197
.filter(u -> lastPoll.isAfter(u.getValue()))
187198
.sorted(Comparator.comparing(Map.Entry::getValue))
188-
.flatMap(u -> {
189-
lastPollPerUser.put(u.getKey().getId(), lastPoll);
190-
return this.createRequests(u.getKey());
191-
})
199+
.flatMap(u -> this.createRequests(u.getKey()))
192200
.filter(Objects::nonNull);
193201
} catch (IOException e) {
194202
logger.warn("Cannot read users");

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ public Stream<? extends User> stream() throws IOException {
9090
Instant now = Instant.now();
9191
if (!now.isAfter(nextFetchTime)
9292
|| !nextFetch.compareAndSet(nextFetchTime, now.plus(FETCH_THRESHOLD))) {
93-
logger.info("Providing cached user information...");
93+
logger.debug("Providing cached user information...");
9494
return timedCachedUsers.stream();
9595
}
9696

9797
logger.info("Requesting user information from webservice");
98-
Request request = requestFor("users" + "?source-type=FitBit").build();
98+
Request request = requestFor("users?source-type=FitBit").build();
9999
this.timedCachedUsers =
100100
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
101101
.filter(u -> u.isComplete()

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -214,12 +214,4 @@ public RequestGenerator getRequestGenerator() {
214214
requestGenerator.initialize(this);
215215
return requestGenerator;
216216
}
217-
218-
public Duration getPollIntervalPerUser() {
219-
return Duration.ofMinutes(30);
220-
}
221-
222-
public Duration getTooManyRequestsCooldownInterval() {
223-
return Duration.ofHours(1);
224-
}
225217
}

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceTask.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import static java.time.temporal.ChronoUnit.MILLIS;
2121
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
2222

23+
import java.io.IOException;
2324
import java.lang.reflect.InvocationTargetException;
2425
import java.time.Instant;
2526
import java.util.ArrayList;
2627
import java.util.Collections;
28+
import java.util.Iterator;
2729
import java.util.List;
2830
import java.util.Map;
2931
import java.util.Objects;
@@ -63,8 +65,8 @@ public void start(Map<String, String> map) {
6365

6466
@Override
6567
public List<SourceRecord> poll() throws InterruptedException {
66-
LongAdder requestsGenerated = new LongAdder();
67-
List<SourceRecord> requests;
68+
long requestsGenerated = 0;
69+
List<SourceRecord> requests = Collections.emptyList();
6870

6971
do {
7072
long timeout = MILLIS.between(Instant.now(), requestGenerator.getTimeOfNextRequest());
@@ -73,23 +75,30 @@ public List<SourceRecord> poll() throws InterruptedException {
7375
Thread.sleep(timeout);
7476
}
7577

76-
requests = requestGenerator.requests()
77-
.sequential()
78-
.filter(RestRequest::isStillValid)
79-
.peek(r -> {
80-
logger.info("Requesting {}", r.getRequest().url());
81-
requestsGenerated.increment();
82-
})
83-
.map(tryOrNull(RestRequest::handleRequest,
84-
(r, ex) -> logger.warn("Failed to make request: {}", ex.toString())))
85-
.filter(Objects::nonNull)
86-
.map(s -> s.collect(Collectors.toList()))
87-
.filter(l -> !l.isEmpty())
88-
.findFirst()
89-
.orElse(Collections.emptyList());
78+
Iterator<? extends RestRequest> requestIterator = requestGenerator.requests()
79+
.iterator();
80+
81+
82+
while (requests.isEmpty() && requestIterator.hasNext()) {
83+
RestRequest request = requestIterator.next();
84+
85+
if (!request.isStillValid()) {
86+
continue;
87+
}
88+
89+
logger.info("Requesting {}", request.getRequest().url());
90+
requestsGenerated++;
91+
92+
try {
93+
requests = request.handleRequest()
94+
.collect(Collectors.toList());
95+
} catch (IOException ex) {
96+
logger.warn("Failed to make request: {}", ex.toString());
97+
}
98+
}
9099
} while (requests.isEmpty());
91100

92-
logger.info("Processed {} records from {} URLs", requests.size(), requestsGenerated.sum());
101+
logger.info("Processed {} records from {} URLs", requests.size(), requestsGenerated);
93102

94103
return requests;
95104
}

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public boolean isStillValid() {
7272

7373
/**
7474
* Handle the request using the internal client, using the request route converter.
75-
* @return stream of resulting source records, or {@code null} if the response was not successful.
75+
* @return stream of resulting source records.
7676
* @throws IOException if making or parsing the request failed.
7777
*/
7878
public Stream<SourceRecord> handleRequest() throws IOException {
@@ -83,7 +83,7 @@ public Stream<SourceRecord> handleRequest() throws IOException {
8383
try (Response response = client.newCall(request).execute()) {
8484
if (!response.isSuccessful()) {
8585
route.requestFailed(this, response);
86-
return null;
86+
return Stream.empty();
8787
}
8888

8989
Collection<SourceRecord> records = route.converter().convert(this, response);

0 commit comments

Comments
 (0)