Skip to content

Commit 0c1da16

Browse files
committed
fix: hrv, breathing rate, and skin temp routes to limit to 30 days
1 parent 83906c5 commit 0c1da16

File tree

4 files changed

+43
-36
lines changed

4 files changed

+43
-36
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitBreathingRateRoute.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,15 @@
2323
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
2424
import org.radarbase.connect.rest.fitbit.user.User;
2525
import org.radarbase.connect.rest.fitbit.user.UserRepository;
26-
import org.radarbase.connect.rest.fitbit.util.DateRange;
2726

28-
import java.time.ZonedDateTime;
2927
import java.util.stream.Stream;
3028

31-
import static java.time.ZoneOffset.UTC;
3229
import static java.time.temporal.ChronoUnit.SECONDS;
30+
import java.time.Duration;
3331

3432
public class FitbitBreathingRateRoute extends FitbitPollingRoute {
3533
private final FitbitBreathingRateAvroConverter converter;
34+
protected static final Duration REQUEST_INTERVAL = Duration.ofDays(30);
3635

3736
public FitbitBreathingRateRoute(FitbitRequestGenerator generator,
3837
UserRepository userRepository, AvroData avroData) {
@@ -46,16 +45,18 @@ protected String getUrlFormat(String baseUrl) {
4645
}
4746

4847
protected Stream<FitbitRestRequest> createRequests(User user) {
49-
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
50-
.atZone(UTC)
51-
.truncatedTo(SECONDS);
52-
ZonedDateTime now = ZonedDateTime.now(UTC);
53-
return Stream.of(newRequest(user, new DateRange(startDate, now),
54-
user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now)));
48+
return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS))
49+
.map(dateRange -> newRequest(user, dateRange,
50+
user.getExternalUserId(), DATE_FORMAT.format(dateRange.start())));
51+
}
52+
53+
@Override
54+
Duration getDateRangeInterval() {
55+
return REQUEST_INTERVAL;
5556
}
5657

5758
@Override
5859
public FitbitBreathingRateAvroConverter converter() {
5960
return converter;
6061
}
61-
}
62+
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayHeartRateVariabilityRoute.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,21 @@
1818
package org.radarbase.connect.rest.fitbit.route;
1919

2020
import io.confluent.connect.avro.AvroData;
21-
import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateAvroConverter;
2221
import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateVariabilityAvroConverter;
2322
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
2423
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
2524
import org.radarbase.connect.rest.fitbit.user.User;
2625
import org.radarbase.connect.rest.fitbit.user.UserRepository;
27-
import org.radarbase.connect.rest.fitbit.util.DateRange;
2826

29-
import java.time.ZonedDateTime;
3027
import java.util.stream.Stream;
3128

32-
import static java.time.ZoneOffset.UTC;
3329
import static java.time.temporal.ChronoUnit.SECONDS;
30+
import java.time.Duration;
31+
3432

3533
public class FitbitIntradayHeartRateVariabilityRoute extends FitbitPollingRoute {
3634
private final FitbitIntradayHeartRateVariabilityAvroConverter converter;
35+
protected static final Duration REQUEST_INTERVAL = Duration.ofDays(30);
3736

3837
public FitbitIntradayHeartRateVariabilityRoute(FitbitRequestGenerator generator,
3938
UserRepository userRepository, AvroData avroData) {
@@ -47,16 +46,18 @@ protected String getUrlFormat(String baseUrl) {
4746
}
4847

4948
protected Stream<FitbitRestRequest> createRequests(User user) {
50-
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
51-
.atZone(UTC)
52-
.truncatedTo(SECONDS);
53-
ZonedDateTime now = ZonedDateTime.now(UTC);
54-
return Stream.of(newRequest(user, new DateRange(startDate, now),
55-
user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now)));
49+
return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS))
50+
.map(dateRange -> newRequest(user, dateRange,
51+
user.getExternalUserId(), DATE_FORMAT.format(dateRange.start())));
52+
}
53+
54+
@Override
55+
Duration getDateRangeInterval() {
56+
return REQUEST_INTERVAL;
5657
}
5758

5859
@Override
5960
public FitbitIntradayHeartRateVariabilityAvroConverter converter() {
6061
return converter;
6162
}
62-
}
63+
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ private TemporalAmount getEndDateThreshold() {
326326
}
327327

328328
/**
329-
* Generate one date per day, using UTC time zone. The first date will have the time from the
329+
* Generate one date per day (or specified rangeInterval), using UTC time zone. The first date will have the time from the
330330
* given startDate. Following time stamps will start at 00:00. This will not up to the date of
331331
* {@link #getLookbackTime()} (exclusive).
332332
*/
@@ -345,12 +345,12 @@ Stream<DateRange> startDateGenerator(Instant startDate) {
345345
return Stream.empty();
346346
}
347347
} else {
348-
long numElements = DAYS.between(startDate, lookBack);
348+
Duration rangeInterval = getDateRangeInterval();
349349

350350
Stream<DateRange> elements = Stream
351-
.iterate(dateTime, t -> t.plus(ONE_DAY).truncatedTo(DAYS))
352-
.limit(numElements)
353-
.map(s -> new DateRange(s, s.plus(ONE_DAY).truncatedTo(DAYS).minus(ONE_NANO)));
351+
.iterate(dateTime, t -> t.plus(rangeInterval).truncatedTo(DAYS))
352+
.takeWhile(u -> u.isBefore(lookBackDateStart))
353+
.map(s -> new DateRange(s, s.plus(rangeInterval).truncatedTo(DAYS).minus(ONE_NANO)));
354354

355355
// we're polling at exactly 00:00, should not poll the last date
356356
if (lookBackDateStart.equals(lookBackDate)) {
@@ -362,4 +362,8 @@ Stream<DateRange> startDateGenerator(Instant startDate) {
362362
}
363363
}
364364
}
365+
366+
Duration getDateRangeInterval() {
367+
return ONE_DAY;
368+
}
365369
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitSkinTemperatureRoute.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,15 @@
2323
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
2424
import org.radarbase.connect.rest.fitbit.user.User;
2525
import org.radarbase.connect.rest.fitbit.user.UserRepository;
26-
import org.radarbase.connect.rest.fitbit.util.DateRange;
2726

28-
import java.time.ZonedDateTime;
2927
import java.util.stream.Stream;
3028

31-
import static java.time.ZoneOffset.UTC;
3229
import static java.time.temporal.ChronoUnit.SECONDS;
30+
import java.time.Duration;
3331

3432
public class FitbitSkinTemperatureRoute extends FitbitPollingRoute {
3533
private final FitbitSkinTemperatureAvroConverter converter;
34+
protected static final Duration REQUEST_INTERVAL = Duration.ofDays(30);
3635

3736
public FitbitSkinTemperatureRoute(FitbitRequestGenerator generator,
3837
UserRepository userRepository, AvroData avroData) {
@@ -46,16 +45,18 @@ protected String getUrlFormat(String baseUrl) {
4645
}
4746

4847
protected Stream<FitbitRestRequest> createRequests(User user) {
49-
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
50-
.atZone(UTC)
51-
.truncatedTo(SECONDS);
52-
ZonedDateTime now = ZonedDateTime.now(UTC);
53-
return Stream.of(newRequest(user, new DateRange(startDate, now),
54-
user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now)));
48+
return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS))
49+
.map(dateRange -> newRequest(user, dateRange,
50+
user.getExternalUserId(), DATE_FORMAT.format(dateRange.start())));
51+
}
52+
53+
@Override
54+
Duration getDateRangeInterval() {
55+
return REQUEST_INTERVAL;
5556
}
5657

5758
@Override
5859
public FitbitSkinTemperatureAvroConverter converter() {
5960
return converter;
6061
}
61-
}
62+
}

0 commit comments

Comments
 (0)