Skip to content

Commit cbbd4db

Browse files
authored
Merge pull request #180 from RADAR-base/fix/oura-requests
Allow configurable Oura routes
2 parents 10d28dd + 12db4e7 commit cbbd4db

File tree

6 files changed

+286
-11
lines changed

6 files changed

+286
-11
lines changed

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,59 @@ public class OuraRestSourceConnectorConfig extends AbstractConfig {
102102
private static final String OURA_USER_REPOSITORY_TOKEN_URL_DOC = "OAuth 2.0 token url for retrieving client credentials.";
103103
private static final String OURA_USER_REPOSITORY_TOKEN_URL_DISPLAY = "OAuth 2.0 token URL.";
104104

105+
// Route enabled configs strings
106+
private static final String OURA_DAILY_ACTIVITY_ENABLED_CONFIG = "oura.daily.activity.enabled";
107+
private static final String OURA_DAILY_ACTIVITY_ENABLED_DOC = "Enable or disable Oura daily activity";
108+
private static final String OURA_DAILY_ACTIVITY_ENABLED_DISPLAY = "Oura daily activity enabled";
109+
110+
private static final String OURA_DAILY_READINESS_ENABLED_CONFIG = "oura.daily.readiness.enabled";
111+
private static final String OURA_DAILY_READINESS_ENABLED_DOC = "Enable or disable Oura daily readiness";
112+
private static final String OURA_DAILY_READINESS_ENABLED_DISPLAY = "Oura daily readiness enabled";
113+
114+
private static final String OURA_DAILY_SLEEP_ENABLED_CONFIG = "oura.daily.sleep.enabled";
115+
private static final String OURA_DAILY_SLEEP_ENABLED_DOC = "Enable or disable Oura daily sleep";
116+
private static final String OURA_DAILY_SLEEP_ENABLED_DISPLAY = "Oura daily sleep enabled";
117+
118+
private static final String OURA_DAILY_OXYGEN_SATURATION_ENABLED_CONFIG = "oura.daily.oxygen.saturation.enabled";
119+
private static final String OURA_DAILY_OXYGEN_SATURATION_ENABLED_DOC = "Enable or disable Oura daily oxygen saturation";
120+
private static final String OURA_DAILY_OXYGEN_SATURATION_ENABLED_DISPLAY = "Oura daily oxygen saturation enabled";
121+
122+
private static final String OURA_HEART_RATE_ENABLED_CONFIG = "oura.heart.rate.enabled";
123+
private static final String OURA_HEART_RATE_ENABLED_DOC = "Enable or disable Oura heart rate";
124+
private static final String OURA_HEART_RATE_ENABLED_DISPLAY = "Oura heart rate enabled";
125+
126+
private static final String OURA_PERSONAL_INFO_ENABLED_CONFIG = "oura.personal.info.enabled";
127+
private static final String OURA_PERSONAL_INFO_ENABLED_DOC = "Enable or disable Oura personal info";
128+
private static final String OURA_PERSONAL_INFO_ENABLED_DISPLAY = "Oura personal info enabled";
129+
130+
private static final String OURA_SESSION_ENABLED_CONFIG = "oura.session.enabled";
131+
private static final String OURA_SESSION_ENABLED_DOC = "Enable or disable Oura sessions";
132+
private static final String OURA_SESSION_ENABLED_DISPLAY = "Oura sessions enabled";
133+
134+
private static final String OURA_SLEEP_ENABLED_CONFIG = "oura.sleep.enabled";
135+
private static final String OURA_SLEEP_ENABLED_DOC = "Enable or disable Oura sleep";
136+
private static final String OURA_SLEEP_ENABLED_DISPLAY = "Oura sleep enabled";
137+
138+
private static final String OURA_TAG_ENABLED_CONFIG = "oura.tag.enabled";
139+
private static final String OURA_TAG_ENABLED_DOC = "Enable or disable Oura tags";
140+
private static final String OURA_TAG_ENABLED_DISPLAY = "Oura tags enabled";
141+
142+
private static final String OURA_WORKOUT_ENABLED_CONFIG = "oura.workout.enabled";
143+
private static final String OURA_WORKOUT_ENABLED_DOC = "Enable or disable Oura workouts";
144+
private static final String OURA_WORKOUT_ENABLED_DISPLAY = "Oura workouts enabled";
145+
146+
private static final String OURA_RING_CONFIGURATION_ENABLED_CONFIG = "oura.ring.configuration.enabled";
147+
private static final String OURA_RING_CONFIGURATION_ENABLED_DOC = "Enable or disable Oura ring configuration";
148+
private static final String OURA_RING_CONFIGURATION_ENABLED_DISPLAY = "Oura ring configuration enabled";
149+
150+
private static final String OURA_REST_MODE_PERIOD_ENABLED_CONFIG = "oura.rest.mode.period.enabled";
151+
private static final String OURA_REST_MODE_PERIOD_ENABLED_DOC = "Enable or disable Oura rest mode period";
152+
private static final String OURA_REST_MODE_PERIOD_ENABLED_DISPLAY = "Oura rest mode period enabled";
153+
154+
private static final String OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_CONFIG = "oura.sleep.time.recommendation.enabled";
155+
private static final String OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_DOC = "Enable or disable Oura sleep time recommendation";
156+
private static final String OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_DISPLAY = "Oura sleep time recommendation enabled";
157+
105158
private OuraUserRepository userRepository;
106159
private final Headers clientCredentials;
107160

@@ -240,6 +293,137 @@ public String toString() {
240293
++orderInGroup,
241294
Width.SHORT,
242295
OURA_USER_REPOSITORY_TOKEN_URL_DISPLAY)
296+
297+
// Route enable/disable flags
298+
.define(OURA_DAILY_ACTIVITY_ENABLED_CONFIG,
299+
Type.BOOLEAN,
300+
true,
301+
Importance.LOW,
302+
OURA_DAILY_ACTIVITY_ENABLED_DOC,
303+
group,
304+
++orderInGroup,
305+
Width.SHORT,
306+
OURA_DAILY_ACTIVITY_ENABLED_DISPLAY)
307+
308+
.define(OURA_DAILY_READINESS_ENABLED_CONFIG,
309+
Type.BOOLEAN,
310+
true,
311+
Importance.LOW,
312+
OURA_DAILY_READINESS_ENABLED_DOC,
313+
group,
314+
++orderInGroup,
315+
Width.SHORT,
316+
OURA_DAILY_READINESS_ENABLED_DISPLAY)
317+
318+
.define(OURA_DAILY_SLEEP_ENABLED_CONFIG,
319+
Type.BOOLEAN,
320+
true,
321+
Importance.LOW,
322+
OURA_DAILY_SLEEP_ENABLED_DOC,
323+
group,
324+
++orderInGroup,
325+
Width.SHORT,
326+
OURA_DAILY_SLEEP_ENABLED_DISPLAY)
327+
328+
.define(OURA_DAILY_OXYGEN_SATURATION_ENABLED_CONFIG,
329+
Type.BOOLEAN,
330+
true,
331+
Importance.LOW,
332+
OURA_DAILY_OXYGEN_SATURATION_ENABLED_DOC,
333+
group,
334+
++orderInGroup,
335+
Width.SHORT,
336+
OURA_DAILY_OXYGEN_SATURATION_ENABLED_DISPLAY)
337+
338+
.define(OURA_HEART_RATE_ENABLED_CONFIG,
339+
Type.BOOLEAN,
340+
true,
341+
Importance.LOW,
342+
OURA_HEART_RATE_ENABLED_DOC,
343+
group,
344+
++orderInGroup,
345+
Width.SHORT,
346+
OURA_HEART_RATE_ENABLED_DISPLAY)
347+
348+
.define(OURA_PERSONAL_INFO_ENABLED_CONFIG,
349+
Type.BOOLEAN,
350+
true,
351+
Importance.LOW,
352+
OURA_PERSONAL_INFO_ENABLED_DOC,
353+
group,
354+
++orderInGroup,
355+
Width.SHORT,
356+
OURA_PERSONAL_INFO_ENABLED_DISPLAY)
357+
358+
.define(OURA_SESSION_ENABLED_CONFIG,
359+
Type.BOOLEAN,
360+
true,
361+
Importance.LOW,
362+
OURA_SESSION_ENABLED_DOC,
363+
group,
364+
++orderInGroup,
365+
Width.SHORT,
366+
OURA_SESSION_ENABLED_DISPLAY)
367+
368+
.define(OURA_SLEEP_ENABLED_CONFIG,
369+
Type.BOOLEAN,
370+
true,
371+
Importance.LOW,
372+
OURA_SLEEP_ENABLED_DOC,
373+
group,
374+
++orderInGroup,
375+
Width.SHORT,
376+
OURA_SLEEP_ENABLED_DISPLAY)
377+
378+
.define(OURA_TAG_ENABLED_CONFIG,
379+
Type.BOOLEAN,
380+
true,
381+
Importance.LOW,
382+
OURA_TAG_ENABLED_DOC,
383+
group,
384+
++orderInGroup,
385+
Width.SHORT,
386+
OURA_TAG_ENABLED_DISPLAY)
387+
388+
.define(OURA_WORKOUT_ENABLED_CONFIG,
389+
Type.BOOLEAN,
390+
true,
391+
Importance.LOW,
392+
OURA_WORKOUT_ENABLED_DOC,
393+
group,
394+
++orderInGroup,
395+
Width.SHORT,
396+
OURA_WORKOUT_ENABLED_DISPLAY)
397+
398+
.define(OURA_RING_CONFIGURATION_ENABLED_CONFIG,
399+
Type.BOOLEAN,
400+
true,
401+
Importance.LOW,
402+
OURA_RING_CONFIGURATION_ENABLED_DOC,
403+
group,
404+
++orderInGroup,
405+
Width.SHORT,
406+
OURA_RING_CONFIGURATION_ENABLED_DISPLAY)
407+
408+
.define(OURA_REST_MODE_PERIOD_ENABLED_CONFIG,
409+
Type.BOOLEAN,
410+
true,
411+
Importance.LOW,
412+
OURA_REST_MODE_PERIOD_ENABLED_DOC,
413+
group,
414+
++orderInGroup,
415+
Width.SHORT,
416+
OURA_REST_MODE_PERIOD_ENABLED_DISPLAY)
417+
418+
.define(OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_CONFIG,
419+
Type.BOOLEAN,
420+
true,
421+
Importance.LOW,
422+
OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_DOC,
423+
group,
424+
++orderInGroup,
425+
Width.SHORT,
426+
OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_DISPLAY)
243427
;
244428
}
245429

@@ -327,4 +511,20 @@ public URL getOuraUserRepositoryTokenUrl() {
327511
}
328512
}
329513
}
514+
515+
// Route enabled getters
516+
public boolean getOuraDailyActivityEnabled() { return getBoolean(OURA_DAILY_ACTIVITY_ENABLED_CONFIG); }
517+
public boolean getOuraDailyReadinessEnabled() { return getBoolean(OURA_DAILY_READINESS_ENABLED_CONFIG); }
518+
public boolean getOuraDailySleepEnabled() { return getBoolean(OURA_DAILY_SLEEP_ENABLED_CONFIG); }
519+
public boolean getOuraDailyOxygenSaturationEnabled() { return getBoolean(OURA_DAILY_OXYGEN_SATURATION_ENABLED_CONFIG); }
520+
public boolean getOuraHeartRateEnabled() { return getBoolean(OURA_HEART_RATE_ENABLED_CONFIG); }
521+
public boolean getOuraPersonalInfoEnabled() { return getBoolean(OURA_PERSONAL_INFO_ENABLED_CONFIG); }
522+
public boolean getOuraSessionEnabled() { return getBoolean(OURA_SESSION_ENABLED_CONFIG); }
523+
public boolean getOuraSleepEnabled() { return getBoolean(OURA_SLEEP_ENABLED_CONFIG); }
524+
public boolean getOuraTagEnabled() { return getBoolean(OURA_TAG_ENABLED_CONFIG); }
525+
public boolean getOuraWorkoutEnabled() { return getBoolean(OURA_WORKOUT_ENABLED_CONFIG); }
526+
public boolean getOuraRingConfigurationEnabled() { return getBoolean(OURA_RING_CONFIGURATION_ENABLED_CONFIG); }
527+
public boolean getOuraRestModePeriodEnabled() { return getBoolean(OURA_REST_MODE_PERIOD_ENABLED_CONFIG); }
528+
public boolean getOuraSleepTimeRecommendationEnabled() { return getBoolean(OURA_SLEEP_TIME_RECOMMENDATION_ENABLED_CONFIG); }
529+
330530
}

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java

Lines changed: 79 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import java.io.IOException;
2323
import java.time.Instant;
2424
import java.util.Collections;
25+
import java.util.ArrayList;
2526
import java.util.HashMap;
2627
import java.util.Iterator;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.stream.Collectors;
3031
import java.util.stream.Stream;
32+
import java.time.Duration;
3133

3234
import org.apache.kafka.connect.data.SchemaAndValue;
3335
import org.apache.kafka.connect.errors.ConnectException;
@@ -44,7 +46,21 @@
4446
import org.radarbase.oura.request.OuraResult.Error;
4547
import org.radarbase.oura.request.OuraErrorBase;
4648
import org.radarbase.oura.request.RestRequest;
49+
import org.radarbase.oura.route.OuraRouteFactory;
4750
import org.radarbase.oura.route.Route;
51+
import org.radarbase.oura.route.OuraDailyActivityRoute;
52+
import org.radarbase.oura.route.OuraDailyReadinessRoute;
53+
import org.radarbase.oura.route.OuraDailySleepRoute;
54+
import org.radarbase.oura.route.OuraDailyOxygenSaturationRoute;
55+
import org.radarbase.oura.route.OuraHeartRateRoute;
56+
import org.radarbase.oura.route.OuraPersonalInfoRoute;
57+
import org.radarbase.oura.route.OuraSessionRoute;
58+
import org.radarbase.oura.route.OuraSleepRoute;
59+
import org.radarbase.oura.route.OuraTagRoute;
60+
import org.radarbase.oura.route.OuraWorkoutRoute;
61+
import org.radarbase.oura.route.OuraRingConfigurationRoute;
62+
import org.radarbase.oura.route.OuraRestModePeriodRoute;
63+
import org.radarbase.oura.route.OuraSleepTimeRecommendationRoute;
4864
import org.slf4j.Logger;
4965
import org.slf4j.LoggerFactory;
5066
import org.radarbase.oura.user.User;
@@ -64,18 +80,64 @@ public class OuraSourceTask extends SourceTask {
6480
private KafkaOffsetManager offsetManager;
6581
String TIMESTAMP_OFFSET_KEY = "timestamp";
6682
long TIMEOUT = 60000L;
83+
private int routeStartIndex = 0;
6784

6885
public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) {
6986
OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config;
7087
this.baseClient = new OkHttpClient();
7188

7289
this.userRepository = ouraConfig.getUserRepository();
7390
this.offsetManager = new KafkaOffsetManager(offsetStorageReader);
74-
this.ouraRequestGenerator = new OuraRequestGenerator(this.userRepository, this.offsetManager);
75-
this.routes = this.ouraRequestGenerator.getRoutes();
91+
this.routes = this.getRoutes(ouraConfig);
92+
this.ouraRequestGenerator = new OuraRequestGenerator(this.userRepository, this.offsetManager, this.routes);
7693
this.offsetManager.initialize(getPartitions());
7794
}
7895

96+
private List<Route> getRoutes(OuraRestSourceConnectorConfig config) {
97+
List<Route> routes = new ArrayList<>();
98+
99+
if (config.getOuraDailyActivityEnabled()) {
100+
routes.add(new OuraDailyActivityRoute(userRepository));
101+
}
102+
if (config.getOuraDailyReadinessEnabled()) {
103+
routes.add(new OuraDailyReadinessRoute(userRepository));
104+
}
105+
if (config.getOuraDailySleepEnabled()) {
106+
routes.add(new OuraDailySleepRoute(userRepository));
107+
}
108+
if (config.getOuraDailyOxygenSaturationEnabled()) {
109+
routes.add(new OuraDailyOxygenSaturationRoute(userRepository));
110+
}
111+
if (config.getOuraHeartRateEnabled()) {
112+
routes.add(new OuraHeartRateRoute(userRepository));
113+
}
114+
if (config.getOuraPersonalInfoEnabled()) {
115+
routes.add(new OuraPersonalInfoRoute(userRepository));
116+
}
117+
if (config.getOuraSessionEnabled()) {
118+
routes.add(new OuraSessionRoute(userRepository));
119+
}
120+
if (config.getOuraSleepEnabled()) {
121+
routes.add(new OuraSleepRoute(userRepository));
122+
}
123+
if (config.getOuraTagEnabled()) {
124+
routes.add(new OuraTagRoute(userRepository));
125+
}
126+
if (config.getOuraWorkoutEnabled()) {
127+
routes.add(new OuraWorkoutRoute(userRepository));
128+
}
129+
if (config.getOuraRingConfigurationEnabled()) {
130+
routes.add(new OuraRingConfigurationRoute(userRepository));
131+
}
132+
if (config.getOuraRestModePeriodEnabled()) {
133+
routes.add(new OuraRestModePeriodRoute(userRepository));
134+
}
135+
if (config.getOuraSleepTimeRecommendationEnabled()) {
136+
routes.add(new OuraSleepTimeRecommendationRoute(userRepository));
137+
}
138+
return routes;
139+
}
140+
79141
public List<Map<String, Object>> getPartitions() {
80142
try {
81143
return StreamsKt.asStream(userRepository.stream())
@@ -95,8 +157,21 @@ public Map<String, Object> getPartition(String route, User user) {
95157
}
96158

97159
public Stream<RestRequest> requests() {
98-
Stream<Route> routes = this.routes.stream();
99-
return routes.flatMap((Route r) -> StreamsKt.asStream(ouraRequestGenerator.requests(r, 100)));
160+
if (this.routes == null || this.routes.isEmpty()) {
161+
return Stream.empty();
162+
}
163+
164+
// Rotate routes so that all routes are requested in a round-robin manner
165+
List<Route> rotatedRoutes = getRotatedRoutes();
166+
return rotatedRoutes.stream()
167+
.flatMap((Route r) -> StreamsKt.asStream(ouraRequestGenerator.requests(r, 100)));
168+
}
169+
170+
private List<Route> getRotatedRoutes() {
171+
List<Route> rotatedRoutes = new ArrayList<>(this.routes);
172+
Collections.rotate(rotatedRoutes, routeStartIndex % this.routes.size());
173+
routeStartIndex = (routeStartIndex + 1) % this.routes.size();
174+
return rotatedRoutes;
100175
}
101176

102177
public Stream<SourceRecord> handleRequest(RestRequest req) throws IOException {

oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraDailyActivityClassConverter.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ class OuraDailyActivityClassConverter(
2525
val startTimeEpoch = startTime.toInstant().toEpochMilli() / 1000.0
2626
val timeReceivedEpoch = System.currentTimeMillis() / 1000.0
2727
val id = this.get("id").textValue()
28-
val items = this.get("class_5_min").textValue().toCharArray()
29-
return if (items.isEmpty()) {
28+
val items = this.get("class_5_min")?.textValue()?.toCharArray()
29+
return if (items == null || items.isEmpty()) {
3030
emptySequence()
3131
} else {
3232
items.asSequence().mapIndexedCatching { index, value ->

oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraSleepMovementConverter.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ class OuraSleepMovementConverter(
3434
val startTimeEpoch = startTime.toInstant().toEpochMilli() / 1000.0
3535
val timeReceivedEpoch = System.currentTimeMillis() / 1000.0
3636
val id = this.get("id").textValue()
37-
val items = this.get("movement_30_sec").textValue().toCharArray()
38-
return if (items.isEmpty()) {
37+
val items = this.get("movement_30_sec")?.textValue()?.toCharArray()
38+
return if (items == null || items.isEmpty()) {
3939
emptySequence()
4040
} else {
4141
items.asSequence()

oura-library/src/main/kotlin/org/radarbase/oura/converter/OuraSleepTimeRecommendationConverter.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ class OuraSleepTimeRecommendationConverter(
4848
optimalBedtimeStartOffset = data.get("optimal_bedtime")?.get("start_offset")?.intValue()
4949
optimalBedtimeEndOffset = data.get("optimal_bedtime")?.get("end_offset")?.intValue()
5050
optimalBedtimeTimezoneOffset = data.get("optimal_bedtime")?.get("day_tz")?.intValue()
51-
recommendation = data.get("recommendation").textValue()?.classifyRecommendation()
51+
recommendation = data.get("recommendation")?.textValue()?.classifyRecommendation()
5252
?: OuraSleepRecommendation.UNKNOWN
53-
status = data.get("status").textValue()?.classifyStatus()
53+
status = data.get("status")?.textValue()?.classifyStatus()
5454
?: OuraSleepStatus.UNKNOWN
5555
}.build()
5656
}

oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ class OuraRequestGenerator
2020
@JvmOverloads
2121
constructor(
2222
private val userRepository: UserRepository,
23-
private val defaultQueryRange: Duration = Duration.ofDays(15),
2423
private val ouraOffsetManager: OuraOffsetManager,
2524
public val routes: List<Route> = OuraRouteFactory.getRoutes(userRepository),
25+
private val defaultQueryRange: Duration = Duration.ofDays(15),
2626
) : RequestGenerator {
2727
private val routeNextRequest: MutableMap<String, Instant> = mutableMapOf()
2828

0 commit comments

Comments
 (0)