diff --git a/build.gradle.kts b/build.gradle.kts index 0a6d2536..dbb46d70 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -57,14 +57,12 @@ dependencies { implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion") val grizzlyVersion: String by project - runtimeOnly("org.glassfish.grizzly:grizzly-framework-monitoring:$grizzlyVersion") - runtimeOnly("org.glassfish.grizzly:grizzly-http-monitoring:$grizzlyVersion") - runtimeOnly("org.glassfish.grizzly:grizzly-http-server-monitoring:$grizzlyVersion") - - val log4j2Version: String by project - runtimeOnly("org.apache.logging.log4j:log4j-slf4j-impl:$log4j2Version") - runtimeOnly("org.apache.logging.log4j:log4j-api:$log4j2Version") - runtimeOnly("org.apache.logging.log4j:log4j-jul:$log4j2Version") + implementation("org.glassfish.grizzly:grizzly-framework-monitoring:$grizzlyVersion") + implementation("org.glassfish.grizzly:grizzly-http-monitoring:$grizzlyVersion") + implementation("org.glassfish.grizzly:grizzly-http-server-monitoring:$grizzlyVersion") + + val logbackVersion: String by project + runtimeOnly("ch.qos.logback:logback-classic:$logbackVersion") val jedisVersion: String by project implementation("redis.clients:jedis:$jedisVersion") diff --git a/gateway.yml b/gateway.yml index d3d9dc68..84565a6d 100644 --- a/gateway.yml +++ b/gateway.yml @@ -38,3 +38,12 @@ pushIntegration: uri: redis://localhost:6379 # Key prefix for locks lockPrefix: radar-push-garmin/lock/ + fitbit: + enabled: true + verificationCode: "" + clientId: "" + clientSecret: "" + userRepositoryUrl: "" + userRepositoryClientId: "" + userRepositoryClientSecret: "" + userRepositoryTokenUrl: "" diff --git a/gradle.properties b/gradle.properties index e96f6a9a..2bcb6553 100644 --- a/gradle.properties +++ b/gradle.properties @@ -3,13 +3,13 @@ dockerComposeStopContainers=true kotlinVersion=1.6.10 okhttp3Version=4.9.3 -radarJerseyVersion=0.8.1 +radarJerseyVersion=0.9.1 radarCommonsVersion=0.13.2 -radarSchemasVersion=0.7.6 +radarSchemasVersion=0.8.1 radarOauthClientVersion=0.8.0 -jacksonVersion=2.12.2 -slf4jVersion=1.7.32 -log4j2Version=2.17.0 +jacksonVersion=2.13.4 +slf4jVersion=2.0.3 +logbackVersion=1.4.4 kafkaVersion=2.8.1 confluentVersion=6.2.0 junitVersion=5.7.2 diff --git a/src/main/kotlin/org/radarbase/gateway/Config.kt b/src/main/kotlin/org/radarbase/gateway/Config.kt index 691109b0..ea7db644 100644 --- a/src/main/kotlin/org/radarbase/gateway/Config.kt +++ b/src/main/kotlin/org/radarbase/gateway/Config.kt @@ -5,8 +5,10 @@ import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGI import org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG import org.radarbase.gateway.inject.PushIntegrationEnhancerFactory import org.radarbase.jersey.enhancer.EnhancerFactory +import org.radarbase.push.integration.fitbit.user.FitbitUserRepository import org.radarbase.push.integration.garmin.user.GarminUserRepository import java.net.URI +import java.time.Duration import java.time.Instant data class Config( @@ -33,10 +35,12 @@ data class Config( } data class PushIntegrationConfig( - val garmin: GarminConfig = GarminConfig() + val garmin: GarminConfig = GarminConfig(), + val fitbit: FitbitConfig = FitbitConfig() ) { fun validate() { garmin.validate() + fitbit.validate() // Add more validations as services are added } } @@ -81,6 +85,48 @@ data class GarminConfig( } } +data class FitbitConfig( + val enabled: Boolean = false, + val verificationCode: String = "", + val clientSecret: String = "", + val clientId: String = "", + val subscriptionConfig: SubscriptionConfig = SubscriptionConfig(), + val userRepositoryClass: String = + "org.radarbase.push.integration.fitbit.user.FitbitUserRepository", + val userRepositoryUrl: String = "http://localhost:8080/", + val userRepositoryClientId: String = "radar_pushendpoint", + val userRepositoryClientSecret: String = "", + val userRepositoryTokenUrl: String = "http://localhost:8080/token/", + val sleepStagesTopic: String = "connect_fitbit_sleep_stages", + val sleepClassicTopic: String = "connect_fitbit_sleep_classic", + val activityLogTopic: String = "connect_fitbit_activity_log", + val foodLogTopic: String = "connect_fitbit_food_log", + val routePollIntervalMs: Long = 5000, + val pollIntervalPerUserSeconds: Long = 150, + val requestMaxThreads: Int = 4, + val redis: RedisConfig = RedisConfig(lockPrefix = "radar-fitbit-subscription/lock"), + val baseUrl: String = "https://api.fitbit.com", +) { + val userRepository: Class<*> = Class.forName(userRepositoryClass) + val routePollInterval: Duration = Duration.ofMillis(routePollIntervalMs) + val pollIntervalPerUser: Duration = Duration.ofSeconds(pollIntervalPerUserSeconds) + val tooManyRequestsCooldown: Duration = Duration.ofHours(1) + + fun validate() { + if (enabled) { + check(FitbitUserRepository::class.java.isAssignableFrom(userRepository)) { + "$userRepositoryClass is not valid. Please specify a class that is a subclass of" + + " `org.radarbase.push.integration.fitbit.user.FitbitUserRepository`" + } + } + } +} + +data class SubscriptionConfig( + val maxThreads: Int = 4, + val subscriberID: String = "1", +) + data class BackfillConfig( val enabled: Boolean = false, val redis: RedisConfig = RedisConfig(), diff --git a/src/main/kotlin/org/radarbase/gateway/inject/PushIntegrationEnhancerFactory.kt b/src/main/kotlin/org/radarbase/gateway/inject/PushIntegrationEnhancerFactory.kt index d2f1cfc9..e7922b87 100644 --- a/src/main/kotlin/org/radarbase/gateway/inject/PushIntegrationEnhancerFactory.kt +++ b/src/main/kotlin/org/radarbase/gateway/inject/PushIntegrationEnhancerFactory.kt @@ -2,10 +2,10 @@ package org.radarbase.gateway.inject import okhttp3.internal.toImmutableList import org.radarbase.gateway.Config -import org.radarbase.jersey.config.ConfigLoader import org.radarbase.jersey.enhancer.Enhancers import org.radarbase.jersey.enhancer.EnhancerFactory import org.radarbase.jersey.enhancer.JerseyResourceEnhancer +import org.radarbase.push.integration.FitbitPushIntegrationResourceEnhancer import org.radarbase.push.integration.GarminPushIntegrationResourceEnhancer import org.radarbase.push.integration.common.inject.PushIntegrationResourceEnhancer @@ -24,6 +24,9 @@ class PushIntegrationEnhancerFactory(private val config: Config) : EnhancerFacto if (config.pushIntegration.garmin.enabled) { enhancersList.add(GarminPushIntegrationResourceEnhancer(config)) } + if (config.pushIntegration.fitbit.enabled) { + enhancersList.add(FitbitPushIntegrationResourceEnhancer(config)) + } // Add more enhancers as services are added return enhancersList.toImmutableList() diff --git a/src/main/kotlin/org/radarbase/push/integration/FitbitPushIntegrationResourceEnhancer.kt b/src/main/kotlin/org/radarbase/push/integration/FitbitPushIntegrationResourceEnhancer.kt new file mode 100644 index 00000000..438d0a56 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/FitbitPushIntegrationResourceEnhancer.kt @@ -0,0 +1,54 @@ +package org.radarbase.push.integration + +import com.fasterxml.jackson.databind.JsonNode +import jakarta.inject.Singleton +import org.glassfish.hk2.api.TypeLiteral +import org.glassfish.jersey.internal.inject.AbstractBinder +import org.glassfish.jersey.process.internal.RequestScoped +import org.glassfish.jersey.server.ResourceConfig +import org.radarbase.gateway.Config +import org.radarbase.jersey.auth.AuthValidator +import org.radarbase.jersey.enhancer.JerseyResourceEnhancer +import org.radarbase.push.integration.common.auth.DelegatedAuthValidator.Companion.FITBIT_QUALIFIER +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.fitbit.auth.FitbitAuthValidator +import org.radarbase.push.integration.fitbit.factory.FitbitUserTreeMapFactory +import org.radarbase.push.integration.fitbit.service.fitbitapi.FitbitApiService +import org.radarbase.push.integration.fitbit.service.fitbitapi.FitbitRequestProcessor +import org.radarbase.push.integration.fitbit.user.FitbitUserRepository + +class FitbitPushIntegrationResourceEnhancer(private val config: Config) : JerseyResourceEnhancer { + override fun ResourceConfig.enhance() { + packages( + "org.radarbase.push.integration.fitbit.resource", + "org.radarbase.push.integration.common.filter", + "org.radarbase.push.integration.fitbit.filter" + ) + } + + override val classes: Array> + get() = arrayOf(FitbitRequestProcessor::class.java) + + override fun AbstractBinder.enhance() { + + bind(config.pushIntegration.fitbit.userRepository) + .to(FitbitUserRepository::class.java) + .named(FITBIT_QUALIFIER) + .`in`(Singleton::class.java) + + bind(FitbitAuthValidator::class.java) + .to(AuthValidator::class.java) + .named(FITBIT_QUALIFIER) + .`in`(Singleton::class.java) + + bind(FitbitApiService::class.java) + .to(FitbitApiService::class.java) + .`in`(Singleton::class.java) + + bindFactory(FitbitUserTreeMapFactory::class.java) + .to(object : TypeLiteral>() {}.type) + .proxy(true) + .named(FITBIT_QUALIFIER) + .`in`(RequestScoped::class.java) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/common/auth/DelegatedAuthValidator.kt b/src/main/kotlin/org/radarbase/push/integration/common/auth/DelegatedAuthValidator.kt index adefc19b..0440fbf3 100644 --- a/src/main/kotlin/org/radarbase/push/integration/common/auth/DelegatedAuthValidator.kt +++ b/src/main/kotlin/org/radarbase/push/integration/common/auth/DelegatedAuthValidator.kt @@ -17,6 +17,7 @@ class DelegatedAuthValidator( fun delegate(): AuthValidator { return when { uriInfo.matches(GARMIN_QUALIFIER) -> namedValidators.named(GARMIN_QUALIFIER).get() + uriInfo.matches(FITBIT_QUALIFIER) -> namedValidators.named(FITBIT_QUALIFIER).get() // Add support for more as integrations are added else -> throw IllegalStateException() } @@ -27,6 +28,7 @@ class DelegatedAuthValidator( companion object { const val GARMIN_QUALIFIER = "garmin" + const val FITBIT_QUALIFIER = "fitbit" } override fun verify(token: String, request: ContainerRequestContext): Auth? = diff --git a/src/main/kotlin/org/radarbase/push/integration/common/filter/CorsFilter.kt b/src/main/kotlin/org/radarbase/push/integration/common/filter/CorsFilter.kt index f6a23ee7..1927cafd 100644 --- a/src/main/kotlin/org/radarbase/push/integration/common/filter/CorsFilter.kt +++ b/src/main/kotlin/org/radarbase/push/integration/common/filter/CorsFilter.kt @@ -16,7 +16,7 @@ class CorsFilter : ContainerResponseFilter { cres.headers .add("Access-Control-Allow-Headers", "origin, content-type, accept, authorization") cres.headers.add("Access-Control-Allow-Credentials", "true") - cres.headers.add("Access-Control-Allow-Methods", "POST, OPTIONS") + cres.headers.add("Access-Control-Allow-Methods", "GET, POST, OPTIONS") cres.headers.add("Access-Control-Max-Age", "1209600") } } diff --git a/src/main/kotlin/org/radarbase/push/integration/garmin/util/RedisHolder.kt b/src/main/kotlin/org/radarbase/push/integration/common/redis/RedisHolder.kt similarity index 91% rename from src/main/kotlin/org/radarbase/push/integration/garmin/util/RedisHolder.kt rename to src/main/kotlin/org/radarbase/push/integration/common/redis/RedisHolder.kt index 42d38ce1..778b83ab 100644 --- a/src/main/kotlin/org/radarbase/push/integration/garmin/util/RedisHolder.kt +++ b/src/main/kotlin/org/radarbase/push/integration/common/redis/RedisHolder.kt @@ -1,4 +1,4 @@ -package org.radarbase.push.integration.garmin.util +package org.radarbase.push.integration.common.redis import redis.clients.jedis.Jedis import redis.clients.jedis.JedisPool diff --git a/src/main/kotlin/org/radarbase/push/integration/garmin/util/RedisRemoteLockManager.kt b/src/main/kotlin/org/radarbase/push/integration/common/redis/RedisRemoteLockManager.kt similarity index 90% rename from src/main/kotlin/org/radarbase/push/integration/garmin/util/RedisRemoteLockManager.kt rename to src/main/kotlin/org/radarbase/push/integration/common/redis/RedisRemoteLockManager.kt index c9ab2a7f..ab967a66 100644 --- a/src/main/kotlin/org/radarbase/push/integration/garmin/util/RedisRemoteLockManager.kt +++ b/src/main/kotlin/org/radarbase/push/integration/common/redis/RedisRemoteLockManager.kt @@ -1,4 +1,4 @@ -package org.radarbase.push.integration.garmin.util +package org.radarbase.push.integration.common.redis import org.slf4j.LoggerFactory import redis.clients.jedis.params.SetParams @@ -6,8 +6,8 @@ import java.time.Duration import java.util.* class RedisRemoteLockManager( - private val redisHolder: RedisHolder, - private val keyPrefix: String + private val redisHolder: RedisHolder, + private val keyPrefix: String ) : RemoteLockManager { private val uuid: String = UUID.randomUUID().toString() diff --git a/src/main/kotlin/org/radarbase/push/integration/garmin/util/RemoteLockManager.kt b/src/main/kotlin/org/radarbase/push/integration/common/redis/RemoteLockManager.kt similarity index 82% rename from src/main/kotlin/org/radarbase/push/integration/garmin/util/RemoteLockManager.kt rename to src/main/kotlin/org/radarbase/push/integration/common/redis/RemoteLockManager.kt index dbbee310..621f85da 100644 --- a/src/main/kotlin/org/radarbase/push/integration/garmin/util/RemoteLockManager.kt +++ b/src/main/kotlin/org/radarbase/push/integration/common/redis/RemoteLockManager.kt @@ -1,4 +1,4 @@ -package org.radarbase.push.integration.garmin.util +package org.radarbase.push.integration.common.redis import java.io.Closeable diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/auth/FitbitAuthValidator.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/auth/FitbitAuthValidator.kt new file mode 100644 index 00000000..c32b60df --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/auth/FitbitAuthValidator.kt @@ -0,0 +1,119 @@ +package org.radarbase.push.integration.fitbit.auth + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import jakarta.inject.Named +import jakarta.ws.rs.container.ContainerRequestContext +import jakarta.ws.rs.core.Context +import org.radarbase.gateway.Config +import org.radarbase.jersey.auth.Auth +import org.radarbase.jersey.auth.AuthValidator +import org.radarbase.jersey.auth.disabled.DisabledAuth +import org.radarbase.jersey.exception.HttpNotFoundException +import org.radarbase.push.integration.common.auth.DelegatedAuthValidator.Companion.FITBIT_QUALIFIER +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.fitbit.user.FitbitUserRepository +import java.security.InvalidKeyException +import java.security.NoSuchAlgorithmException +import java.util.Base64 +import javax.crypto.Mac +import javax.crypto.spec.SecretKeySpec + +class FitbitAuthValidator( + @Context val objectMapper: ObjectMapper, + @Context val config: Config, + @Named(FITBIT_QUALIFIER) private val userRepository: FitbitUserRepository +) : AuthValidator { + + override fun verify(token: String, request: ContainerRequestContext): Auth { + return if (token.isBlank()) { + throw HttpNotFoundException("not_found", "Signature was not found") + } else { + + val tree: JsonNode? = if (request.hasEntity()) { + // We put the json tree in the request because the entity stream will be closed here + val tree1 = objectMapper.readTree(request.entityStream) + request.setProperty("tree", tree1) + tree1 + } else null + + if (!isSignatureValid(token, tree)) { + throw HttpNotFoundException("invalid_signature", "Valid Signature not found") + } + + if (!checkIsUserAuthorized(request, tree)) { + request.setProperty("user_tree_map", null) + } + + // Disable auth since we don't have proper auth support + DisabledAuth("res_gateway") + } + } + + override fun getToken(request: ContainerRequestContext): String = + request.getHeaderString("X-Fitbit-Signature") + ?: throw HttpNotFoundException("not_found", "Signature was not found") + + + fun checkIsUserAuthorized(request: ContainerRequestContext, tree: JsonNode?): Boolean { + + if (tree == null) { + return false + } + + val userTreeMap: Map = + tree[tree.fieldNames().next()] + .groupBy { node -> + node[USER_ID_KEY].asText() + } + .filter { (userId, userData) -> + try { + userRepository.findByExternalId(userId) + true + } catch (ex: NoSuchElementException) { + false + } + } + .entries + .associate { (userId, userData) -> + userRepository.findByExternalId(userId) to + objectMapper.createObjectNode() + .set(tree.fieldNames().next(), objectMapper.valueToTree(userData)) + + } + request.setProperty("user_tree_map", userTreeMap) + return true + } + + + fun isSignatureValid(signature: String?, contents: JsonNode?): Boolean { + val signingKey = "${config.pushIntegration.fitbit.clientSecret}&" + if (signature == null) { + return false + } + if (contents == null) { + return false + } + val genHMAC = genHMAC(contents.asText(), signingKey) + return genHMAC.equals(signature) + } + + fun genHMAC(data: String, key: String): String? { + var result: ByteArray? = null + try { + val signinKey = SecretKeySpec(key.toByteArray(), "HmacSHA1") + val mac = Mac.getInstance("HmacSHA1") + mac.init(signinKey) + val rawHmac = mac.doFinal(data.toByteArray()) + result = Base64.getEncoder().encode(rawHmac) + } catch (e: NoSuchAlgorithmException) { + System.err.println(e.message) + } catch (e: InvalidKeyException) { + System.err.println(e.message) + } + return result?.let { String(it) } + } + companion object { + const val USER_ID_KEY = "ownerId" + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/DateRange.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/DateRange.kt new file mode 100644 index 00000000..80ab12ac --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/DateRange.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.converter + +import java.time.ZonedDateTime + +data class DateRange( + val start: ZonedDateTime, + val end: ZonedDateTime, +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitActivityLogDataConverter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitActivityLogDataConverter.kt new file mode 100644 index 00000000..70316353 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitActivityLogDataConverter.kt @@ -0,0 +1,145 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.converter + +import com.fasterxml.jackson.databind.JsonNode +import org.radarcns.connector.fitbit.* +import org.slf4j.LoggerFactory +import java.time.Instant +import java.time.OffsetDateTime +import java.time.ZoneOffset + +class FitbitActivityLogDataConverter( + private val activityLogTopic: String +) : FitbitDataConverter { + override fun processRecords( + dateRange: DateRange, root: JsonNode, timeReceived: Double + ): Sequence> { + val array = root.optArray("activities") + ?: return emptySequence() + + return array.asSequence() + .sortedBy { it["startTime"].textValue() } + .mapCatching { s -> + val startTime = OffsetDateTime.parse(s["startTime"].textValue()) + val startInstant = startTime.toInstant() + TopicData( + sourceOffset = startInstant, + topic = activityLogTopic, + value = s.toActivityLogRecord(startInstant, startTime.offset), + ) + } + } + + private fun JsonNode.toActivityLogRecord( + startTime: Instant, + offset: ZoneOffset, + ): FitbitActivityLogRecord { + return FitbitActivityLogRecord.newBuilder().apply { + time = startTime.toEpochMilli() / 1000.0 + timeReceived = System.currentTimeMillis() / 1000.0 + timeLastModified = Instant.parse(get("lastModified").asText()).toEpochMilli() / 1000.0 + id = requireNotNull(optLong("logId")) { "Activity log ID not specified" } + logType = optString("logType") + type = optLong("activityType") + speed = optDouble("speed") + distance = optDouble("distance")?.toFloat() + steps = optInt("steps") + energy = optInt("calories")?.let { it * FOOD_CAL_TO_KJOULE_FACTOR } + duration = (requireNotNull(optLong("duration")) { "Activity log duration not specified" } + / 1000f) + durationActive = requireNotNull(optLong("durationActive")) { "Activity active log duration not specified" } / 1000f + timeZoneOffset = offset.totalSeconds + name = optString("activityName") + heartRate = toHeartRate() + manualDataEntry = optObject("manualValuesSpecified")?.toManualDataEntry() + levels = optArray("activityLevels")?.toActivityLevels() + source = optObject("source")?.toSource() + }.build() + } + + private fun JsonNode.toSource(): FitbitSource? = + optString("id")?.let { sourceId -> + FitbitSource.newBuilder().apply { + id = sourceId + name = optString("name") + type = optString("type") + url = optString("url") + }.build() + } + + private fun Iterable.toActivityLevels(): FitbitActivityLevels = + FitbitActivityLevels.newBuilder().apply { + forEach { level -> + val durationMinutes = level.optInt("minutes") ?: return@forEach + val duration = durationMinutes * 60 + when (level.optString("name")) { + "sedentary" -> durationSedentary = duration + "lightly" -> durationLightly = duration + "fairly" -> durationFairly = duration + "very" -> durationVery = duration + } + } + }.build() + + private fun JsonNode.toManualDataEntry(): FitbitManualDataEntry = + FitbitManualDataEntry.newBuilder().apply { + steps = optBoolean("steps") + distance = optBoolean("distance") + energy = optBoolean("calorie") + }.build() + + private fun JsonNode.toHeartRate(): FitbitActivityHeartRate? { + val averageHeartRate: Int? = optInt("averageHeartRate") + val zones = optArray("heartRateZones") + if (averageHeartRate == null && zones == null) { + return null + } + return FitbitActivityHeartRate.newBuilder().apply { + mean = averageHeartRate + zones?.forEach { zone -> + val minValue = zone.optInt("min") + val duration = zone.optInt("minutes")?.let { it * 60 } + when (val zoneText = zone.optString("name")) { + "Out of Range" -> { + min = minValue + durationOutOfRange = duration + } + "Fat Burn" -> { + minFatBurn = minValue + durationFatBurn = duration + } + "Cardio" -> { + minCardio = minValue + durationCardio = duration + } + "Peak" -> { + minPeak = minValue + max = zone.optInt("max") + durationPeak = duration + } + else -> logger.warn("Cannot process unknown heart rate zone {}", zoneText) + } + } + }.build() + } + + companion object { + private val logger = LoggerFactory.getLogger(FitbitActivityLogDataConverter::class.java) + private const val FOOD_CAL_TO_KJOULE_FACTOR = 4.1868f + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitDataConverter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitDataConverter.kt new file mode 100644 index 00000000..e766cfcb --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitDataConverter.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.converter + +import com.fasterxml.jackson.databind.JsonNode +import okhttp3.Headers +import org.radarbase.push.integration.fitbit.request.FitbitRequestGenerator.Companion.JSON_READER +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import java.time.Instant + +/** + * Abstract class to help convert Fitbit data to Avro Data. + */ +interface FitbitDataConverter: PayloadToSourceRecordConverter { + /** Process the JSON records generated by given request. */ + fun processRecords( + dateRange: DateRange, + root: JsonNode, + timeReceived: Double + ): Sequence> + + override fun convert(request: FitbitRestRequest, headers: Headers, data: ByteArray): Sequence> { + val node = JSON_READER.readTree(data) + + return this.processRecords(request.getDateRange(), node, Instant.now().epochSecond.toDouble()) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitFoodLogConverter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitFoodLogConverter.kt new file mode 100644 index 00000000..d328416f --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitFoodLogConverter.kt @@ -0,0 +1,72 @@ +package org.radarbase.push.integration.fitbit.converter + +import com.fasterxml.jackson.databind.JsonNode +import org.apache.avro.specific.SpecificRecordBase +import org.radarcns.connector.fitbit.FitbitFoodLog +import java.time.Instant +import java.time.OffsetDateTime +import java.time.ZoneOffset + +class FitbitFoodLogConverter( + private val foodLogTopic: String +) : FitbitDataConverter { + override fun processRecords( + dateRange: DateRange, + root: JsonNode, + timeReceived: Double + ): Sequence> { + val array = root.optArray("foods") ?: return emptySequence() + + return array.asSequence() + .sortedBy { it["logDate"].textValue() } + .mapCatching { s -> + val startTime = OffsetDateTime.parse(s["logDate"].textValue()) + val startInstant = startTime.toInstant() + TopicData( + sourceOffset = startInstant, + topic = foodLogTopic, + value = s.toFoodLogRecord(startInstant, startTime.offset) + ) + } + } + + private fun JsonNode.toFoodLogRecord( + startTime: Instant, offset: ZoneOffset + ): SpecificRecordBase { + return FitbitFoodLog.newBuilder().apply { + time = startTime.toEpochMilli() / 1000.0 + timeReceived = System.currentTimeMillis() / 1000.0 + isFavorite = + requireNotNull(optBoolean("isFavorite")) { "Food log isFavorite not specified" } + logId = requireNotNull(optLong("logId")) { "Food log logId not specified" } + accessLevel = get("loggedFood").optString("accessLevel") + amount = + requireNotNull(get("loggedFood").optInt("amount")) { "Food log amount not specified" } + brand = get("loggedFood").optString("brand") + foodId = + requireNotNull(get("loggedFood").optLong("foodId")) { "Food log foodId not specified" } + locale = get("loggedFood").optString("locale") + mealTypeId = + requireNotNull(get("loggedFood").optLong("mealTypeId")) { "Food log mealTypeId not specified" } + name = get("loggedFood").optString("name") + unitId = requireNotNull( + get("loggedFood").get("unit").optLong("id") + ) { "Food log unitId not specified" } + unitName = get("loggedFood").get("unit").optString("name") + unitPlural = get("loggedFood").get("unit").optString("plural") + calories = + requireNotNull(get("nutritionalValues").optFloat("calories")) { "Food log calories not specified" } + carbs = + requireNotNull(get("nutritionalValues").optFloat("carbs")) { "Food log carbs not specified" } + fat = + requireNotNull(get("nutritionalValues").optFloat("fat")) { "Food log fat not specified" } + fiber = + requireNotNull(get("nutritionalValues").optFloat("fiber")) { "Food log fiber not specified" } + protein = + requireNotNull(get("nutritionalValues").optFloat("protein")) { "Food log protein not specified" } + sodium = + requireNotNull(get("nutritionalValues").optFloat("sodium")) { "Food log sodium not specified" } + }.build() + } + +} \ No newline at end of file diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitSleepDataConverter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitSleepDataConverter.kt new file mode 100644 index 00000000..5a58244c --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitSleepDataConverter.kt @@ -0,0 +1,123 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.converter + +import com.fasterxml.jackson.databind.JsonNode +import org.radarcns.connector.fitbit.FitbitSleepClassic +import org.radarcns.connector.fitbit.FitbitSleepClassicLevel +import org.radarcns.connector.fitbit.FitbitSleepStage +import org.radarcns.connector.fitbit.FitbitSleepStageLevel +import java.time.Duration +import java.time.Instant +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import kotlin.Result.Companion.failure +import kotlin.Result.Companion.success + +class FitbitSleepDataConverter( + private val sleepStagesTopic: String, + private val sleepClassicTopic: String, +) : FitbitDataConverter { + override fun processRecords( + dateRange: DateRange, root: JsonNode, timeReceived: Double, + ): Sequence> { + val meta = root.optObject("meta") + if (meta != null && meta["state"]?.asText() == "pending") { + return emptySequence() + } + val sleepArray = root.optArray("sleep") + ?: return emptySequence() + return sleepArray.asSequence() + .sortedBy { s -> s.get("startTime").asText() } + .mapCatching { s -> + val startTime = Instant.from(DATE_TIME_FORMAT.parse(s.get("startTime").asText())) + val type = s.optString("type") + val isStages = type == null || type == "stages" + + // use an intermediate offset for all records but the last. Since the query time + // depends only on the start time of a sleep stages group, this will reprocess the entire + // sleep stages group if something goes wrong while processing. + val intermediateOffset = startTime.minus(Duration.ofSeconds(1)) + val allRecords: List = s.optObject("levels") + ?.optArray("data") + ?.map { d -> + val dateTime: String = d.get("dateTime").asText() + val duration: Int = d.get("seconds").asInt() + val level: String = d.get("level").asText() + val efficiency = d["efficiency"].asInt() + if (isStages) { + TopicData( + sourceOffset = intermediateOffset, + topic = sleepStagesTopic, + value = FitbitSleepStage( + dateTime, + timeReceived, + duration, + level.toStagesLevel(), + efficiency, + ), + ) + } else { + TopicData( + sourceOffset = intermediateOffset, + topic = sleepClassicTopic, + value = FitbitSleepClassic( + dateTime, + timeReceived, + duration, + level.toClassicLevel(), + efficiency, + ) + ) + } + } + ?: emptyList() + + // The final group gets the actual offset, to ensure that the group does not get queried + // again. + allRecords.lastOrNull()?.sourceOffset = startTime + allRecords + } + .flatMap { res -> + res.fold( + onSuccess = { data -> data.asSequence().map { success(it) } }, + onFailure = { sequenceOf(failure(it)) }, + ) + } + } + + companion object { + private val DATE_TIME_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE_TIME + .withZone(ZoneOffset.UTC) + + + private fun String.toClassicLevel() = when (this) { + "awake" -> FitbitSleepClassicLevel.AWAKE + "asleep" -> FitbitSleepClassicLevel.ASLEEP + "restless" -> FitbitSleepClassicLevel.RESTLESS + else -> FitbitSleepClassicLevel.UNKNOWN + } + + private fun String.toStagesLevel() = when (this) { + "wake" -> FitbitSleepStageLevel.AWAKE + "rem" -> FitbitSleepStageLevel.REM + "deep" -> FitbitSleepStageLevel.DEEP + "light" -> FitbitSleepStageLevel.LIGHT + else -> FitbitSleepStageLevel.UNKNOWN + } + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitTimeZoneDataConverter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitTimeZoneDataConverter.kt new file mode 100644 index 00000000..94102d82 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/FitbitTimeZoneDataConverter.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.converter + +import com.fasterxml.jackson.databind.JsonNode +import org.radarcns.connector.fitbit.FitbitTimeZone +import org.slf4j.LoggerFactory +import kotlin.Result.Companion.failure +import kotlin.Result.Companion.success + +class FitbitTimeZoneDataConverter(private val timeZoneTopic: String) : FitbitDataConverter { + override fun processRecords( + dateRange: DateRange, + root: JsonNode, + timeReceived: Double, + ): Sequence> { + val user = root.optObject("user") ?: run { + return sequenceOf( + failure(IllegalArgumentException("Failed to get timezone from $root")) + ) + } + val offset = user.optInt("offsetFromUTCMillis")?.let { it / 1000 } + return sequenceOf( + success( + TopicData( + sourceOffset = dateRange.start.toInstant(), + topic = timeZoneTopic, + value = FitbitTimeZone(timeReceived, offset), + ) + ) + ) + } + + companion object { + private val logger = LoggerFactory.getLogger(FitbitTimeZoneDataConverter::class.java) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/JsonNodeExtensions.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/JsonNodeExtensions.kt new file mode 100644 index 00000000..2d58d822 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/JsonNodeExtensions.kt @@ -0,0 +1,90 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.radarbase.push.integration.fitbit.converter + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import org.slf4j.LoggerFactory +import java.util.concurrent.TimeUnit + +private val logger = LoggerFactory.getLogger("org.radarbase.convert.fitbit.JsonNodeExtensionsKt") + +/** Get Fitbit dataset interval used in some intraday API calls. */ +internal fun JsonNode.getRecordInterval(defaultValue: Int): Int { + val type = this["datasetType"] + val interval = this["datasetInterval"] + if (type == null || interval == null) { + logger.warn("Failed to get data interval; using {} instead", defaultValue) + return defaultValue + } + return when (type.asText()) { + "minute" -> TimeUnit.MINUTES + "second" -> TimeUnit.SECONDS + "hour" -> TimeUnit.HOURS + "day" -> TimeUnit.DAYS + "millisecond" -> TimeUnit.MILLISECONDS + "nanosecond" -> TimeUnit.NANOSECONDS + "microsecond" -> TimeUnit.MICROSECONDS + else -> { + logger.warn( + "Failed to parse dataset interval type {} for {}; using {} seconds instead", + type.asText(), + interval.asLong(), + defaultValue + ) + return defaultValue + } + }.toSeconds(interval.asLong()).toInt() +} + +internal fun JsonNode.optLong(fieldName: String): Long? = this[fieldName] + ?.takeIf { it.canConvertToLong() } + ?.longValue() + +internal fun JsonNode.optDouble(fieldName: String): Double? = this[fieldName] + ?.takeIf { it.isNumber } + ?.doubleValue() + +internal fun JsonNode.optFloat(fieldName: String): Float? = this[fieldName] + ?.takeIf { it.isNumber } + ?.floatValue() + +internal fun JsonNode.optInt(fieldName: String): Int? = this[fieldName] + ?.takeIf { it.canConvertToInt() } + ?.intValue() + +internal fun JsonNode.optString(fieldName: String?): String? = this[fieldName] + ?.takeIf { it.isTextual } + ?.textValue() + +internal fun JsonNode.optBoolean(fieldName: String?): Boolean? = this[fieldName] + ?.takeIf { it.isBoolean } + ?.booleanValue() + +internal fun JsonNode.optObject(fieldName: String?): ObjectNode? = this[fieldName] + ?.takeIf { it.isObject } as ObjectNode? + +internal fun JsonNode.optArray(fieldName: String?): Iterable? = this[fieldName] + ?.takeIf { it.isArray && it.size() > 0 } + + +internal fun Sequence.mapCatching(fn: (T) -> S): Sequence> = map { t -> + runCatching { + fn(t) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/PayloadToSourceRecordConverter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/PayloadToSourceRecordConverter.kt new file mode 100644 index 00000000..7431d9f5 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/PayloadToSourceRecordConverter.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.converter + +import okhttp3.Headers +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import java.io.IOException +import java.time.Duration +import java.time.Instant + +interface PayloadToSourceRecordConverter { + @Throws(IOException::class) + fun convert( + request: FitbitRestRequest, headers: Headers, data: ByteArray + ): Sequence> + + companion object { + fun nearFuture(): Instant { + return Instant.now().plus(NEAR_FUTURE) + } + + val MIN_INSTANT = Instant.EPOCH + const val TIMESTAMP_OFFSET_KEY = "timestamp" + private val NEAR_FUTURE = Duration.ofDays(31L) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/TopicData.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/TopicData.kt new file mode 100644 index 00000000..45b90a24 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/converter/TopicData.kt @@ -0,0 +1,28 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.radarbase.push.integration.fitbit.converter + +import org.apache.avro.generic.IndexedRecord +import java.time.Instant + +/** Single value for a topic. */ +data class TopicData( + var sourceOffset: Instant, + val topic: String, + val value: IndexedRecord, +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/dto/FitbitNotification.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/dto/FitbitNotification.kt new file mode 100644 index 00000000..8161d65d --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/dto/FitbitNotification.kt @@ -0,0 +1,9 @@ +package org.radarbase.push.integration.fitbit.dto + +data class FitbitNotification( + val collectionType: String, + val date: String, + val ownerId: String, + val ownerType: String, + val subscriptionId: String, +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/factory/FitbitUserTreeMapFactory.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/factory/FitbitUserTreeMapFactory.kt new file mode 100644 index 00000000..d5103d35 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/factory/FitbitUserTreeMapFactory.kt @@ -0,0 +1,16 @@ +package org.radarbase.push.integration.fitbit.factory + +import com.fasterxml.jackson.databind.JsonNode +import jakarta.ws.rs.container.ContainerRequestContext +import jakarta.ws.rs.core.Context +import org.radarbase.push.integration.common.user.User +import java.util.function.Supplier + +@Suppress("UNCHECKED_CAST") +class FitbitUserTreeMapFactory( + @Context private val requestContext: ContainerRequestContext +) : Supplier> { + override fun get(): Map = + requestContext.getProperty("user_tree_map") as Map +} + diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerification.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerification.kt new file mode 100644 index 00000000..a99c17a8 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerification.kt @@ -0,0 +1,22 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.radarbase.push.integration.fitbit.filter + +@Target(AnnotationTarget.FUNCTION) +@Retention(AnnotationRetention.RUNTIME) +annotation class ClientDomainVerification(val domainName: String) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerificationFeature.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerificationFeature.kt new file mode 100644 index 00000000..1e038691 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerificationFeature.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2019. The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * + * See the file LICENSE in the root of this repository. + */ + +package org.radarbase.push.integration.fitbit.filter + +import jakarta.inject.Singleton +import jakarta.ws.rs.Priorities +import jakarta.ws.rs.container.DynamicFeature +import jakarta.ws.rs.container.ResourceInfo +import jakarta.ws.rs.core.FeatureContext +import jakarta.ws.rs.ext.Provider + +/** Authorization for different auth tags. */ +@Provider +@Singleton +class ClientDomainVerificationFeature : DynamicFeature { + override fun configure(resourceInfo: ResourceInfo, context: FeatureContext) { + val resourceMethod = resourceInfo.resourceMethod + if (resourceMethod.isAnnotationPresent(ClientDomainVerification::class.java)) { + context.register(ClientDomainVerificationFilter::class.java, Priorities.AUTHORIZATION) + } + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerificationFilter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerificationFilter.kt new file mode 100644 index 00000000..a6f560fa --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/filter/ClientDomainVerificationFilter.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.radarbase.push.integration.fitbit.filter + +import jakarta.inject.Provider +import jakarta.ws.rs.container.ContainerRequestContext +import jakarta.ws.rs.container.ContainerRequestFilter +import jakarta.ws.rs.container.ResourceInfo +import jakarta.ws.rs.core.Context +import jakarta.ws.rs.core.Response +import org.glassfish.grizzly.http.server.Request +import org.slf4j.LoggerFactory +import java.net.InetAddress + +class ClientDomainVerificationFilter( + /** + * Check that the token has given permissions. + */ + @Context private val resourceInfo: ResourceInfo, + @Context private val req: Provider +) : ContainerRequestFilter { + override fun filter(requestContext: ContainerRequestContext) { + val annotation = resourceInfo.resourceMethod.getAnnotation(ClientDomainVerification::class.java) + + val ipAddress = requestContext.getHeaderString("X-Forwarded-For") + ?: req.get().remoteAddr + + val remoteHostName = InetAddress.getByName(ipAddress).hostName + if (remoteHostName != annotation.domainName && !remoteHostName.endsWith(".${annotation.domainName}")) { + logger.error("Failed to verify that IP address {} belongs to domain name {}. It resolves to {} instead.", ipAddress, annotation.domainName, remoteHostName) + requestContext.abortWith(Response.status(Response.Status.NOT_FOUND).build()) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(ClientDomainVerificationFilter::class.java.name) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/OffsetPersistenceFactory.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/OffsetPersistenceFactory.kt new file mode 100644 index 00000000..bcd0729a --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/OffsetPersistenceFactory.kt @@ -0,0 +1,19 @@ +package org.radarbase.push.integration.fitbit.redis + +import java.nio.file.Path + +/** + * Accesses a OffsetRange file using the CSV format. On construction, this will create the file if + * not present. + */ +interface OffsetPersistenceFactory { + /** + * Read offsets from the persistence store. On error, this will return null. + */ + fun read(path: String): Offsets? + + /** + * Add a specific Offset to the provided path. + */ + fun add(path: Path, offset: UserRouteOffset) +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/OffsetRedisPersistence.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/OffsetRedisPersistence.kt new file mode 100644 index 00000000..702905e6 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/OffsetRedisPersistence.kt @@ -0,0 +1,92 @@ +package org.radarbase.push.integration.fitbit.redis + +import com.fasterxml.jackson.databind.ObjectReader +import com.fasterxml.jackson.databind.ObjectWriter +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import org.radarbase.push.integration.common.redis.RedisHolder +import org.slf4j.LoggerFactory +import java.io.IOException +import java.nio.file.Path +import java.time.Instant + +/** + * Accesses a OffsetRange json object a Redis entry. + */ +class OffsetRedisPersistence( + private val redisHolder: RedisHolder +) : OffsetPersistenceFactory { + + override fun read(path: String): Offsets? { + return try { + redisHolder.execute { redis -> + redis[path]?.let { value -> + redisOffsetReader.readValue(value) + .offsets + .fold(Offsets()) { set, (userId, route, lastSuccessOffset, latestOffset) -> + set.apply { + add( + UserRouteOffset( + userId, + route, + lastSuccessOffset, + latestOffset + ) + ) + } + } + } + } + } catch (ex: IOException) { + logger.error( + "Error reading offsets from Redis: {}. Processing all offsets.", + ex.toString() + ) + null + } + } + + /** + * Read the specified Path in Redis and adds the given UserRouteOffset to the offsets. + */ + override fun add(path: Path, offset: UserRouteOffset) { + val offsets: Offsets = (read(path.toString()) ?: Offsets()).apply { add(offset)} + val redisOffsets = RedisOffsets(offsets.offsetsMap.map { (userRoute, fitbitOffsets) -> + RedisOffset( + userRoute.userId, + userRoute.route, + fitbitOffsets.lastSuccessOffset, + fitbitOffsets.latestOffset + ) + }) + try { + redisHolder.execute { redis -> + redis.set(path.toString(), redisOffsetWriter.writeValueAsString(redisOffsets)) + } + } catch (e: IOException) { + logger.error("Failed to write offsets to Redis: {}", e.toString()) + } + } + + companion object { + data class RedisOffsets( + val offsets: List + ) + + data class RedisOffset( + val userId: String, + val route: String, + val lastSuccessOffset: Instant, + val latestOffset: Instant + ) + + private val logger = LoggerFactory.getLogger(OffsetRedisPersistence::class.java) + private val mapper = jacksonObjectMapper().apply { + registerModule(JavaTimeModule()) + configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) + } + val redisOffsetWriter: ObjectWriter = mapper.writerFor(RedisOffsets::class.java) + val redisOffsetReader: ObjectReader = mapper.readerFor(RedisOffsets::class.java) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/Offsets.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/Offsets.kt new file mode 100644 index 00000000..6fc0e36a --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/Offsets.kt @@ -0,0 +1,20 @@ +package org.radarbase.push.integration.fitbit.redis + +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +class Offsets(val offsetsMap: ConcurrentMap = ConcurrentHashMap()) { + fun add(userRouteOffset: UserRouteOffset) { + offsetsMap[userRouteOffset.userRoute] = FitbitOffsets(userRouteOffset.lastSuccessOffset, userRouteOffset.latestOffset) + } + + fun addAll(offsets: Offsets) { + offsetsMap.putAll(offsets.offsetsMap) + } +} + +data class FitbitOffsets( + var lastSuccessOffset: Instant, + var latestOffset: Instant +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/UserRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/UserRoute.kt new file mode 100644 index 00000000..24fd5e75 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/UserRoute.kt @@ -0,0 +1,26 @@ +package org.radarbase.push.integration.fitbit.redis + +import java.util.* + +class UserRoute(val userId: String, val route: String) : Comparable { + private val hash = Objects.hash(userId, route) + + override fun hashCode(): Int = hash + + override fun compareTo(other: UserRoute): Int = compareValuesBy( + this, other, + UserRoute::userId, UserRoute::route + ) + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as UserRoute + + if (userId != other.userId) return false + if (route != other.route) return false + + return true + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/UserRouteOffset.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/UserRouteOffset.kt new file mode 100644 index 00000000..4743dea9 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/redis/UserRouteOffset.kt @@ -0,0 +1,28 @@ +package org.radarbase.push.integration.fitbit.redis + +import com.fasterxml.jackson.annotation.JsonIgnore +import java.time.Instant + +class UserRouteOffset( + val userRoute: UserRoute, + val lastSuccessOffset: Instant, + val latestOffset: Instant +) { + @JsonIgnore + val userId: String = userRoute.userId + + @JsonIgnore + val route: String = userRoute.route + + constructor(userId: String, route: String, lastSuccessOffset: Instant, latestOffset: Instant) : this( + UserRoute(userId, route), + lastSuccessOffset, + latestOffset + ) + + override fun toString(): String { + return "$userId+$route (lastSuccessOffset: $lastSuccessOffset, latestOffset: $latestOffset)" + } + + +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/FitbitRequestGenerator.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/FitbitRequestGenerator.kt new file mode 100644 index 00000000..9376df8e --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/FitbitRequestGenerator.kt @@ -0,0 +1,67 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request + +import com.fasterxml.jackson.core.JsonFactory +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import okhttp3.OkHttpClient +import org.radarbase.gateway.Config +import org.radarbase.gateway.kafka.ProducerPool +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.radarbase.push.integration.fitbit.request.route.FitbitActivityLogRoute +import org.radarbase.push.integration.fitbit.request.route.FitbitFoodLogRoute +import org.radarbase.push.integration.fitbit.request.route.FitbitSleepRoute +import org.radarbase.push.integration.fitbit.request.route.RequestRoute +import org.slf4j.LoggerFactory + +/** + * Generate all requests for Fitbit API. + */ +class FitbitRequestGenerator( + private val userRepository: UserRepository, + config: Config, + producerPool: ProducerPool +) : RequestGeneratorRouter() { + private var baseClient: OkHttpClient? = OkHttpClient() + private val clients: MutableMap = mutableMapOf() + private var routes: List = mutableListOf( + FitbitSleepRoute(this, userRepository, config, producerPool), + FitbitActivityLogRoute(this, userRepository, config, producerPool), + FitbitFoodLogRoute(this, userRepository, config, producerPool) + ) + + override fun routes(): Sequence { + return routes.asSequence() + } + + fun getClient(user: User): OkHttpClient { + return clients.computeIfAbsent(user.id) { + baseClient!!.newBuilder() + .authenticator(TokenAuthenticator(user, userRepository)) + .build() + } + } + + companion object { + val JSON_FACTORY = JsonFactory() + val JSON_READER = ObjectMapper(JSON_FACTORY) + .registerModule(JavaTimeModule()) + .reader() + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/FitbitRestRequest.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/FitbitRestRequest.kt new file mode 100644 index 00000000..e2a4f111 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/FitbitRestRequest.kt @@ -0,0 +1,105 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request + +import jakarta.ws.rs.NotAuthorizedException +import okhttp3.Headers +import okhttp3.OkHttpClient +import okhttp3.Request +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.fitbit.converter.DateRange +import org.radarbase.push.integration.fitbit.converter.TopicData +import org.radarbase.push.integration.fitbit.request.route.RequestRoute +import java.io.IOException +import java.util.function.Predicate + +/** + * REST request taking into account the user and offsets queried. The offsets are useful for + * defining what dates to poll (again). + */ +class FitbitRestRequest( + private val route: RequestRoute, + private val request: Request, + val user: User, + private val client: OkHttpClient, dateRange: DateRange, + private val isValid: Predicate, +) { + + private val dateRange: DateRange + + init { + this.dateRange = dateRange + } + + fun getDateRange(): DateRange { + return dateRange + } + + fun getRequest(): Request { + return request + } + + val isStillValid: Boolean + get() = isValid.test(this) + + /** + * Handle the request using the internal client, using the request route converter. + * @return stream of resulting source records. + * @throws IOException if making or parsing the request failed. + */ + @Throws(IOException::class) + fun handleRequest(): Sequence> { + if (!isStillValid) { + return emptySequence() + } + val records: Sequence> + var data: ByteArray + var headers: Headers + try { + client.newCall(request).execute().use { response -> + if (!response.isSuccessful) { + route.requestFailed(this, response) + return emptySequence() + } + headers = response.headers + val body = response.body + data = body?.bytes() ?: return emptySequence() + } + } catch (ex: IOException) { + route.requestFailed(this, null) + throw ex + } catch (ex: NotAuthorizedException) { + route.requestFailed(this, null) + throw ex + } + records = route.converter().convert(this, headers, data) + if (records.count() == 0) { + route.requestEmpty(this) + } else { + route.requestSucceeded(this, records) + } + return records + } + + override fun toString(): String { + return ("FitbitRestRequest{" + + "url=" + request.url + + ", user=" + user + + ", dateRange=" + dateRange + + '}') + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/RequestGenerator.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/RequestGenerator.kt new file mode 100644 index 00000000..d6e433af --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/RequestGenerator.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request + +import java.time.Instant + +/** + * Dynamically generates requests. The requests should be based on the offsets that are stored in + * the response SourceRecord. + */ +interface RequestGenerator { + val timeOfNextRequest: Instant? + + /** + * Requests that should be queried next. + */ + fun requests(): Sequence +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/RequestGeneratorRouter.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/RequestGeneratorRouter.kt new file mode 100644 index 00000000..d8d685d9 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/RequestGeneratorRouter.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request + +import org.radarbase.push.integration.fitbit.request.route.PollingRequestRoute +import org.radarbase.push.integration.fitbit.request.route.RequestRoute +import java.time.Instant + +abstract class RequestGeneratorRouter : RequestGenerator { + override fun requests(): Sequence { + return routes() + .flatMap { obj: RequestRoute -> obj.requests() } + } + + override val timeOfNextRequest: Instant? + get() = routes() + .map { obj: RequestRoute -> obj.timeOfNextRequest } + .minWithOrNull(Comparator.naturalOrder()) + ?: PollingRequestRoute.nearFuture() + + abstract fun routes(): Sequence +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/TokenAuthenticator.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/TokenAuthenticator.kt new file mode 100644 index 00000000..f1548200 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/TokenAuthenticator.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request + +import jakarta.ws.rs.NotAuthorizedException +import okhttp3.Authenticator +import okhttp3.Request +import okhttp3.Response +import okhttp3.Route +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.slf4j.LoggerFactory +import java.io.IOException + +/** + * Authenticator for Fitbit, which tries to refresh the access token if a request is unauthorized. + */ +class TokenAuthenticator internal constructor(user: User, userRepository: UserRepository) : Authenticator { + private val user: User + private val userRepository: UserRepository + + init { + this.user = user + this.userRepository = userRepository + } + + @Throws(IOException::class) + override fun authenticate(requestRoute: Route?, response: Response): Request? { + return if (response.code != 401) { + null + } else try { + val newAccessToken: String = userRepository.getAccessToken(user) + response.request.newBuilder() + .header("Authorization", "Bearer $newAccessToken") + .build() + } catch (ex: NotAuthorizedException) { + logger.error("Cannot get a new refresh token for user {}. Cancelling request.", user, ex) + null + } + } + + companion object { + private val logger = LoggerFactory.getLogger(TokenAuthenticator::class.java) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitActivityLogRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitActivityLogRoute.kt new file mode 100644 index 00000000..413fa8af --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitActivityLogRoute.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request.route + +import org.radarbase.gateway.Config +import org.radarbase.gateway.kafka.ProducerPool +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.radarbase.push.integration.fitbit.converter.DateRange +import org.radarbase.push.integration.fitbit.converter.FitbitActivityLogDataConverter +import org.radarbase.push.integration.fitbit.request.FitbitRequestGenerator +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import java.time.Duration +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit + +open class FitbitActivityLogRoute( + generator: FitbitRequestGenerator, + userRepository: UserRepository, + config: Config, + producerPool: ProducerPool +) : FitbitPollingRoute(generator, userRepository, "activities", config, producerPool) { + private val converter: FitbitActivityLogDataConverter + + init { + converter = FitbitActivityLogDataConverter(config.pushIntegration.fitbit.activityLogTopic) + } + + override fun getUrlFormat(baseUrl: String?): String { + return "$baseUrl/1/user/%s/activities/list.json?sort=asc&afterDate=%s&limit=20&offset=0" + } + + /** + * Actually construct a request, based on the current offset + * @param user Fitbit user + * @return request to make + */ + override fun createRequests(user: User): Sequence { + + val startDate: ZonedDateTime = getOffsets(user)?.lastSuccessOffset?.plus(ONE_SECOND) + ?.atZone(ZoneOffset.UTC) + ?.truncatedTo(ChronoUnit.SECONDS) + ?: user.startDate.atZone(ZoneOffset.UTC) + + val endDate: ZonedDateTime = getOffsets(user)?.latestOffset?.plus(ONE_SECOND) + ?.atZone(ZoneOffset.UTC) + ?.truncatedTo(ChronoUnit.SECONDS) + ?: return emptySequence() + + return if (endDate > startDate) { + sequenceOf( + newRequest( + user, DateRange(startDate, endDate), + user.serviceUserId, FitbitSleepRoute.DATE_TIME_FORMAT.format(startDate) + ) + ) + } else emptySequence() + } + + override var pollIntervalPerUser: Duration + get() = ACTIVITY_LOG_POLL_INTERVAL + set(pollIntervalPerUser) { + super.pollIntervalPerUser = pollIntervalPerUser + } + + override fun converter(): FitbitActivityLogDataConverter { + return converter + } + + companion object { + val DATE_TIME_FORMAT = DateTimeFormatter.ISO_LOCAL_DATE_TIME + .withZone(ZoneOffset.UTC) + private val ACTIVITY_LOG_POLL_INTERVAL = Duration.ofDays(1) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitFoodLogRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitFoodLogRoute.kt new file mode 100644 index 00000000..d38c5547 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitFoodLogRoute.kt @@ -0,0 +1,70 @@ +package org.radarbase.push.integration.fitbit.request.route + +import org.radarbase.gateway.Config +import org.radarbase.gateway.kafka.ProducerPool +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.radarbase.push.integration.fitbit.converter.DateRange +import org.radarbase.push.integration.fitbit.converter.FitbitFoodLogConverter +import org.radarbase.push.integration.fitbit.converter.PayloadToSourceRecordConverter +import org.radarbase.push.integration.fitbit.request.FitbitRequestGenerator +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import java.time.Duration +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit + +class FitbitFoodLogRoute( + generator: FitbitRequestGenerator, + userRepository: UserRepository, + config: Config, + producerPool: ProducerPool +) : FitbitPollingRoute(generator, userRepository, "foods", config, producerPool) { + private val converter: FitbitFoodLogConverter + + init { + converter = FitbitFoodLogConverter(config.pushIntegration.fitbit.foodLogTopic) + } + + override fun createRequests(user: User): Sequence { + val startDate: ZonedDateTime = getOffsets(user)?.lastSuccessOffset?.plus(ONE_SECOND) + ?.atZone(ZoneOffset.UTC) + ?.truncatedTo(ChronoUnit.SECONDS) + ?: user.startDate.atZone(ZoneOffset.UTC) + + val endDate: ZonedDateTime = getOffsets(user)?.latestOffset?.plus(ONE_SECOND) + ?.atZone(ZoneOffset.UTC) + ?.truncatedTo(ChronoUnit.SECONDS) + ?: return emptySequence() + + return if (endDate > startDate) { + sequenceOf( + newRequest( + user, DateRange(startDate, endDate), + user.serviceUserId, DATE_TIME_FORMAT.format(startDate) + ) + ) + } else emptySequence() + } + + override fun getUrlFormat(baseUrl: String?): String { + return "$baseUrl/1/user/%s/foods/log/date/%s.json" + } + + override fun converter(): PayloadToSourceRecordConverter { + return converter + } + + override var pollIntervalPerUser: Duration + get() = FOOD_LOG_POLL_INTERVAL + set(pollIntervalPerUser) { + super.pollIntervalPerUser = pollIntervalPerUser + } + + companion object { + val DATE_TIME_FORMAT: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME + .withZone(ZoneOffset.UTC) + private val FOOD_LOG_POLL_INTERVAL = Duration.ofDays(1) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitPollingRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitPollingRoute.kt new file mode 100644 index 00000000..ae8bf54f --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitPollingRoute.kt @@ -0,0 +1,286 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request.route + +import jakarta.ws.rs.NotAuthorizedException +import okhttp3.Request +import okhttp3.Response +import org.apache.avro.generic.IndexedRecord +import org.radarbase.gateway.Config +import org.radarbase.gateway.kafka.ProducerPool +import org.radarbase.push.integration.common.redis.RedisHolder +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.radarbase.push.integration.fitbit.converter.DateRange +import org.radarbase.push.integration.fitbit.converter.PayloadToSourceRecordConverter.Companion.MIN_INSTANT +import org.radarbase.push.integration.fitbit.converter.PayloadToSourceRecordConverter.Companion.nearFuture +import org.radarbase.push.integration.fitbit.converter.TopicData +import org.radarbase.push.integration.fitbit.redis.* +import org.radarbase.push.integration.fitbit.request.FitbitRequestGenerator +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import org.radarbase.push.integration.fitbit.request.route.PollingRequestRoute.Companion.max +import org.radarcns.kafka.ObservationKey +import org.slf4j.LoggerFactory +import redis.clients.jedis.JedisPool +import java.io.IOException +import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit +import java.time.temporal.TemporalAmount +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +/** + * Route for regular polling. + * + * + * The algorithm uses the following polling times: + * 1. do not try polling until getLastPoll() + getPollInterval() + * 2. if that has passed, determine for each user when to poll again. Per user: + * 1. if a successful call was made that returned data, take the last successful offset and after + * getLookbackTime() has passed, poll again. + * 2. if a successful call was made that did not return data, take the last query interval + * and start cycling up from the last successful record, starting no further than + * HISTORICAL_TIME + * + * + * Conditions that should be met: + * 1. Do not poll more frequently than once every getPollInterval(). + * 2. On first addition of a user, poll its entire history + * 3. If the history of a user has been scanned, do not look back further than + * `HISTORICAL_TIME`. This ensures fewer operations under normal operations, where Fitbit + * data is fairly frequently updated. + * 4. If there was data for a certain date time in an API, earlier date times are not polled. This + * prevents duplicate data. + * 5. From after the latest known date time, the history of the user is regularly inspected for new + * records. + * 6, All of the recent history is simultaneously inspected to prevent reading only later data in + * a single batch that is added to the API. + * 7. When a too many records exception occurs, do not poll for given user for + * `TOO_MANY_REQUESTS_COOLDOWN`. + */ +abstract class FitbitPollingRoute( + private val generator: FitbitRequestGenerator, + private val userRepository: UserRepository, + val routeName: String, + private val config: Config, + private val producerPool: ProducerPool, + private val redisHolder: RedisHolder = RedisHolder(JedisPool(config.pushIntegration.garmin.backfill.redis.uri)), + private val offsetPersistenceFactory: OffsetPersistenceFactory = OffsetRedisPersistence(redisHolder), +) : PollingRequestRoute { + private val lastPollPerUser: MutableMap = HashMap() + final override val pollInterval: Duration = config.pushIntegration.fitbit.routePollInterval + final override var lastPoll: Instant = MIN_INSTANT + private set + private var baseUrl: String = config.pushIntegration.fitbit.baseUrl + + /** + * Get the poll interval for a single user on a single route. + */ + protected open var pollIntervalPerUser: Duration = config.pushIntegration.fitbit.pollIntervalPerUser + private val tooManyRequestsForUser: MutableSet = ConcurrentHashMap.newKeySet() + private val tooManyRequestsCooldown: Duration? = + config.pushIntegration.fitbit.tooManyRequestsCooldown.minus(pollIntervalPerUser) + + override fun requestSucceeded(request: FitbitRestRequest, record: Sequence>) { + lastPollPerUser[request.user.id] = lastPoll + var maxOffset: Instant = Instant.MIN + var topic: String? = null + val recordList: List> = record.filter { it.isSuccess }.map { + topic = it.getOrNull()?.topic ?: throw IOException("Topic was not specified") + val data = Pair(request.user.observationKey, it.getOrNull()?.value ?: throw IOException("null data")) + val currentOffset = request.getDateRange().end.toInstant() + if (currentOffset > maxOffset) { + maxOffset = currentOffset + } + data + }.toList() + if (recordList.isNotEmpty()) { + producerPool.produce(topic ?: throw IOException("Topic was not specified"), recordList) + addLastSuccessOffset(request.user, maxOffset) + } + } + + override fun requestEmpty(request: FitbitRestRequest) { + lastPollPerUser[request.user.id] = lastPoll + val endOffset: Instant = request.getDateRange().end.toInstant() + if (ChronoUnit.DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) { + addLastSuccessOffset(request.user, endOffset) + } + } + + override fun requestFailed(request: FitbitRestRequest, response: Response?) { + if (response != null && response.code == 429) { + val user: User = request.user + tooManyRequestsForUser.add(user) + val cooldownString = response.header("Retry-After") + var cooldown = tooManyRequestsCooldown + if (cooldownString != null) { + cooldown = try { + Duration.ofSeconds(cooldownString.toLong()) + } catch (ex: NumberFormatException) { + tooManyRequestsCooldown + } + } + val backOff = lastPoll.plus(cooldown) + lastPollPerUser[user.id] = backOff + logger.info( + "Too many requests for user {}. Backing off until {}", user, backOff.plus(pollIntervalPerUser) + ) + } else { + logger.warn("Failed to make request {}", request) + } + } + + /** + * Actually construct requests, based on the current offset + * @param user Fitbit user + * @return request to make + */ + protected abstract fun createRequests(user: User): Sequence + + override fun requests(): Sequence { + tooManyRequestsForUser.clear() + lastPoll = Instant.now() + return try { + userRepository.stream() + .map { u -> AbstractMap.SimpleImmutableEntry(u, nextPoll(u)) } + .filter { u -> lastPoll.isAfter(u.value) } + .sortedWith(java.util.Map.Entry.comparingByValue()) + .flatMap { u -> createRequests(u.key) } + .filterNotNull() + } catch (e: IOException) { + logger.warn("Cannot read users") + emptySequence() + } + } + + /** Get the time that this route should be polled again. */ + override val timeOfNextRequest: Instant? + get() = nextPolls().minWithOrNull(Comparator.naturalOrder()) ?: nearFuture() + + /** + * Create a FitbitRestRequest for given arguments. + * @param user Fitbit user + * @param dateRange dates that may be queried in the request + * @param urlFormatArgs format arguments to [.getUrlFormat]. + * @return request or `null` if the authorization cannot be arranged. + */ + protected fun newRequest( + user: User, dateRange: DateRange, vararg urlFormatArgs: Any? + ): FitbitRestRequest? { + val builder: Request.Builder = Request.Builder().url(String.format(getUrlFormat(baseUrl), *urlFormatArgs)) + return try { + val request: Request = + builder.header("Authorization", "Bearer " + userRepository.getAccessToken(user)).build() + FitbitRestRequest( + this, request, user, generator.getClient(user), dateRange + ) { req -> !tooManyRequestsForUser.contains(req.user) } + } catch (ex: NotAuthorizedException) { + logger.warn( + "User {} does not have a configured access token: {}. Skipping.", user, ex.toString() + ) + null + } catch (ex: IOException) { + logger.warn( + "User {} does not have a configured access token: {}. Skipping.", user, ex.toString() + ) + null + } + } + + override fun nextPolls(): Sequence { + return try { + userRepository.stream().map { user: User -> nextPoll(user) } + } catch (e: IOException) { + logger.warn("Failed to read users for polling interval: {}", e.toString()) + sequenceOf(lastPoll.plus(pollInterval)) + } + } + + protected fun getLatestOffset(user: User): Instant { + return offsetPersistenceFactory.read(user.versionedId)?.offsetsMap?.get( + UserRoute( + user.versionedId, + routeName + ) + )?.latestOffset ?: user.startDate.minus(ONE_NANO) + } + + protected fun getOffsets(user: User): FitbitOffsets? { + return offsetPersistenceFactory.read(user.versionedId)?.offsetsMap?.get( + UserRoute( + user.versionedId, + routeName + ) + ) + } + + protected fun addLastSuccessOffset(user: User, lastSuccessOffset: Instant) { + val offsets = + offsetPersistenceFactory.read(user.versionedId)?.offsetsMap?.get(UserRoute(user.versionedId, routeName)) + + offsetPersistenceFactory.add( + path = Path.of(user.versionedId), offset = UserRouteOffset( + userId = user.versionedId, + route = routeName, + lastSuccessOffset, + offsets?.latestOffset ?: lastSuccessOffset + ) + ) + } + + /** + * URL String format. The format arguments should be provided to + * [.newRequest] + */ + protected abstract fun getUrlFormat(baseUrl: String?): String + + /** + * Next time that given user should be polled. + */ + protected fun nextPoll(user: User): Instant { + val offset = getLatestOffset(user) + return if (offset.isAfter(user.endDate.minus(endDateThreshold))) { + nearFuture() + } else { + val nextPoll = lastPollPerUser.getOrDefault(user.id, MIN_INSTANT).plus(pollIntervalPerUser) + max(offset.plus(lookbackTime), nextPoll) + } + } + + private val endDateThreshold: TemporalAmount + get() = Duration.ofHours(1) + + companion object { + protected val DATE_FORMAT: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE + protected val TIME_FORMAT: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm") + + /** + * Time that should not be polled to avoid duplicate data. + */ + val lookbackTime: Duration = Duration.ofDays(1) // 1 day + const val HISTORICAL_TIME_DAYS = 14L + val ONE_DAY: Duration = ChronoUnit.DAYS.duration + val ONE_NANO: Duration = ChronoUnit.NANOS.duration + val ONE_SECOND: TemporalAmount = ChronoUnit.SECONDS.duration + val ONE_MINUTE: TemporalAmount = ChronoUnit.MINUTES.duration + private val logger = LoggerFactory.getLogger(FitbitSleepRoute::class.java) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitSleepRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitSleepRoute.kt new file mode 100644 index 00000000..9e44ed84 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/FitbitSleepRoute.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request.route + +import org.radarbase.gateway.Config +import org.radarbase.gateway.kafka.ProducerPool +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.radarbase.push.integration.fitbit.converter.DateRange +import org.radarbase.push.integration.fitbit.converter.FitbitSleepDataConverter +import org.radarbase.push.integration.fitbit.request.FitbitRequestGenerator +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import java.time.Duration +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit + +open class FitbitSleepRoute( + generator: FitbitRequestGenerator, + userRepository: UserRepository, + config: Config, + producerPool: ProducerPool +) : FitbitPollingRoute(generator, userRepository, "sleep", config, producerPool) { + private val converter = FitbitSleepDataConverter( + config.pushIntegration.fitbit.sleepStagesTopic, + config.pushIntegration.fitbit.sleepClassicTopic + ) + + override fun getUrlFormat(baseUrl: String?): String { + return "$baseUrl/1.2/user/%s/sleep/list.json?sort=asc&afterDate=%s&limit=100&offset=0" + } + + /** + * Actually construct a request, based on the current offset + * @param user Fitbit user + * @return request to make + */ + override fun createRequests(user: User): Sequence { + val startDate: ZonedDateTime = getOffsets(user)?.lastSuccessOffset?.plus(ONE_SECOND) + ?.atZone(ZoneOffset.UTC) + ?.truncatedTo(ChronoUnit.SECONDS) + ?: user.startDate.atZone(ZoneOffset.UTC) + + val endDate: ZonedDateTime = getOffsets(user)?.latestOffset?.plus(ONE_SECOND) + ?.atZone(ZoneOffset.UTC) + ?.truncatedTo(ChronoUnit.SECONDS) + ?: return emptySequence() + + return if (endDate > startDate) { + sequenceOf( + newRequest( + user, DateRange(startDate, endDate), + user.serviceUserId, DATE_TIME_FORMAT.format(startDate) + ) + ) + } else emptySequence() + } + + override var pollIntervalPerUser: Duration + get() = SLEEP_POLL_INTERVAL + set(pollIntervalPerUser) { + super.pollIntervalPerUser = pollIntervalPerUser + } + + override fun converter(): FitbitSleepDataConverter { + return converter + } + + companion object { + val DATE_TIME_FORMAT: DateTimeFormatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME + .withZone(ZoneOffset.UTC) + private val SLEEP_POLL_INTERVAL = Duration.ofDays(1) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/PollingRequestRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/PollingRequestRoute.kt new file mode 100644 index 00000000..113f6fb7 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/PollingRequestRoute.kt @@ -0,0 +1,61 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request.route + +import java.time.Duration +import java.time.Instant +import java.time.temporal.TemporalAmount + +interface PollingRequestRoute : RequestRoute { + /** + * General polling interval for retrying this route. + */ + val pollInterval: Duration? + + /** + * Last time the route was polled. + */ + val lastPoll: Instant + + /** + * Actual times that new data will be needed. + */ + fun nextPolls(): Sequence + + /** + * Get the time that this route should be polled again. + */ + override val timeOfNextRequest: Instant? + get() = max( + lastPoll.plus(pollInterval), + nextPolls() + .minWithOrNull(Comparator.naturalOrder()) + ?: nearFuture() + ) + + companion object { + fun nearFuture(): Instant? { + return Instant.now().plus(NEAR_FUTURE) + } + + fun ?> max(a: T, b: T): T { + return if (a != null && (b == null || a >= b)) a else b + } + + private val NEAR_FUTURE: TemporalAmount = Duration.ofDays(31L) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/RequestRoute.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/RequestRoute.kt new file mode 100644 index 00000000..68cc67cc --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/request/route/RequestRoute.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2018 The Hyve + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.radarbase.push.integration.fitbit.request.route + +import okhttp3.Response +import org.radarbase.push.integration.fitbit.converter.PayloadToSourceRecordConverter +import org.radarbase.push.integration.fitbit.converter.TopicData +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import org.radarbase.push.integration.fitbit.request.RequestGenerator + +/** + * Single request route. This may represent e.g. a URL. + */ +interface RequestRoute : RequestGenerator { + + fun converter(): PayloadToSourceRecordConverter + + /** + * Called when the request from this route succeeded. + * + * @param request non-null generated request + * @param record non-null resulting records + */ + fun requestSucceeded(request: FitbitRestRequest, record: Sequence>) + fun requestEmpty(request: FitbitRestRequest) + fun requestFailed(request: FitbitRestRequest, response: Response?) +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/resource/FitbitPushEndpoint.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/resource/FitbitPushEndpoint.kt new file mode 100644 index 00000000..35bef0f4 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/resource/FitbitPushEndpoint.kt @@ -0,0 +1,48 @@ +package org.radarbase.push.integration.fitbit.resource + +import com.fasterxml.jackson.databind.JsonNode +import jakarta.inject.Named +import jakarta.inject.Singleton +import jakarta.ws.rs.* +import jakarta.ws.rs.core.Context +import jakarta.ws.rs.core.MediaType +import jakarta.ws.rs.core.Response +import org.radarbase.jersey.auth.Authenticated +import org.radarbase.jersey.exception.HttpInternalServerException +import org.radarbase.push.integration.common.auth.DelegatedAuthValidator +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.fitbit.filter.ClientDomainVerification +import org.radarbase.push.integration.fitbit.service.fitbitapi.FitbitApiService + +@Singleton +@Consumes(MediaType.APPLICATION_JSON) +@Produces(MediaType.APPLICATION_JSON) +@Path("/fitbit") +class FitbitPushEndpoint( + @Context private val fitbitApiService: FitbitApiService +) { + @GET + @ClientDomainVerification("fitbit.com") + fun verify(@QueryParam("verify") verificationCode: String): Response { + return fitbitApiService.verifySubscriber(verificationCode) + } + + @POST + @Authenticated + @ClientDomainVerification("fitbit.com") + fun submitNotification( + @Context @Named(DelegatedAuthValidator.FITBIT_QUALIFIER) userTreeMap: MutableMap + ): Response { + val responses = userTreeMap.map { (user, tree) -> + fitbitApiService.addNotifications(user, tree) + } + + if (responses.any { it.status !in 200..299 }) { + throw HttpInternalServerException( + "exception", "There was an exception while processing the data." + ) + } + + return Response.noContent().build() + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/service/fitbitapi/FitbitApiService.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/fitbitapi/FitbitApiService.kt new file mode 100644 index 00000000..74fc7e07 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/fitbitapi/FitbitApiService.kt @@ -0,0 +1,84 @@ +package org.radarbase.push.integration.fitbit.service.fitbitapi + +import com.fasterxml.jackson.core.type.TypeReference +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.ObjectMapper +import jakarta.ws.rs.BadRequestException +import jakarta.ws.rs.core.Context +import jakarta.ws.rs.core.Response +import jakarta.ws.rs.core.Response.Status +import org.radarbase.gateway.Config +import org.radarbase.gateway.FitbitConfig +import org.radarbase.push.integration.common.redis.RedisHolder +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.fitbit.dto.FitbitNotification +import org.radarbase.push.integration.fitbit.redis.* +import org.slf4j.LoggerFactory +import redis.clients.jedis.JedisPool +import java.io.IOException +import java.nio.file.Path +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter + +class FitbitApiService( + @Context private val config: Config, + @Context objectMapper: ObjectMapper +) { + private val fitbitConfig: FitbitConfig = config.pushIntegration.fitbit + private val contentReader = + objectMapper.readerFor(object : TypeReference>() {}) + + private val redisHolder: RedisHolder = + RedisHolder(JedisPool(config.pushIntegration.fitbit.redis.uri)) + private val offsetPersistenceFactory: OffsetPersistenceFactory = + OffsetRedisPersistence(redisHolder) + + @Throws(IOException::class, BadRequestException::class) + fun verifySubscriber(verificationCode: String): Response { + if (verificationCode == fitbitConfig.verificationCode) { + return Response.noContent().build() + } + return Response.status(Status.NOT_FOUND).build() + } + + + fun addNotifications(user: User, contents: JsonNode): Response { + val notifications = contentReader.readValue>(contents) + + notifications.forEach { + + when (it.collectionType) { + "userRevokedAccess", "deleteUser" -> logger.warn("The user has restricted to send data.") + "activities","body","foods","sleep" -> processNotification(it, user) + else -> logger.info("Unsupported collectionType {} is received.", it.collectionType) + } + } + + return Response.noContent().build() + } + + fun processNotification(notification: FitbitNotification, user: User){ + val offsets: Offsets? = offsetPersistenceFactory.read(user.versionedId) + offsetPersistenceFactory.add( + Path.of(user.versionedId), + UserRouteOffset( + user.versionedId, + notification.collectionType, + offsets?.offsetsMap?.get(UserRoute(user.versionedId, notification.collectionType))?.lastSuccessOffset + ?: user.startDate, + convertStringToInstant(notification.date) + ) + ) + } + + private fun convertStringToInstant(date: String): Instant { + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd") + return LocalDateTime.parse(date, formatter).toInstant(ZoneOffset.UTC) + } + + companion object { + private val logger = LoggerFactory.getLogger(FitbitApiService::class.java) + } +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/service/fitbitapi/FitbitRequestProcessor.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/fitbitapi/FitbitRequestProcessor.kt new file mode 100644 index 00000000..2b02bbc6 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/fitbitapi/FitbitRequestProcessor.kt @@ -0,0 +1,112 @@ +package org.radarbase.push.integration.fitbit.service.fitbitapi + +import jakarta.inject.Named +import jakarta.ws.rs.NotAuthorizedException +import jakarta.ws.rs.core.Context +import org.glassfish.jersey.server.monitoring.ApplicationEvent +import org.glassfish.jersey.server.monitoring.ApplicationEventListener +import org.glassfish.jersey.server.monitoring.RequestEvent +import org.glassfish.jersey.server.monitoring.RequestEventListener +import org.radarbase.gateway.Config +import org.radarbase.gateway.kafka.ProducerPool +import org.radarbase.push.integration.common.auth.DelegatedAuthValidator +import org.radarbase.push.integration.common.redis.RedisHolder +import org.radarbase.push.integration.common.redis.RedisRemoteLockManager +import org.radarbase.push.integration.fitbit.converter.TopicData +import org.radarbase.push.integration.fitbit.request.FitbitRequestGenerator +import org.radarbase.push.integration.fitbit.request.FitbitRestRequest +import org.radarbase.push.integration.fitbit.user.FitbitUserRepository +import org.slf4j.LoggerFactory +import redis.clients.jedis.JedisPool +import java.io.IOException +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +class FitbitRequestProcessor( + @Context private val config: Config, + @Context @Named(DelegatedAuthValidator.FITBIT_QUALIFIER) private val userRepository: FitbitUserRepository, + @Context private val producerPool: ProducerPool +) : ApplicationEventListener { + private val executorService = Executors.newSingleThreadScheduledExecutor() + private val requestExecutorService = Executors.newFixedThreadPool( + config.pushIntegration.fitbit.requestMaxThreads + ) + + private val redisHolder = RedisHolder(JedisPool(config.pushIntegration.garmin.backfill.redis.uri)) + + private val requestGenerator = FitbitRequestGenerator(userRepository, config, producerPool) + private val remoteLockManager = RedisRemoteLockManager( + redisHolder, config.pushIntegration.fitbit.redis.lockPrefix + ) + + override fun onEvent(event: ApplicationEvent?) { + when (event?.type) { + ApplicationEvent.Type.INITIALIZATION_FINISHED -> start() + ApplicationEvent.Type.DESTROY_FINISHED -> stop() + else -> logger.info("Application event received: ${event?.type}") + } + } + + override fun onRequest(requestEvent: RequestEvent?): RequestEventListener? = null + + private fun start() { + logger.info("Application Initialisation completed. Starting Backfill service...") + + executorService.scheduleAtFixedRate(::makeRequests, 1, 5, TimeUnit.MINUTES) + } + + private fun makeRequests() { + var requestsGenerated: Long = 0 + val timeout = ChronoUnit.MILLIS.between(Instant.now(), requestGenerator.timeOfNextRequest) + if (timeout > 0) { + logger.info("Waiting {} milliseconds for next available request", timeout) + Thread.sleep(timeout) + } + + requestGenerator.requests().associateBy { it.user }.forEach { requestMap -> + requestsGenerated++ + requestExecutorService.submit { + remoteLockManager.tryRunLocked(requestMap.key.versionedId) { + if (!requestMap.value.isStillValid) { + logger.info("Requesting {}", requestMap.value.getRequest().url) + val records = makeRequest(requestMap.value) + records?.let { + logger.debug("Processed ${it.count()} records") + } + } + } + } + } + logger.info("Processed $requestsGenerated Urls") + } + + private fun makeRequest(request: FitbitRestRequest): Sequence>? { + return try { + request.handleRequest() + } catch (ex: IOException) { + logger.warn("Failed to make request: {}", ex.toString()) + null + } catch (ex: NotAuthorizedException) { + logger.warn("Failed to make request: {}", ex.toString()) + null + } + } + + private fun stop() { + logger.info("Application Destroy completed. Stopping Backfill service...") + try { + requestExecutorService.awaitTermination(30, TimeUnit.SECONDS) + executorService.awaitTermination(30, TimeUnit.SECONDS) + } catch (e: InterruptedException) { + logger.error("Failed to complete execution: interrupted") + } + } + + + companion object { + private val logger = LoggerFactory.getLogger(FitbitRequestProcessor::class.java) + } + +} diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionRequest.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionRequest.kt new file mode 100644 index 00000000..49fb93e6 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionRequest.kt @@ -0,0 +1,10 @@ +package org.radarbase.push.integration.fitbit.service.subscription + +import okhttp3.Request +import org.radarbase.push.integration.common.user.User + +data class SubscriptionRequest( + val request: Request, + val user: User, + val subscriptionID: String +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionRequestGenerator.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionRequestGenerator.kt new file mode 100644 index 00000000..acb9b92e --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionRequestGenerator.kt @@ -0,0 +1,102 @@ +package org.radarbase.push.integration.fitbit.service.subscription + +import okhttp3.Request +import okhttp3.Response +import okhttp3.internal.EMPTY_REQUEST +import org.radarbase.gateway.Config +import org.radarbase.push.integration.common.redis.RedisHolder +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.fitbit.redis.OffsetPersistenceFactory +import org.radarbase.push.integration.fitbit.redis.OffsetRedisPersistence +import org.radarbase.push.integration.fitbit.redis.UserRouteOffset +import org.radarbase.push.integration.fitbit.user.FitbitUserRepository +import org.slf4j.LoggerFactory +import redis.clients.jedis.JedisPool +import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +class SubscriptionRequestGenerator( + val config: Config, private val userRepository: FitbitUserRepository +) { + private val userDataMap: ConcurrentHashMap = + ConcurrentHashMap() + private val subscriptionID: AtomicInteger = AtomicInteger(0) + private val redisHolder: RedisHolder = + RedisHolder(JedisPool(config.pushIntegration.fitbit.redis.uri)) + private val offsetPersistenceFactory: OffsetPersistenceFactory = + OffsetRedisPersistence(redisHolder) + + private fun subscriptionUrl(user: User, subscriptionID: String): String { + return "https://api.fitbit.com/1/user/" + user.serviceUserId + "/apiSubscriptions/" + subscriptionID + ".json" + } + + fun subscriptionCreationRequest(user: User): SubscriptionRequest? { + if (userDataMap[user.id] == null) { + userDataMap[user.id] = SubscriptionUserData( + false, + subscriptionID.getAndIncrement().toString(), + Instant.now() + ) + } + val userData = userDataMap[user.id] ?: return null + if (userData.subscriptionStatus) return null + if (Instant.now().isBefore(userData.nextRequestTime)) return null + return SubscriptionRequest( + Request.Builder().url(subscriptionUrl(user, userData.subscriptionID)) + .addHeader("accept", "application/json") + .addHeader("authorization", "Bearer " + userRepository.getAccessToken(user)) + .addHeader( + "X-Fitbit-Subscriber-Id", + config.pushIntegration.fitbit.subscriptionConfig.subscriberID + ).post(EMPTY_REQUEST).build(), user, userData.subscriptionID + ) + } + + fun subscriptionDeletionRequest(user: User): SubscriptionRequest? { + val userData = userDataMap[user.id] ?: return null + if (!userData.subscriptionStatus) return null + return SubscriptionRequest( + Request.Builder().url(subscriptionUrl(user, userData.subscriptionID)) + .addHeader("accept", "application/json") + .addHeader("authorization", "Bearer " + userRepository.getAccessToken(user)) + .addHeader( + "X-Fitbit-Subscriber-Id", + config.pushIntegration.fitbit.subscriptionConfig.subscriberID + ).delete(EMPTY_REQUEST).build(), user, userData.subscriptionID + ) + } + + fun subscriptionCreationRequestSuccessful(request: SubscriptionRequest, response: Response) { + userDataMap[request.user.userId]?.subscriptionStatus = true + logger.info("Request successful: {}. Response: {}", request.request, response) + } + + fun subscriptionCreationRequestFailed(request: SubscriptionRequest, response: Response) { + when (response.code) { + 429 -> { + logger.info("Too many requests reach rate limit.") + userDataMap[request.user.userId]?.nextRequestTime = Instant.now() + BACK_OFF_TIME + } + + 409 -> logger.info("The given user is already subscribed to this stream using a different subscription ID or the given subscription ID is already used to identify a subscription to a different stream.") + else -> logger.info("Request failed, {} {}", request, response) + } + } + + fun subscriptionDeletionRequestSuccessful(request: SubscriptionRequest, response: Response) { + logger.info("Request successful: {}. Response: {}", request.request, response) + } + + fun subscriptionDeletionRequestFailed(request: SubscriptionRequest, response: Response) { + logger.info("Request failed, {} {}", request, response) + } + + + companion object { + private val logger = LoggerFactory.getLogger(SubscriptionRequestGenerator::class.java) + private val BACK_OFF_TIME = Duration.ofMinutes(1L) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionService.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionService.kt new file mode 100644 index 00000000..6c35ab04 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionService.kt @@ -0,0 +1,121 @@ +package org.radarbase.push.integration.fitbit.service.subscription + +import jakarta.inject.Named +import jakarta.ws.rs.core.Context +import okhttp3.OkHttpClient +import okio.IOException +import org.glassfish.jersey.server.monitoring.ApplicationEvent +import org.glassfish.jersey.server.monitoring.ApplicationEvent.Type.DESTROY_FINISHED +import org.glassfish.jersey.server.monitoring.ApplicationEvent.Type.INITIALIZATION_FINISHED +import org.glassfish.jersey.server.monitoring.ApplicationEventListener +import org.glassfish.jersey.server.monitoring.RequestEvent +import org.glassfish.jersey.server.monitoring.RequestEventListener +import org.radarbase.gateway.Config +import org.radarbase.push.integration.common.auth.DelegatedAuthValidator.Companion.FITBIT_QUALIFIER +import org.radarbase.push.integration.fitbit.user.FitbitUserRepository +import org.slf4j.LoggerFactory +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit + +class SubscriptionService( + @Context private val config: Config, + @Context private val httpClient: OkHttpClient, + @Named(FITBIT_QUALIFIER) private val userRepository: FitbitUserRepository +) : ApplicationEventListener { + + private val requestGenerator = SubscriptionRequestGenerator(config, userRepository) + private val userExecutorService = Executors.newSingleThreadScheduledExecutor() + private val requestExecutorService = Executors.newSingleThreadExecutor() + private val futures: MutableList> = mutableListOf() + + override fun onEvent(event: ApplicationEvent?) { + when (event?.type) { + INITIALIZATION_FINISHED -> start() + DESTROY_FINISHED -> stop() + else -> logger.info("Application Event Received: ${event?.type}") + } + } + + override fun onRequest(requestEvent: RequestEvent?): RequestEventListener? = null + + private fun start() { + logger.info("Application Initialisation completed. Starting Subscription service...") + userExecutorService.scheduleAtFixedRate(::iterateUsers, 1, 5, TimeUnit.MINUTES) + } + + private fun stop() { + logger.info("Application destroy completed. Stopping subscription service...") + userExecutorService.awaitTermination(30, TimeUnit.SECONDS) + futures.forEach { it.cancel(true) } + futures.clear() + userRepository.stream() + .map { user -> + requestExecutorService.submit { + makeDeletionRequest(requestGenerator.subscriptionDeletionRequest(user)) + } + } + } + + private fun iterateUsers() { + if (!futures.all { it.isDone }) { + logger.info("The previous task is already running. Waiting for next iteration") + // wait for the next iteration + return + } + futures.clear() + logger.info("Iterate Fitbit users and create subscription for newly added ones...") + try { + futures += userRepository.stream() + .map { user -> + requestExecutorService.submit { + makeCreationRequest(requestGenerator.subscriptionCreationRequest(user)) + } + } + } catch (exc: IOException) { + logger.warn("I/O Exception while iterating Fitbit users.", exc) + } catch (exc: Throwable) { + logger.warn("Error while iterating Fitbit users.", exc) + } + } + + private fun makeCreationRequest(request: SubscriptionRequest?) { + if (request == null) { + return + } + logger.debug("Making Request: {}", request.request) + try { + httpClient.newCall(request.request).execute().use { response -> + if (response.isSuccessful) { + requestGenerator.subscriptionCreationRequestSuccessful(request, response) + } else { + requestGenerator.subscriptionCreationRequestFailed(request, response) + } + } + } catch (ex: Throwable) { + logger.warn("Error making request ${request.request.url}.", ex) + } + } + + private fun makeDeletionRequest(request: SubscriptionRequest?) { + if (request == null) { + return + } + logger.debug("Making Request: {}", request.request) + try { + httpClient.newCall(request.request).execute().use { response -> + if (response.isSuccessful) { + requestGenerator.subscriptionDeletionRequestSuccessful(request, response) + } else { + requestGenerator.subscriptionDeletionRequestFailed(request, response) + } + } + } catch (exc: Throwable) { + logger.warn("Error making request ${request.request.url}.", exc) + } + } + + companion object { + private val logger = LoggerFactory.getLogger(SubscriptionService::class.java) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionUserData.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionUserData.kt new file mode 100644 index 00000000..8efa6689 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/service/subscription/SubscriptionUserData.kt @@ -0,0 +1,9 @@ +package org.radarbase.push.integration.fitbit.service.subscription + +import java.time.Instant + +data class SubscriptionUserData( + var subscriptionStatus: Boolean, + val subscriptionID: String, + var nextRequestTime: Instant +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUser.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUser.kt new file mode 100644 index 00000000..cee265ab --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUser.kt @@ -0,0 +1,26 @@ +package org.radarbase.push.integration.fitbit.user + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty +import org.radarbase.push.integration.common.user.User +import org.radarcns.kafka.ObservationKey +import java.time.Instant + +@JsonIgnoreProperties(ignoreUnknown = true) +data class FitbitUser( + @JsonProperty("id") override val id: String, + @JsonProperty("createdAt") override val createdAt: Instant, + @JsonProperty("projectId") override val projectId: String, + @JsonProperty("userId") override val userId: String, + @JsonProperty("humanReadableUserId") override val humanReadableUserId: String?, + @JsonProperty("sourceId") override val sourceId: String, + @JsonProperty("externalId") override val externalId: String?, + @JsonProperty("isAuthorized") override val isAuthorized: Boolean, + @JsonProperty("startDate") override val startDate: Instant, + @JsonProperty("endDate") override val endDate: Instant, + @JsonProperty("version") override val version: String? = null, + @JsonProperty("serviceUserId") override val serviceUserId: String, +) : User { + override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId) + override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}" +} \ No newline at end of file diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUserCredentials.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUserCredentials.kt new file mode 100644 index 00000000..4b76f2fe --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUserCredentials.kt @@ -0,0 +1,9 @@ +package org.radarbase.push.integration.fitbit.user + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty + +@JsonIgnoreProperties(ignoreUnknown = true) +data class FitbitUserCredentials( + @JsonProperty("accessToken") var accessToken: String +) diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUserRepository.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUserRepository.kt new file mode 100644 index 00000000..ef05a4c2 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUserRepository.kt @@ -0,0 +1,148 @@ +package org.radarbase.push.integration.fitbit.user + +import com.fasterxml.jackson.core.JsonProcessingException +import com.fasterxml.jackson.databind.ObjectReader +import jakarta.ws.rs.core.Context +import okhttp3.* +import okhttp3.HttpUrl.Companion.toHttpUrl +import org.radarbase.exception.TokenException +import org.radarbase.gateway.Config +import org.radarbase.gateway.FitbitConfig +import org.radarbase.jersey.exception.HttpBadRequestException +import org.radarbase.oauth.OAuth2Client +import org.radarbase.push.integration.common.inject.ObjectReaderFactory +import org.radarbase.push.integration.common.user.User +import org.radarbase.push.integration.common.user.UserRepository +import org.slf4j.LoggerFactory +import java.io.IOException +import java.net.URL +import java.time.Duration +import java.time.Instant +import java.util.concurrent.ConcurrentHashMap + +class FitbitUserRepository( + @Context private val config: Config, + @Context private val client: OkHttpClient, + @Context private val objectReaderFactory: ObjectReaderFactory, +) : UserRepository { + private val fitbitConfig: FitbitConfig = config.pushIntegration.fitbit + private val cachedCredentials: ConcurrentHashMap = + ConcurrentHashMap() + private var nextFetch = MIN_INSTANT + + private val baseUrl: HttpUrl = fitbitConfig.userRepositoryUrl.toHttpUrl() + private var timedCachedUsers: List = ArrayList() + private val tokenUrl: URL = URL(fitbitConfig.userRepositoryTokenUrl) + private val clientId: String = fitbitConfig.userRepositoryClientId + private val clientSecret: String = fitbitConfig.userRepositoryClientSecret + private val repositoryClient: OAuth2Client = OAuth2Client.Builder() + .credentials(clientId, clientSecret) + .endpoint(tokenUrl) + .scopes("SUBJECT.READ", "MEASUREMENT.READ", "SUBJECT.UPDATE", "MEASUREMENT.CREATE") + .httpClient(client) + .build() + + private val userListReader: ObjectReader by lazy { objectReaderFactory.readerFor(FitbitUsers::class) } + private val userReader: ObjectReader by lazy { objectReaderFactory.readerFor(FitbitUser::class) } + private val oauthReader: ObjectReader by lazy { + objectReaderFactory.readerFor( + FitbitUserCredentials::class + ) + } + + @Throws(IOException::class) + override fun get(key: String): User? { + val request: Request = requestFor("users/$key").build() + return makeRequest(request, userReader) + } + + @Throws(IOException::class) + override fun stream(): Sequence { + if (hasPendingUpdates()) { + applyPendingUpdates() + } + return timedCachedUsers.asSequence() + } + + override fun hasPendingUpdates(): Boolean { + val now = Instant.now() + return now.isAfter(nextFetch) + } + + override fun applyPendingUpdates() { + logger.info("Requesting user information from webservice") + val request = requestFor("users?source-type=$FITBIT_SOURCE").build() + timedCachedUsers = makeRequest(request, userListReader).users + + nextFetch = Instant.now().plus(FETCH_THRESHOLD) + } + + override fun getAccessToken(user: User): String { + val credentials: FitbitUserCredentials = + cachedCredentials[user.id] ?: requestUserCredentials(user) + return credentials.accessToken + } + + override fun getRefreshToken(user: User): String { + throw HttpBadRequestException("", "Not available for source type") + } + + + fun requestUserCredentials(user: User): FitbitUserCredentials { + val request = requestFor("users/" + user.id + "/token").build() + val credentials = makeRequest(request, oauthReader) as FitbitUserCredentials + cachedCredentials[user.id] = credentials + return credentials + } + + @Throws(IOException::class) + private fun requestFor(relativeUrl: String): Request.Builder { + val url: HttpUrl = baseUrl.resolve(relativeUrl) + ?: throw IllegalArgumentException("Relative URL is invalid") + val builder: Request.Builder = Request.Builder().url(url) + val authorization = requestAuthorization() + builder.addHeader("Authorization", authorization) + return builder + } + + @Throws(IOException::class) + private fun requestAuthorization(): String { + return try { + "Bearer " + repositoryClient.validToken.accessToken + } catch (ex: TokenException) { + throw IOException(ex) + } + } + + @Throws(IOException::class) + private fun makeRequest(request: Request, reader: ObjectReader?): T { + logger.info("Requesting info from {}", request.url) + client.newCall(request).execute().use { response -> + val body: ResponseBody? = response.body + if (response.code == 404) { + throw NoSuchElementException("URL " + request.url + " does not exist") + } else if (!response.isSuccessful || body == null) { + var message = "Failed to make request (HTTP status code " + response.code + ')' + if (body != null) { + message += body.string() + } + throw IOException(message) + } + val bodyString = body.string() + return try { + if (reader == null) "" as T + else reader.readValue(bodyString) + } catch (ex: JsonProcessingException) { + logger.error("Failed to parse JSON: {}\n{}", ex.toString(), bodyString) + throw ex + } + } + } + + companion object { + private const val FITBIT_SOURCE = "FitBit" + private val FETCH_THRESHOLD: Duration = Duration.ofMinutes(1L) + private val MIN_INSTANT: Instant = Instant.EPOCH + private val logger = LoggerFactory.getLogger(FitbitUserRepository::class.java) + } +} \ No newline at end of file diff --git a/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUsers.kt b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUsers.kt new file mode 100644 index 00000000..79054b52 --- /dev/null +++ b/src/main/kotlin/org/radarbase/push/integration/fitbit/user/FitbitUsers.kt @@ -0,0 +1,7 @@ +package org.radarbase.push.integration.fitbit.user + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties +import com.fasterxml.jackson.annotation.JsonProperty + +@JsonIgnoreProperties(ignoreUnknown = true) +data class FitbitUsers(@JsonProperty("users") val users: List) diff --git a/src/main/kotlin/org/radarbase/push/integration/garmin/backfill/GarminRequestGenerator.kt b/src/main/kotlin/org/radarbase/push/integration/garmin/backfill/GarminRequestGenerator.kt index 723189d1..bdaecbc4 100644 --- a/src/main/kotlin/org/radarbase/push/integration/garmin/backfill/GarminRequestGenerator.kt +++ b/src/main/kotlin/org/radarbase/push/integration/garmin/backfill/GarminRequestGenerator.kt @@ -5,7 +5,7 @@ import org.radarbase.gateway.Config import org.radarbase.push.integration.common.user.User import org.radarbase.push.integration.garmin.backfill.route.* import org.radarbase.push.integration.garmin.user.GarminUserRepository -import org.radarbase.push.integration.garmin.util.RedisHolder +import org.radarbase.push.integration.common.redis.RedisHolder import org.radarbase.push.integration.garmin.util.offset.* import org.slf4j.LoggerFactory import redis.clients.jedis.JedisPool diff --git a/src/main/kotlin/org/radarbase/push/integration/garmin/service/BackfillService.kt b/src/main/kotlin/org/radarbase/push/integration/garmin/service/BackfillService.kt index 083cee0b..258779c5 100644 --- a/src/main/kotlin/org/radarbase/push/integration/garmin/service/BackfillService.kt +++ b/src/main/kotlin/org/radarbase/push/integration/garmin/service/BackfillService.kt @@ -15,8 +15,8 @@ import org.radarbase.push.integration.garmin.backfill.GarminRequestGenerator import org.radarbase.push.integration.garmin.backfill.RestRequest import org.radarbase.push.integration.garmin.backfill.TooManyRequestsException import org.radarbase.push.integration.garmin.user.GarminUserRepository -import org.radarbase.push.integration.garmin.util.RedisHolder -import org.radarbase.push.integration.garmin.util.RedisRemoteLockManager +import org.radarbase.push.integration.common.redis.RedisHolder +import org.radarbase.push.integration.common.redis.RedisRemoteLockManager import org.slf4j.LoggerFactory import redis.clients.jedis.JedisPool import java.io.IOException diff --git a/src/main/kotlin/org/radarbase/push/integration/garmin/util/offset/OffsetRedisPersistence.kt b/src/main/kotlin/org/radarbase/push/integration/garmin/util/offset/OffsetRedisPersistence.kt index 5a9d939a..a64fcb65 100644 --- a/src/main/kotlin/org/radarbase/push/integration/garmin/util/offset/OffsetRedisPersistence.kt +++ b/src/main/kotlin/org/radarbase/push/integration/garmin/util/offset/OffsetRedisPersistence.kt @@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.ObjectWriter import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper -import org.radarbase.push.integration.garmin.util.RedisHolder +import org.radarbase.push.integration.common.redis.RedisHolder import org.slf4j.LoggerFactory import java.io.IOException import java.nio.file.Path