Skip to content

Commit 4430b9f

Browse files
Add BR and SkinTemp converters and routes
1 parent 8e43417 commit 4430b9f

File tree

7 files changed

+315
-9
lines changed

7 files changed

+315
-9
lines changed

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ object Versions {
1919
const val okhttp = "4.11.0"
2020

2121
const val firebaseAdmin = "9.1.0"
22-
const val radarSchemas = "0.8.6-SNAPSHOT"
22+
const val radarSchemas = "0.8.7-SNAPSHOT"
2323
const val ktor = "2.3.5"
2424

2525
const val junit = "5.9.3"

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,16 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
105105
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DISPLAY = "Intraday heart rate variability topic";
106106
private static final String FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_DEFAULT = "connect_fitbit_intraday_heart_rate_variability";
107107

108+
private static final String FITBIT_BREATHING_RATE_TOPIC_CONFIG = "fitbit.breathing.rate.topic";
109+
private static final String FITBIT_BREATHING_RATE_TOPIC_DOC = "Topic for Fitbit breathing rate";
110+
private static final String FITBIT_BREATHING_RATE_TOPIC_DISPLAY = "Breathing rate topic";
111+
private static final String FITBIT_BREATHING_RATE_TOPIC_DEFAULT = "connect_fitbit_breathing_rate";
112+
113+
private static final String FITBIT_SKIN_TEMPERATURE_TOPIC_CONFIG = "fitbit.skin.temperature.rate.topic";
114+
private static final String FITBIT_SKIN_TEMPERATURE_TOPIC_DOC = "Topic for Fitbit skin temperature";
115+
private static final String FITBIT_SKIN_TEMPERATURE_TOPIC_DISPLAY = "Skin temperature topic";
116+
private static final String FITBIT_SKIN_TEMPERATURE_TOPIC_DEFAULT = "connect_fitbit_skin_temperature";
117+
108118
private static final String FITBIT_RESTING_HEART_RATE_TOPIC_CONFIG = "fitbit.resting.heart.rate.topic";
109119
private static final String FITBIT_RESTING_HEART_RATE_TOPIC_DOC = "Topic for Fitbit resting heart_rate";
110120
private static final String FITBIT_RESTING_HEART_RATE_TOPIC_DISPLAY = "Resting heartrate topic";
@@ -467,6 +477,14 @@ public String getFitbitIntradayHeartRateVariabilityTopic() {
467477
return getString(FITBIT_INTRADAY_HEART_RATE_VARIABILITY_TOPIC_CONFIG);
468478
}
469479

480+
public String getFitbitBreathingRateTopic() {
481+
return getString(FITBIT_BREATHING_RATE_TOPIC_CONFIG);
482+
}
483+
484+
public String getFitbitSkinTemperatureTopic() {
485+
return getString(FITBIT_SKIN_TEMPERATURE_TOPIC_CONFIG);
486+
}
487+
470488
public String getFitbitRestingHeartRateTopic() {
471489
return getString(FITBIT_RESTING_HEART_RATE_TOPIC_CONFIG);
472490
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.converter;
19+
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import io.confluent.connect.avro.AvroData;
22+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
23+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
24+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
25+
import org.radarcns.connector.fitbit.FitbitBreathingRate;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.time.Instant;
30+
import java.time.LocalDateTime;
31+
import java.time.ZonedDateTime;
32+
import java.util.stream.Stream;
33+
34+
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
35+
36+
public class FitbitBreathingRateAvroConverter extends FitbitAvroConverter {
37+
private static final Logger logger = LoggerFactory.getLogger(FitbitBreathingRateAvroConverter.class);
38+
private String breathingRateTopic;
39+
40+
public FitbitBreathingRateAvroConverter(AvroData avroData) {
41+
super(avroData);
42+
}
43+
44+
@Override
45+
public void initialize(RestSourceConnectorConfig config) {
46+
breathingRateTopic = ((FitbitRestSourceConnectorConfig) config).getFitbitBreathingRateTopic();
47+
logger.info("Using breathing rate topic {}", breathingRateTopic);
48+
}
49+
50+
@Override
51+
protected Stream<TopicData> processRecords(FitbitRestRequest request, JsonNode root, double timeReceived) {
52+
JsonNode br = root.get("br");
53+
if (br == null || !br.isArray()) {
54+
logger.warn("No BR is provided for {}: {}", request, root);
55+
return Stream.empty();
56+
}
57+
ZonedDateTime startDate = request.getDateRange().end();
58+
59+
return iterableToStream(br)
60+
.filter(m -> m != null && m.isObject())
61+
.flatMap(FitbitAvroConverter::iterableToStream)
62+
.map(tryOrNull(m -> parseBr(m, startDate, timeReceived),
63+
(a, ex) -> logger.warn("Failed to convert breathing rate from request {}, {}", request, a, ex)));
64+
}
65+
66+
private TopicData parseBr(JsonNode data, ZonedDateTime startDate, double timeReceived) {
67+
Instant time = startDate.with(LocalDateTime.parse(data.get("dateTime").asText())).toInstant();
68+
JsonNode value = data.get("value");
69+
if (value == null || !value.isObject()) {
70+
return null;
71+
}
72+
FitbitBreathingRate fitbitBr = new FitbitBreathingRate(time.toEpochMilli() / 1000d,
73+
timeReceived,
74+
(float) value.get("deepSleepBrSummary").get("breathingRate").asDouble(),
75+
(float) value.get("remSleepBrSummary").get("breathingRate").asDouble(),
76+
(float) value.get("fullSleepBrSummary").get("breathingRate").asDouble(),
77+
(float) value.get("lightSleepBrSummary").get("breathingRate").asDouble());
78+
return new TopicData(time, breathingRateTopic, fitbitBr);
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.converter;
19+
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import io.confluent.connect.avro.AvroData;
22+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
23+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
24+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
25+
import org.radarcns.connector.fitbit.*;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.time.Instant;
30+
import java.time.LocalDateTime;
31+
import java.time.ZonedDateTime;
32+
import java.util.HashMap;
33+
import java.util.Map;
34+
import java.util.stream.Stream;
35+
36+
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
37+
38+
public class FitbitSkinTemperatureAvroConverter extends FitbitAvroConverter {
39+
private static final Logger logger = LoggerFactory.getLogger(FitbitSkinTemperatureAvroConverter.class);
40+
41+
private static final Map<String, FitbitSkinTemperatureLogType> LOG_TYPE_MAP = new HashMap<>();
42+
43+
static {
44+
LOG_TYPE_MAP.put("dedicated_temp_sensor", FitbitSkinTemperatureLogType.DEDICATED_TEMP_SENSOR);
45+
LOG_TYPE_MAP.put("other_sensors", FitbitSkinTemperatureLogType.OTHER_SENSORS);
46+
}
47+
48+
private String skinTemperatureTopic;
49+
50+
public FitbitSkinTemperatureAvroConverter(AvroData avroData) {
51+
super(avroData);
52+
}
53+
54+
@Override
55+
public void initialize(RestSourceConnectorConfig config) {
56+
skinTemperatureTopic = ((FitbitRestSourceConnectorConfig) config).getFitbitSkinTemperatureTopic();
57+
logger.info("Using skin temperature topic {}", skinTemperatureTopic);
58+
}
59+
60+
@Override
61+
protected Stream<TopicData> processRecords(FitbitRestRequest request, JsonNode root, double timeReceived) {
62+
JsonNode tempSkin = root.get("tempSkin");
63+
if (tempSkin == null || !tempSkin.isArray()) {
64+
logger.warn("No tempSkin is provided for {}: {}", request, root);
65+
return Stream.empty();
66+
}
67+
ZonedDateTime startDate = request.getDateRange().end();
68+
69+
return iterableToStream(tempSkin)
70+
.filter(m -> m != null && m.isObject())
71+
.flatMap(FitbitAvroConverter::iterableToStream)
72+
.map(tryOrNull(m -> parseTempSkin(m, startDate, timeReceived),
73+
(a, ex) -> logger.warn("Failed to convert skin temperature from request {}, {}", request, a, ex)));
74+
}
75+
76+
private TopicData parseTempSkin(JsonNode data, ZonedDateTime startDate, double timeReceived) {
77+
Instant time = startDate.with(LocalDateTime.parse(data.get("dateTime").asText())).toInstant();
78+
JsonNode value = data.get("value");
79+
if (value == null || !value.isObject()) {
80+
return null;
81+
}
82+
String logType = data.get("level").asText();
83+
FitbitSkinTemperature fitbitHrv = new FitbitSkinTemperature(
84+
time.toEpochMilli() / 1000d,
85+
timeReceived,
86+
(float) value.get("nightlyRelative").asDouble(),
87+
LOG_TYPE_MAP.getOrDefault(logType, FitbitSkinTemperatureLogType.UNKNOWN)
88+
);
89+
return new TopicData(time, skinTemperatureTopic, fitbitHrv);
90+
}
91+
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,7 @@
3535
import okhttp3.OkHttpClient;
3636
import org.radarbase.connect.rest.RestSourceConnectorConfig;
3737
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
38-
import org.radarbase.connect.rest.fitbit.route.FitbitActivityLogRoute;
39-
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayCaloriesRoute;
40-
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayHeartRateRoute;
41-
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayHeartRateVariabilityRoute;
42-
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayStepsRoute;
43-
import org.radarbase.connect.rest.fitbit.route.FitbitRestingHeartRateRoute;
44-
import org.radarbase.connect.rest.fitbit.route.FitbitSleepRoute;
45-
import org.radarbase.connect.rest.fitbit.route.FitbitTimeZoneRoute;
38+
import org.radarbase.connect.rest.fitbit.route.*;
4639
import org.radarbase.connect.rest.fitbit.user.User;
4740
import org.radarbase.connect.rest.fitbit.user.UserRepository;
4841
import org.radarbase.connect.rest.request.RequestGeneratorRouter;
@@ -96,6 +89,8 @@ private List<RequestRoute> getRoutes(FitbitRestSourceConnectorConfig config) {
9689
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
9790
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
9891
localRoutes.add(new FitbitIntradayHeartRateVariabilityRoute(this, userRepository, avroData));
92+
localRoutes.add(new FitbitBreathingRateRoute(this, userRepository, avroData));
93+
localRoutes.add(new FitbitSkinTemperatureRoute(this, userRepository, avroData));
9994
localRoutes.add(new FitbitIntradayCaloriesRoute(this, userRepository, avroData));
10095
}
10196
return localRoutes;
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.route;
19+
20+
import io.confluent.connect.avro.AvroData;
21+
import org.radarbase.connect.rest.fitbit.converter.FitbitBreathingRateAvroConverter;
22+
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
23+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
24+
import org.radarbase.connect.rest.fitbit.user.User;
25+
import org.radarbase.connect.rest.fitbit.user.UserRepository;
26+
import org.radarbase.connect.rest.fitbit.util.DateRange;
27+
28+
import java.time.ZonedDateTime;
29+
import java.util.stream.Stream;
30+
31+
import static java.time.ZoneOffset.UTC;
32+
import static java.time.temporal.ChronoUnit.SECONDS;
33+
34+
public class FitbitBreathingRateRoute extends FitbitPollingRoute {
35+
private final FitbitBreathingRateAvroConverter converter;
36+
37+
public FitbitBreathingRateRoute(FitbitRequestGenerator generator,
38+
UserRepository userRepository, AvroData avroData) {
39+
super(generator, userRepository, "breathing_rate");
40+
this.converter = new FitbitBreathingRateAvroConverter(avroData);
41+
}
42+
43+
@Override
44+
protected String getUrlFormat(String baseUrl) {
45+
return baseUrl + "/1/user/%s/br/date/%s/%s/all.json";
46+
}
47+
48+
protected Stream<FitbitRestRequest> createRequests(User user) {
49+
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
50+
.atZone(UTC)
51+
.truncatedTo(SECONDS);
52+
ZonedDateTime now = ZonedDateTime.now(UTC);
53+
return Stream.of(newRequest(user, new DateRange(startDate, now),
54+
user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now)));
55+
}
56+
57+
@Override
58+
public FitbitBreathingRateAvroConverter converter() {
59+
return converter;
60+
}
61+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.route;
19+
20+
import io.confluent.connect.avro.AvroData;
21+
import org.radarbase.connect.rest.fitbit.converter.FitbitSkinTemperatureAvroConverter;
22+
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
23+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
24+
import org.radarbase.connect.rest.fitbit.user.User;
25+
import org.radarbase.connect.rest.fitbit.user.UserRepository;
26+
import org.radarbase.connect.rest.fitbit.util.DateRange;
27+
28+
import java.time.ZonedDateTime;
29+
import java.util.stream.Stream;
30+
31+
import static java.time.ZoneOffset.UTC;
32+
import static java.time.temporal.ChronoUnit.SECONDS;
33+
34+
public class FitbitSkinTemperatureRoute extends FitbitPollingRoute {
35+
private final FitbitSkinTemperatureAvroConverter converter;
36+
37+
public FitbitSkinTemperatureRoute(FitbitRequestGenerator generator,
38+
UserRepository userRepository, AvroData avroData) {
39+
super(generator, userRepository, "skin_temperature");
40+
this.converter = new FitbitSkinTemperatureAvroConverter(avroData);
41+
}
42+
43+
@Override
44+
protected String getUrlFormat(String baseUrl) {
45+
return baseUrl + "/1/user/%s/temp/skin/date/%s/%s/.json";
46+
}
47+
48+
protected Stream<FitbitRestRequest> createRequests(User user) {
49+
ZonedDateTime startDate = this.getOffset(user).plus(ONE_SECOND)
50+
.atZone(UTC)
51+
.truncatedTo(SECONDS);
52+
ZonedDateTime now = ZonedDateTime.now(UTC);
53+
return Stream.of(newRequest(user, new DateRange(startDate, now),
54+
user.getExternalUserId(), DATE_FORMAT.format(startDate), DATE_FORMAT.format(now)));
55+
}
56+
57+
@Override
58+
public FitbitSkinTemperatureAvroConverter converter() {
59+
return converter;
60+
}
61+
}

0 commit comments

Comments
 (0)