Skip to content

Commit daaa278

Browse files
authored
Merge pull request #102 from RADAR-base/oura-impl
Add Oura Library implementation
2 parents 78da8e3 + 095eebf commit daaa278

File tree

61 files changed

+2672
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+2672
-0
lines changed

build.gradle.kts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ plugins {
66
id("org.radarbase.radar-kotlin") version Versions.radarCommons apply false
77
}
88

9+
repositories {
10+
// Use jcenter for resolving dependencies.
11+
// You can declare any Maven/Ivy/file repository here.
12+
mavenCentral()
13+
}
14+
915
description = "Kafka connector for REST API sources"
1016

1117
radarRootProject {

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ object Versions {
99
const val radarCommons = "1.1.1"
1010
const val confluent = "7.5.0"
1111
const val kafka = "$confluent-ce"
12+
const val avro = "1.11.0"
1213

1314
// From image
1415
const val jackson = "2.14.2"
@@ -25,4 +26,6 @@ object Versions {
2526
const val junit = "5.9.3"
2627
const val wiremock = "2.27.2"
2728
const val mockito = "5.3.1"
29+
30+
const val kotlinVersion = "1.8.21"
2831
}

kafka-connect-fitbit-source/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ description = "Kafka connector for Fitbit API source"
22

33
dependencies {
44
api(project(":kafka-connect-rest-source"))
5+
api(project(":oura-library"))
56
api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}")
67
api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}")
78
implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}")
@@ -10,6 +11,7 @@ dependencies {
1011
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
1112
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
1213
implementation("com.google.firebase:firebase-admin:${Versions.firebaseAdmin}")
14+
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.3.41")
1315

1416
implementation("io.ktor:ktor-client-auth:${Versions.ktor}")
1517
implementation("io.ktor:ktor-client-content-negotiation:${Versions.ktor}")

oura-library/build.gradle

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
2+
group = 'org.radarbase'
3+
version = '0.0.1'
4+
5+
apply plugin: 'maven-publish'
6+
7+
repositories {
8+
// Use jcenter for resolving dependencies.
9+
// You can declare any Maven/Ivy/file repository here.
10+
mavenCentral()
11+
}
12+
13+
dependencies {
14+
// Use the Kotlin JDK 8 standard library.
15+
implementation 'org.jetbrains.kotlin:kotlin-stdlib-jdk8'
16+
17+
implementation "com.squareup.okhttp3:okhttp:$Versions.okhttp"
18+
19+
implementation "org.radarbase:radar-schemas-commons:$Versions.radarSchemas"
20+
21+
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: "$Versions.jackson"
22+
23+
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: "$Versions.jackson"
24+
25+
implementation group: 'org.apache.avro', name: 'avro', version: "$Versions.avro"
26+
27+
implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:$Versions.jackson"
28+
29+
// Use the Kotlin test library.
30+
testImplementation 'org.jetbrains.kotlin:kotlin-test'
31+
32+
// Use the Kotlin JUnit integration.
33+
testImplementation 'org.jetbrains.kotlin:kotlin-test-junit'
34+
}
35+
36+
project.afterEvaluate {
37+
publishing {
38+
publications {
39+
library(MavenPublication) {
40+
setGroupId "$group"
41+
setArtifactId "oura-library"
42+
version "$version"
43+
from components.java
44+
}
45+
}
46+
}
47+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.oura.converter
19+
20+
import java.time.ZonedDateTime
21+
22+
data class DateRange(
23+
val start: ZonedDateTime,
24+
val end: ZonedDateTime,
25+
)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.radarbase.oura.converter
2+
3+
import com.fasterxml.jackson.databind.JsonNode
4+
import org.radarbase.oura.user.User
5+
import org.radarcns.connector.oura.OuraActivityClass
6+
import org.radarcns.connector.oura.OuraActivityClassType
7+
import org.slf4j.LoggerFactory
8+
import java.io.IOException
9+
import java.time.OffsetDateTime
10+
11+
class OuraDailyActivityClassConverter(
12+
private val topic: String = "connect_oura_activity_class",
13+
) : OuraDataConverter {
14+
15+
final val ACTIVITY_CLASS_INTERVAL = 300 // in seconds
16+
17+
@Throws(IOException::class)
18+
override fun processRecords(root: JsonNode, user: User): Sequence<Result<TopicData>> {
19+
val array = root.get("data") ?: return emptySequence()
20+
return array.asSequence().flatMap { it.processSamples(user) }
21+
}
22+
23+
private fun JsonNode.processSamples(user: User): Sequence<Result<TopicData>> {
24+
val startTime = OffsetDateTime.parse(this["timestamp"].textValue())
25+
val startTimeEpoch = startTime.toInstant().toEpochMilli() / 1000.0
26+
val timeReceivedEpoch = System.currentTimeMillis() / 1000.0
27+
val id = this.get("id").textValue()
28+
val items = this.get("class_5_min").textValue().toCharArray()
29+
return if (items.isEmpty()) {
30+
emptySequence()
31+
} else {
32+
items.asSequence().mapIndexedCatching { index, value ->
33+
val offset = ACTIVITY_CLASS_INTERVAL * index
34+
val time = startTimeEpoch + offset
35+
TopicData(
36+
key = user.observationKey,
37+
topic = topic,
38+
offset = time.toLong(),
39+
value =
40+
toActivityClass(
41+
time,
42+
timeReceivedEpoch,
43+
id,
44+
value.toString(),
45+
),
46+
)
47+
}
48+
}
49+
}
50+
51+
private fun toActivityClass(
52+
startTimeEpoch: Double,
53+
timeReceivedEpoch: Double,
54+
idString: String,
55+
value: String,
56+
): OuraActivityClass {
57+
return OuraActivityClass.newBuilder()
58+
.apply {
59+
id = idString
60+
time = startTimeEpoch
61+
timeReceived = timeReceivedEpoch
62+
type = value.classify()
63+
}
64+
.build()
65+
}
66+
67+
private fun String.classify(): OuraActivityClassType {
68+
return when (this) {
69+
"0" -> OuraActivityClassType.NON_WEAR
70+
"1" -> OuraActivityClassType.REST
71+
"2" -> OuraActivityClassType.INACTIVE
72+
"3" -> OuraActivityClassType.LOW_ACTIVITY
73+
"4" -> OuraActivityClassType.MEDIUM_ACTIVITY
74+
"5" -> OuraActivityClassType.HIGH_ACTIVITY
75+
else -> OuraActivityClassType.UNKNOWN
76+
}
77+
}
78+
79+
companion object {
80+
val logger = LoggerFactory.getLogger(OuraDailyActivityClassConverter::class.java)
81+
}
82+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package org.radarbase.oura.converter
2+
3+
import com.fasterxml.jackson.databind.JsonNode
4+
import org.radarbase.oura.user.User
5+
import org.radarcns.connector.oura.OuraDailyActivity
6+
import org.slf4j.LoggerFactory
7+
import java.time.Instant
8+
import java.time.OffsetDateTime
9+
10+
class OuraDailyActivityConverter(
11+
private val topic: String = "connect_oura_daily_activity",
12+
) : OuraDataConverter {
13+
override fun processRecords(
14+
root: JsonNode,
15+
user: User,
16+
): Sequence<Result<TopicData>> {
17+
val array = root.get("data")
18+
?: return emptySequence()
19+
return array.asSequence()
20+
.mapCatching {
21+
val startTime = OffsetDateTime.parse(it["timestamp"].textValue())
22+
val startInstant = startTime.toInstant()
23+
TopicData(
24+
key = user.observationKey,
25+
topic = topic,
26+
offset = startInstant.toEpoch(),
27+
value = it.toDailyActivity(startInstant),
28+
)
29+
}
30+
}
31+
32+
private fun JsonNode.toDailyActivity(
33+
startTime: Instant,
34+
): OuraDailyActivity {
35+
val data = this
36+
return OuraDailyActivity.newBuilder().apply {
37+
time = startTime.toEpochMilli() / 1000.0
38+
timeReceived = System.currentTimeMillis() / 1000.0
39+
id = data.get("id").textValue()
40+
activeCalories = data.get("active_calories").intValue()
41+
contributorMeetDailyTargets =
42+
data.get("contributors")?.get("meet_daily_targets")?.intValue()
43+
contributorMoveEveryHour = data.get("contributors")?.get("move_every_hour")?.intValue()
44+
contributorRecoveryTime = data.get("contributors")?.get("recovery_time")?.intValue()
45+
contributorStayActive = data.get("contributors")?.get("stay_active")?.intValue()
46+
contributorTrainingFrequency =
47+
data.get("contributors")?.get("training_frequency")?.intValue()
48+
contributorTrainingVolume = data.get("contributors")?.get("training_volume")?.intValue()
49+
equivalentWalkingDistance = data.get("equivalent_walking_distance").intValue()
50+
highActivityMetMinutes = data.get("high_activity_met_minutes").intValue()
51+
highActivityTime = data.get("high_activity_time").intValue()
52+
inactivityAlerts = data.get("inactivity_alerts").intValue()
53+
lowActivityMetMinutes = data.get("low_activity_met_minutes").intValue()
54+
lowActivityTime = data.get("low_activity_time").intValue()
55+
mediumActivityMetMinutes = data.get("medium_activity_met_minutes").intValue()
56+
mediumActivityTime = data.get("medium_activity_time").intValue()
57+
metersToTarget = data.get("meters_to_target").intValue()
58+
nonWearTime = data.get("non_wear_time").intValue()
59+
restingTime = data.get("resting_time").intValue()
60+
sedentaryMetMinutes = data.get("sedentary_met_minutes").intValue()
61+
sedentaryTime = data.get("sedentary_time").intValue()
62+
steps = data.get("steps").intValue()
63+
targetCalories = data.get("target_calories").intValue()
64+
targetMeters = data.get("target_meters").intValue()
65+
totalCalories = data.get("total_calories").intValue()
66+
day = data.get("day").textValue()
67+
score = data.get("score").intValue()
68+
}.build()
69+
}
70+
71+
companion object {
72+
val logger = LoggerFactory.getLogger(OuraDailyActivityConverter::class.java)
73+
}
74+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.radarbase.oura.converter
2+
3+
import com.fasterxml.jackson.databind.JsonNode
4+
import org.radarbase.oura.user.User
5+
import org.radarcns.connector.oura.OuraMet
6+
import org.slf4j.LoggerFactory
7+
import java.io.IOException
8+
import java.time.OffsetDateTime
9+
10+
class OuraDailyActivityMetConverter(
11+
private val topic: String = "connect_oura_met",
12+
private val sampleKey: String = "met",
13+
) : OuraDataConverter {
14+
15+
@Throws(IOException::class)
16+
override fun processRecords(
17+
root: JsonNode,
18+
user: User,
19+
): Sequence<Result<TopicData>> {
20+
val array = root.get("data")
21+
?: return emptySequence()
22+
return array.asSequence()
23+
.flatMap {
24+
runCatching {
25+
it.processSamples(user)
26+
}.getOrElse {
27+
logger.error("Error processing records", it.message)
28+
emptySequence()
29+
}
30+
}
31+
}
32+
33+
private fun JsonNode.processSamples(
34+
user: User,
35+
): Sequence<Result<TopicData>> {
36+
val startTime = OffsetDateTime.parse(this["timestamp"].textValue())
37+
val startTimeEpoch = startTime.toInstant().toEpochMilli() / 1000.0
38+
val timeReceivedEpoch = System.currentTimeMillis() / 1000.0
39+
val id = this.get("id").textValue()
40+
val interval = this.get(sampleKey)?.get("interval")?.intValue()
41+
?: throw IOException("Unable to get sample interval.")
42+
val items = this.get(sampleKey)?.get("items") ?: throw IOException("Unable to get items.")
43+
return items.asSequence()
44+
.mapIndexedCatching { index, value ->
45+
val offset = interval * index
46+
val time = startTimeEpoch + offset
47+
TopicData(
48+
key = user.observationKey,
49+
topic = topic,
50+
offset = time.toLong(),
51+
value = toMet(
52+
time,
53+
timeReceivedEpoch,
54+
id,
55+
value.floatValue(),
56+
),
57+
)
58+
}
59+
}
60+
61+
private fun toMet(
62+
startTimeEpoch: Double,
63+
timeReceivedEpoch: Double,
64+
idString: String,
65+
value: Float,
66+
): OuraMet {
67+
return OuraMet.newBuilder().apply {
68+
id = idString
69+
time = startTimeEpoch
70+
timeReceived = timeReceivedEpoch
71+
met = value
72+
}.build()
73+
}
74+
75+
companion object {
76+
val logger = LoggerFactory.getLogger(OuraDailyActivityMetConverter::class.java)
77+
}
78+
}

0 commit comments

Comments
 (0)