Skip to content

Commit 78da8e3

Browse files
authored
Merge pull request #105 from RADAR-base/fitbit-hrv
Added Fitbit Intraday HRV route
2 parents 0300a6a + 8e43417 commit 78da8e3

File tree

5 files changed

+156
-1
lines changed

5 files changed

+156
-1
lines changed

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ object Versions {
1919
const val okhttp = "4.11.0"
2020

2121
const val firebaseAdmin = "9.1.0"
22-
const val radarSchemas = "0.8.4"
22+
const val radarSchemas = "0.8.6-SNAPSHOT"
2323
const val ktor = "2.3.5"
2424

2525
const val junit = "5.9.3"

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
100100
private static final String FITBIT_INTRADAY_HEART_RATE_TOPIC_DISPLAY = "Intraday heartrate topic";
101101
private static final String FITBIT_INTRADAY_HEART_RATE_TOPIC_DEFAULT = "connect_fitbit_intraday_heart_rate";
102102

103+
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG = "fitbit.intraday.heart.rate.variability.topic";
104+
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DOC = "Topic for Fitbit intraday intraday_heart_rate_variability";
105+
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY = "Intraday heart rate variability topic";
106+
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DEFAULT = "connect_fitbit_intraday_heart_rate_variability";
107+
103108
private static final String FITBIT_RESTING_HEART_RATE_TOPIC_CONFIG = "fitbit.resting.heart.rate.topic";
104109
private static final String FITBIT_RESTING_HEART_RATE_TOPIC_DOC = "Topic for Fitbit resting heart_rate";
105110
private static final String FITBIT_RESTING_HEART_RATE_TOPIC_DISPLAY = "Resting heartrate topic";
@@ -458,6 +463,10 @@ public String getFitbitIntradayHeartRateTopic() {
458463
return getString(FITBIT_INTRADAY_HEART_RATE_TOPIC_CONFIG);
459464
}
460465

466+
public String getFitbitIntradayHeartRateVariabilityTopic() {
467+
return getString(FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG);
468+
}
469+
461470
public String getFitbitRestingHeartRateTopic() {
462471
return getString(FITBIT_RESTING_HEART_RATE_TOPIC_CONFIG);
463472
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.converter;
19+
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import io.confluent.connect.avro.AvroData;
22+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
23+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
24+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
25+
import org.radarcns.connector.fitbit.FitbitIntradayHeartRateVariability;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.time.Instant;
30+
import java.time.LocalDateTime;
31+
import java.time.ZonedDateTime;
32+
import java.util.stream.Stream;
33+
34+
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
35+
36+
public class FitbitIntradayHeartRateVariabilityAvroConverter extends FitbitAvroConverter {
37+
private static final Logger logger = LoggerFactory.getLogger(FitbitIntradayHeartRateVariabilityAvroConverter.class);
38+
private String heartRateVariabilityTopic;
39+
40+
public FitbitIntradayHeartRateVariabilityAvroConverter(AvroData avroData) {
41+
super(avroData);
42+
}
43+
44+
@Override
45+
public void initialize(RestSourceConnectorConfig config) {
46+
heartRateVariabilityTopic = ((FitbitRestSourceConnectorConfig) config).getFitbitIntradayHeartRateVariabilityTopic();
47+
logger.info("Using intraday heart rate variability topic {}", heartRateVariabilityTopic);
48+
}
49+
50+
@Override
51+
protected Stream<TopicData> processRecords(FitbitRestRequest request, JsonNode root, double timeReceived) {
52+
JsonNode hrv = root.get("hrv");
53+
if (hrv == null || !hrv.isArray()) {
54+
logger.warn("No HRV is provided for {}: {}", request, root);
55+
return Stream.empty();
56+
}
57+
ZonedDateTime startDate = request.getDateRange().end();
58+
59+
return iterableToStream(hrv)
60+
.filter(m -> m != null && m.isObject())
61+
.map(m -> m.get("minutes"))
62+
.filter(minutes -> minutes != null && minutes.isArray())
63+
.flatMap(FitbitAvroConverter::iterableToStream)
64+
.map(tryOrNull(minuteData -> parseHrv(minuteData, startDate, timeReceived),
65+
(a, ex) -> logger.warn("Failed to convert heart rate variability from request {}, {}", request, a, ex)));
66+
}
67+
68+
private TopicData parseHrv(JsonNode minuteData, ZonedDateTime startDate, double timeReceived) {
69+
Instant time = startDate.with(LocalDateTime.parse(minuteData.get("minute").asText())).toInstant();
70+
JsonNode value = minuteData.get("value");
71+
if (value == null || !value.isObject()) {
72+
return null;
73+
}
74+
FitbitIntradayHeartRateVariability fitbitHrv = new FitbitIntradayHeartRateVariability(time.toEpochMilli() / 1000d,
75+
timeReceived,
76+
(float) value.get("dailyRmssd").asDouble(),
77+
(float) value.get("coverage").asDouble(),
78+
(float) value.get("hf").asDouble(),
79+
(float) value.get("lf").asDouble());
80+
return new TopicData(time, heartRateVariabilityTopic, fitbitHrv);
81+
}
82+
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.radarbase.connect.rest.fitbit.route.FitbitActivityLogRoute;
3939
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayCaloriesRoute;
4040
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayHeartRateRoute;
41+
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayHeartRateVariabilityRoute;
4142
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayStepsRoute;
4243
import org.radarbase.connect.rest.fitbit.route.FitbitRestingHeartRateRoute;
4344
import org.radarbase.connect.rest.fitbit.route.FitbitSleepRoute;
@@ -94,6 +95,7 @@ private List<RequestRoute> getRoutes(FitbitRestSourceConnectorConfig config) {
9495
if (config.hasIntradayAccess()) {
9596
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
9697
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
98+
localRoutes.add(new FitbitIntradayHeartRateVariabilityRoute(this, userRepository, avroData));
9799
localRoutes.add(new FitbitIntradayCaloriesRoute(this, userRepository, avroData));
98100
}
99101
return localRoutes;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.route;
19+
20+
import io.confluent.connect.avro.AvroData;
21+
import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateAvroConverter;
22+
import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayHeartRateVariabilityAvroConverter;
23+
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
24+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
25+
import org.radarbase.connect.rest.fitbit.user.User;
26+
import org.radarbase.connect.rest.fitbit.user.UserRepository;
27+
import org.radarbase.connect.rest.fitbit.util.DateRange;
28+
29+
import java.time.ZonedDateTime;
30+
import java.util.stream.Stream;
31+
32+
import static java.time.ZoneOffset.UTC;
33+
import static java.time.temporal.ChronoUnit.SECONDS;
34+
35+
public class FitbitIntradayHeartRateVariabilityRoute extends FitbitPollingRoute {
36+
private final FitbitIntradayHeartRateVariabilityAvroConverter converter;
37+
38+
public FitbitIntradayHeartRateVariabilityRoute(FitbitRequestGenerator generator,
39+
UserRepository userRepository, AvroData avroData) {
40+
super(generator, userRepository, "intraday_heart_rate_variability");
41+
this.converter = new FitbitIntradayHeartRateVariabilityAvroConverter(avroData);
42+
}
43+
44+
@Override
45+
protected String getUrlFormat(String baseUrl) {
46+
return baseUrl + "/1/user/%s/hrv/date/%s/%s/all.json";
47+
}
48+
49+
protected Stream<FitbitRestRequest> createRequests(User user) {
50+
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
51+
.atZone(UTC)
52+
.truncatedTo(SECONDS);
53+
ZonedDateTime now = ZonedDateTime.now(UTC);
54+
return Stream.of(newRequest(user, new DateRange(startDate, now),
55+
user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now)));
56+
}
57+
58+
@Override
59+
public FitbitIntradayHeartRateVariabilityAvroConverter converter() {
60+
return converter;
61+
}
62+
}

0 commit comments

Comments
 (0)