Skip to content

Commit 9bd8046

Browse files
authored
Merge pull request #119 from RADAR-base/add-spo2
Add Fitbit spo2 route and converter
2 parents aabce98 + 835101d commit 9bd8046

File tree

3 files changed

+162
-0
lines changed

3 files changed

+162
-0
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
106106
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY = "Intraday heart rate variability topic";
107107
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DEFAULT = "connect_fitbit_intraday_heart_rate_variability";
108108

109+
private static final String FITBIT_INTRADAY_SPO2_TOPIC_CONFIG = "fitbit.intraday.spo2.topic";
110+
private static final String FITBIT_INTRADAY_SPO2_TOPIC_DOC = "Topic for Fitbit intraday intraday_spo2";
111+
private static final String FITBIT_INTRADAY_SPO2_TOPIC_DISPLAY = "Intraday spo2 topic";
112+
private static final String FITBIT_INTRADAY_SPO2_TOPIC_DEFAULT = "connect_fitbit_intraday_spo2";
113+
109114
private static final String FITBIT_BREATHING_RATE_TOPIC_CONFIG = "fitbit.breathing.rate.topic";
110115
private static final String FITBIT_BREATHING_RATE_TOPIC_DOC = "Topic for Fitbit breathing rate";
111116
private static final String FITBIT_BREATHING_RATE_TOPIC_DISPLAY = "Breathing rate topic";
@@ -343,6 +348,17 @@ public String toString() {
343348
Width.SHORT,
344349
FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY)
345350

351+
.define(FITBIT_INTRADAY_SPO2_TOPIC_CONFIG,
352+
Type.STRING,
353+
FITBIT_INTRADAY_SPO2_TOPIC_DEFAULT,
354+
nonControlChar,
355+
Importance.LOW,
356+
FITBIT_INTRADAY_SPO2_TOPIC_DOC,
357+
group,
358+
++orderInGroup,
359+
Width.SHORT,
360+
FITBIT_INTRADAY_SPO2_TOPIC_DISPLAY)
361+
346362
.define(FITBIT_BREATHING_RATE_TOPIC_CONFIG,
347363
Type.STRING,
348364
FITBIT_BREATHING_RATE_TOPIC_DEFAULT,
@@ -511,6 +527,10 @@ public String getFitbitIntradayHeartRateVariabilityTopic() {
511527
return getString(FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG);
512528
}
513529

530+
public String getFitbitIntradaySpo2Topic() {
531+
return getString(FITBIT_INTRADAY_SPO2_TOPIC_CONFIG);
532+
}
533+
514534
public String getFitbitBreathingRateTopic() {
515535
return getString(FITBIT_BREATHING_RATE_TOPIC_CONFIG);
516536
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.converter;
19+
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import io.confluent.connect.avro.AvroData;
22+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
23+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
24+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
25+
import org.radarcns.connector.fitbit.FitbitIntradaySpo2;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.time.Instant;
30+
import java.time.LocalDateTime;
31+
import java.time.ZonedDateTime;
32+
import java.util.stream.Stream;
33+
34+
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
35+
36+
public class FitbitIntradaySpo2AvroConverter extends FitbitAvroConverter {
37+
private static final Logger logger = LoggerFactory.getLogger(FitbitIntradaySpo2AvroConverter.class);
38+
private String spo2Topic;
39+
40+
public FitbitIntradaySpo2AvroConverter(AvroData avroData) {
41+
super(avroData);
42+
}
43+
44+
@Override
45+
public void initialize(RestSourceConnectorConfig config) {
46+
spo2Topic = ((FitbitRestSourceConnectorConfig) config).getFitbitIntradaySpo2Topic();
47+
logger.info("Using intraday spo2 topic {}", spo2Topic);
48+
}
49+
50+
@Override
51+
protected Stream<TopicData> processRecords(FitbitRestRequest request, JsonNode root, double timeReceived) {
52+
JsonNode spo2 = root;
53+
if (spo2 == null || !spo2.isArray()) {
54+
logger.warn("No Spo2 is provided for {}: {}", request, root);
55+
return Stream.empty();
56+
}
57+
ZonedDateTime startDate = request.getDateRange().end();
58+
59+
return iterableToStream(spo2)
60+
.filter(m -> m != null && m.isObject())
61+
.map(m -> m.get("minutes"))
62+
.filter(minutes -> minutes != null && minutes.isArray())
63+
.flatMap(FitbitAvroConverter::iterableToStream)
64+
.map(tryOrNull(minuteData -> parseSpo2(minuteData, startDate, timeReceived),
65+
(a, ex) -> logger.warn("Failed to convert spo2 from request {}, {}", request, a, ex)));
66+
}
67+
68+
private TopicData parseSpo2(JsonNode minuteData, ZonedDateTime startDate, double timeReceived) {
69+
Instant time = startDate.with(LocalDateTime.parse(minuteData.get("minute").asText())).toInstant();
70+
Float value = (float) minuteData.get("value").asDouble();
71+
if (value == null) {
72+
return null;
73+
}
74+
FitbitIntradaySpo2 fitbitSpo2 = new FitbitIntradaySpo2(time.toEpochMilli() / 1000d,
75+
timeReceived,
76+
(float) value);
77+
return new TopicData(time, spo2Topic, fitbitSpo2);
78+
}
79+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.route;
19+
20+
import io.confluent.connect.avro.AvroData;
21+
import org.radarbase.connect.rest.fitbit.converter.FitbitIntradaySpo2AvroConverter;
22+
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
23+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
24+
import org.radarbase.connect.rest.fitbit.user.User;
25+
import org.radarbase.connect.rest.fitbit.user.UserRepository;
26+
27+
import java.util.stream.Stream;
28+
29+
import static java.time.temporal.ChronoUnit.SECONDS;
30+
import java.time.Duration;
31+
32+
33+
public class FitbitIntradaySpo2Route extends FitbitPollingRoute {
34+
private final FitbitIntradaySpo2AvroConverter converter;
35+
36+
public FitbitIntradaySpo2Route(FitbitRequestGenerator generator,
37+
UserRepository userRepository, AvroData avroData) {
38+
super(generator, userRepository, "intraday_spo2");
39+
this.converter = new FitbitIntradaySpo2AvroConverter(avroData);
40+
}
41+
42+
@Override
43+
protected String getUrlFormat(String baseUrl) {
44+
return baseUrl + "/1/user/%s/spo2/date/%s/%s/all.json";
45+
}
46+
47+
protected Stream<FitbitRestRequest> createRequests(User user) {
48+
return startDateGenerator(getOffset(user).plus(ONE_SECOND).truncatedTo(SECONDS))
49+
.map(dateRange -> newRequest(user, dateRange,
50+
user.getExternalUserId(), DATE_FORMAT.format(dateRange.start()), DATE_FORMAT.format(dateRange.end())));
51+
}
52+
53+
/** Limit range to 30 days as documented here: https://dev.fitbit.com/build/reference/web-api/intraday/get-spo2-intraday-by-interval/ */
54+
@Override
55+
Duration getDateRangeInterval() {
56+
return THIRTY_DAYS;
57+
}
58+
59+
@Override
60+
public FitbitIntradaySpo2AvroConverter converter() {
61+
return converter;
62+
}
63+
}

0 commit comments

Comments
 (0)