Skip to content

Commit 98c260e

Browse files
Merge pull request #48 from RADAR-base/release-0.2.4
Release 0.2.4
2 parents b71a521 + f015cbf commit 98c260e

File tree

9 files changed

+155
-4
lines changed

9 files changed

+155
-4
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ your Fitbit App client ID and client secret. The following tables shows the poss
7474
<td>fitbit.time.zone.topic</td></td><td>Topic for Fitbit profile time zone</td></td><td>string</td></td><td>connect_fitbit_time_zone</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
7575
<tr>
7676
<td>fitbit.activity.log.topic</td></td><td>Topic for Fitbit activity log.</td></td><td>string</td></td><td>connect_fitbit_activity_log</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
77+
<tr>
78+
<td>fitbit.intraday.calories.topic</td></td><td>Topic for Fitbit intraday calories</td></td><td>string</td></td><td>connect_fitbit_intraday_calories</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
7779
</tbody></table>
7880

7981
Now you can run a full Kafka stack using

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ subprojects {
1111
apply plugin: 'java-library'
1212

1313
group = 'org.radarbase'
14-
version = '0.2.3'
14+
version = '0.2.4'
1515

1616
sourceCompatibility = 1.8
1717
targetCompatibility = 1.8

kafka-connect-fitbit-source/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
dependencies {
22
api project(':kafka-connect-rest-source')
33
api group: 'io.confluent', name: 'kafka-connect-avro-converter', version: confluentVersion
4-
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.1'
4+
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.3'
5+
56

67
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
78
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
118118
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DEFAULT = "connect_fitbit_activity_log";
119119
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DISPLAY = "Activity log topic";
120120

121+
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_CONFIG = "fitbit.intraday.calories.topic";
122+
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DOC = "Topic for Fitbit intraday calories";
123+
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY = "Intraday calories topic";
124+
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT = "connect_fitbit_intraday_calories";
125+
121126
private final UserRepository userRepository;
122127
private final Headers clientCredentials;
123128

@@ -302,6 +307,17 @@ public String toString() {
302307
++orderInGroup,
303308
Width.SHORT,
304309
FITBIT_ACTIVITY_LOG_TOPIC_DISPLAY)
310+
311+
.define(FITBIT_INTRADAY_CALORIES_TOPIC_CONFIG,
312+
Type.STRING,
313+
FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT,
314+
nonControlChar,
315+
Importance.LOW,
316+
FITBIT_INTRADAY_CALORIES_TOPIC_DOC,
317+
group,
318+
++orderInGroup,
319+
Width.SHORT,
320+
FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY)
305321
;
306322
}
307323

@@ -378,4 +394,8 @@ public Duration getPollIntervalPerUser() {
378394
public Duration getTooManyRequestsCooldownInterval() {
379395
return Duration.ofHours(1);
380396
}
397+
398+
public String getFitbitIntradayCaloriesTopic() {
399+
return getString(FITBIT_INTRADAY_CALORIES_TOPIC_CONFIG);
400+
}
381401
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package org.radarbase.connect.rest.fitbit.converter;
2+
3+
import static org.radarbase.connect.rest.util.ThrowingFunction.tryOrNull;
4+
5+
import com.fasterxml.jackson.databind.JsonNode;
6+
import io.confluent.connect.avro.AvroData;
7+
import java.time.Instant;
8+
import java.time.LocalTime;
9+
import java.time.ZonedDateTime;
10+
import java.util.stream.Stream;
11+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
12+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
13+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
14+
import org.radarcns.connector.fitbit.FitbitIntradayCalories;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
public class FitbitIntradayCaloriesAvroConverter extends FitbitAvroConverter {
19+
20+
private static final Logger logger =
21+
LoggerFactory.getLogger(FitbitIntradayCaloriesAvroConverter.class);
22+
23+
private String caloriesTopic;
24+
25+
public FitbitIntradayCaloriesAvroConverter(AvroData avroData) {
26+
super(avroData);
27+
}
28+
29+
@Override
30+
protected Stream<TopicData> processRecords(
31+
FitbitRestRequest request, JsonNode root, double timeReceived) {
32+
JsonNode intraday = root.get("activities-calories-intraday");
33+
if (intraday == null) {
34+
return Stream.empty();
35+
}
36+
37+
JsonNode dataset = intraday.get("dataset");
38+
if (dataset == null) {
39+
return Stream.empty();
40+
}
41+
42+
int interval = getRecordInterval(intraday, 60);
43+
44+
// Used as the date to convert the local times in the dataset to absolute times.
45+
ZonedDateTime startDate = request.getDateRange().end();
46+
47+
return iterableToStream(dataset)
48+
.map(
49+
tryOrNull(
50+
activity -> {
51+
Instant time =
52+
startDate.with(LocalTime.parse(activity.get("time").asText())).toInstant();
53+
54+
FitbitIntradayCalories calories =
55+
new FitbitIntradayCalories(
56+
time.toEpochMilli() / 1000d,
57+
timeReceived,
58+
interval,
59+
activity.get("value").asDouble(),
60+
activity.get("level").asInt(),
61+
activity.get("mets").asDouble());
62+
63+
return new TopicData(time, caloriesTopic, calories);
64+
},
65+
(a, ex) ->
66+
logger.warn(
67+
"Failed to convert calories from request {} of user {}, {}",
68+
request.getRequest().url(),
69+
request.getUser(),
70+
a,
71+
ex)));
72+
}
73+
74+
@Override
75+
public void initialize(RestSourceConnectorConfig config) {
76+
caloriesTopic = ((FitbitRestSourceConnectorConfig) config).getFitbitIntradayCaloriesTopic();
77+
logger.info("Using calories topic {}", caloriesTopic);
78+
}
79+
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.radarbase.connect.rest.RestSourceConnectorConfig;
3636
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
3737
import org.radarbase.connect.rest.fitbit.route.FitbitActivityLogRoute;
38+
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayCaloriesRoute;
3839
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayHeartRateRoute;
3940
import org.radarbase.connect.rest.fitbit.route.FitbitIntradayStepsRoute;
4041
import org.radarbase.connect.rest.fitbit.route.FitbitSleepRoute;
@@ -90,6 +91,7 @@ private List<RequestRoute> getRoutes(FitbitRestSourceConnectorConfig config) {
9091
if (config.hasIntradayAccess()) {
9192
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
9293
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
94+
localRoutes.add(new FitbitIntradayCaloriesRoute(this, userRepository, avroData));
9395
}
9496
return localRoutes;
9597
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package org.radarbase.connect.rest.fitbit.route;
2+
3+
import static java.time.temporal.ChronoUnit.MINUTES;
4+
5+
import io.confluent.connect.avro.AvroData;
6+
import java.util.stream.Stream;
7+
import org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter;
8+
import org.radarbase.connect.rest.fitbit.converter.FitbitIntradayCaloriesAvroConverter;
9+
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
10+
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
11+
import org.radarbase.connect.rest.fitbit.user.User;
12+
import org.radarbase.connect.rest.fitbit.user.UserRepository;
13+
14+
public class FitbitIntradayCaloriesRoute extends FitbitPollingRoute {
15+
16+
private final FitbitIntradayCaloriesAvroConverter caloriesAvroConverter;
17+
18+
public FitbitIntradayCaloriesRoute(
19+
FitbitRequestGenerator generator, UserRepository userRepository, AvroData avroData) {
20+
super(generator, userRepository, "intraday_calories");
21+
caloriesAvroConverter = new FitbitIntradayCaloriesAvroConverter(avroData);
22+
}
23+
24+
@Override
25+
protected Stream<FitbitRestRequest> createRequests(User user) {
26+
return startDateGenerator(this.getOffset(user).plus(ONE_MINUTE).truncatedTo(MINUTES))
27+
.map(
28+
dateRange ->
29+
newRequest(
30+
user,
31+
dateRange,
32+
user.getExternalUserId(),
33+
DATE_FORMAT.format(dateRange.start()),
34+
TIME_FORMAT.format(dateRange.start()),
35+
TIME_FORMAT.format(dateRange.end())));
36+
}
37+
38+
@Override
39+
protected String getUrlFormat(String baseUrl) {
40+
return baseUrl + "/1/user/%s/activities/calories/date/%s/1d/1min/time/%s/%s.json?timezone=UTC";
41+
}
42+
43+
@Override
44+
public PayloadToSourceRecordConverter converter() {
45+
return caloriesAvroConverter;
46+
}
47+
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitIntradayStepsRoute.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
import org.radarbase.connect.rest.fitbit.user.UserRepository;
3030

3131
public class FitbitIntradayStepsRoute extends FitbitPollingRoute {
32-
private static final TemporalAmount ONE_MINUTE = MINUTES.getDuration();
33-
3432
private final FitbitIntradayStepsAvroConverter converter;
3533

3634
public FitbitIntradayStepsRoute(FitbitRequestGenerator generator,

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static java.time.ZoneOffset.UTC;
2121
import static java.time.temporal.ChronoUnit.DAYS;
22+
import static java.time.temporal.ChronoUnit.MINUTES;
2223
import static java.time.temporal.ChronoUnit.NANOS;
2324
import static java.time.temporal.ChronoUnit.SECONDS;
2425
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT;
@@ -95,6 +96,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
9596
protected static final Duration ONE_DAY = DAYS.getDuration();
9697
protected static final Duration ONE_NANO = NANOS.getDuration();
9798
protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration();
99+
protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration();
98100

99101
private static final Logger logger = LoggerFactory.getLogger(FitbitSleepRoute.class);
100102

0 commit comments

Comments
 (0)