Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ kafka:
pushIntegration:
garmin:
enabled: true
oauthVersion: oauth2
backfill:
defaultEndDate: "1590844126"
# Redis configuration
Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/org/radarbase/gateway/Config.kt
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ data class GarminConfig(
val consumerKey: String = "",
val consumerSecret: String = "",
val backfill: BackfillConfig = BackfillConfig(),
val userRepositoryClass: String =
"org.radarbase.push.integration.garmin.user.GarminServiceUserRepository",
val userRepositoryClass: String = "org.radarbase.push.integration.garmin.user.GarminServiceUserRepository",
val oauthVersion: String = "oauth2",
val userRepositoryUrl: String = "http://localhost:8080/",
val userRepositoryClientId: String = "radar_pushendpoint",
val userRepositoryClientSecret: String = "",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ interface UserRepository {
* @throws NoSuchElementException if the user does not exists in this repository.
*/
@Throws(IOException::class, NotAuthorizedException::class)
fun getAccessToken(user: User): String
fun getOAuth1AccessToken(user: User): String

@Throws(IOException::class, NotAuthorizedException::class)
fun getOAuth2AccessToken(user: User): String
/**
* Get the current refresh token of given user.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import jakarta.inject.Named
import jakarta.ws.rs.container.ContainerRequestContext
import jakarta.ws.rs.core.Context
import org.radarbase.gateway.Config
import org.radarbase.jersey.auth.Auth
import org.radarbase.jersey.auth.AuthValidator
import org.radarbase.jersey.auth.disabled.DisabledAuth
Expand All @@ -18,40 +19,45 @@ import java.time.Instant

class GarminAuthValidator(
@Context private val objectMapper: ObjectMapper,
@Context private val config: Config,
@Named(GARMIN_QUALIFIER) private val userRepository: GarminUserRepository
) :
AuthValidator {

) : AuthValidator {
private var nextRetry: Instant = Instant.MIN
private val isOauth2Flow = config.pushIntegration.garmin.oauthVersion.equals("oauth2", ignoreCase = true)

override fun verify(token: String, request: ContainerRequestContext): Auth {
return if (token.isBlank()) {
throw HttpUnauthorizedException("invalid_token", "The token was empty")
} else {
var isAnyUnauthorised = false
// Enrich the request by adding the User
// the data format in Garmin's post is { <data-type> : [ {<data-1>}, {<data-2>} ] }
val tree = request.getProperty("tree") as JsonNode

val userTreeMap: Map<User, JsonNode> =
// group by user ID since request can contain data from multiple users
tree[tree.fieldNames().next()]
.groupBy { node ->
node[USER_ID_KEY].asText()
}
.filter { (userId, userData) ->
val accessToken = userData[0][USER_ACCESS_TOKEN_KEY].asText()
if (checkIsAuthorised(userId, accessToken)) true else {
val isAuthorized = if (isOauth2Flow) {
checkIsAuthorisedOAuth2(userId)
} else {
val accessToken = userData[0][USER_ACCESS_TOKEN_KEY].asText()
checkIsAuthorisedOAuth1(userId, accessToken)
}
if (isAuthorized) {
true
} else {
isAnyUnauthorised = true
userRepository.deregisterUser(userId, accessToken)
if (!isOauth2Flow) {
val accessToken = userData[0][USER_ACCESS_TOKEN_KEY].asText()
userRepository.deregisterUser(userId, accessToken)
}
false
}
}
.entries
.associate { (userId, userData) ->
userRepository.findByExternalId(userId) to
// Map the List<JsonNode> back to <data-type>: [ {<data-1>}, {<data-2>} ]
// so it can be processed in the services without much refactoring
objectMapper.createObjectNode()
.set(tree.fieldNames().next(), objectMapper.valueToTree(userData))
}
Expand All @@ -63,54 +69,70 @@ class GarminAuthValidator(
)
request.removeProperty("tree")

// Disable auth since we don't have proper auth support
DisabledAuth("res_gateway")
}
}

override fun getToken(request: ContainerRequestContext): String? {
return if (request.hasEntity()) {
// We put the json tree in the request because the entity stream will be closed here
val tree = objectMapper.readTree(request.entityStream)
request.setProperty("tree", tree)
val userAccessToken = tree[tree.fieldNames().next()][0][USER_ACCESS_TOKEN_KEY]
?: throw HttpUnauthorizedException("invalid_token", "No user access token provided")
userAccessToken.asText().also {
request.setProperty(USER_ACCESS_TOKEN_KEY, it)

if (isOauth2Flow) {
val userId = tree[tree.fieldNames().next()][0][USER_ID_KEY]?.asText()
?: throw HttpUnauthorizedException("invalid_token", "No user ID provided")
userId //TODO: make sure if we can return the user id or fetch the access token
} else {
val userAccessToken = tree[tree.fieldNames().next()][0][USER_ACCESS_TOKEN_KEY]
?: throw HttpUnauthorizedException("invalid_token", "No user access token provided")
userAccessToken.asText().also {
request.setProperty(USER_ACCESS_TOKEN_KEY, it)
}
}
} else {
null
}
}

private fun checkIsAuthorised(userId: String, accessToken: String, retry: Boolean = true):
Boolean {
private fun checkIsAuthorisedOAuth1(userId: String, accessToken: String, retry: Boolean = true): Boolean {
val user = try {
userRepository.findByExternalId(userId)
} catch (exc: NoSuchElementException) {
} catch (_: NoSuchElementException) {
return if (retry && Instant.now() > nextRetry) {
userRepository.applyPendingUpdates()
nextRetry = Instant.now().plusSeconds(REFRESH_TIMEOUT_S)
checkIsAuthorised(userId, accessToken, retry = false)
checkIsAuthorisedOAuth1(userId, accessToken, retry = false)
} else {
logger.warn(
"no_user: The user {} could not be found in the " +
"user repository.", userId
)
logger.warn("The user {} could not be found in the user repository", userId)
false
}
}
if (!user.isAuthorized) {
logger.warn(
"invalid_user: The user {} does not seem to be authorized.", userId
)
logger.warn("iThe user {} does not seem to be authorized", userId)
return false
}
if (userRepository.getAccessToken(user) != accessToken) {
logger.warn(
"invalid_token: The token for user {} does not" +
" match with the records on the system.", userId
)
if (userRepository.getOAuth1AccessToken(user) != accessToken) {
logger.warn("The token for user {} does not match with the auth records", userId)
return false
}
return true
}

private fun checkIsAuthorisedOAuth2(userId: String, retry: Boolean = true): Boolean {
val user = try {
userRepository.findByExternalId(userId)
} catch (_: NoSuchElementException) {
return if (retry && Instant.now() > nextRetry) {
userRepository.applyPendingUpdates()
nextRetry = Instant.now().plusSeconds(REFRESH_TIMEOUT_S)
checkIsAuthorisedOAuth2(userId, retry = false)
} else {
logger.warn(" The user {} could not be found in the user repository.", userId)
false
}
}
if (!user.isAuthorized) {
logger.warn("The user {} does not seem to be authorized.", userId)
return false
}
return true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class GarminRequestGenerator(
}
}.toList()

private val isOauth2Flow: Boolean = config.pushIntegration.garmin.oauthVersion.equals("oauth2", ignoreCase = true)

private val userNextRequest: MutableMap<String, Instant> = mutableMapOf()

private val routeNextRequest: MutableMap<Route, Instant> = mutableMapOf()
Expand Down Expand Up @@ -154,7 +156,7 @@ class GarminRequestGenerator(
val endDate = userRepository.getBackfillEndDate(user)
if (endDate <= startOffset) return@flatMap emptySequence()
val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate)
route.generateRequests(user, startOffset, endTime, max / routes.size)
route.generateRequests(user, isOauth2Flow, startOffset, endTime, max / routes.size)
}
.takeWhile { !shouldBackoff }
} else emptySequence()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ interface Route {
*/
val maxIntervalPerRequest: Duration

fun generateRequests(user: User, start: Instant, end: Instant, max: Int): Sequence<RestRequest>
fun generateRequests(user: User, isOauth2Flow: Boolean, start: Instant, end: Instant, max: Int): Sequence<RestRequest>

/**
* This is how it would appear in the offsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@ abstract class GarminRoute(
override val maxIntervalPerRequest: Duration
get() = DEFAULT_INTERVAL_PER_REQUEST

fun createRequest(user: User, baseUrl: String, queryParams: String): Request {
fun createRequest(user: User, isOauth2Flow: Boolean, baseUrl: String, queryParams: String): Request {
val request = Request.Builder()
.url(baseUrl + queryParams)
.get()
.build()

val parameters = getParams(request.url)
val requestParams = SignRequestParams(baseUrl, ROUTE_METHOD, parameters)
val signedRequest = userRepository.getSignedRequest(user, requestParams)
return if (isOauth2Flow) {
val accessToken = userRepository.getOAuth2AccessToken(user)
request.newBuilder()
.addHeader("Authorization", "Bearer $accessToken")
.build()
} else {
val parameters = getParams(request.url)
val requestParams = SignRequestParams(baseUrl, ROUTE_METHOD, parameters)
val signedRequest = userRepository.getSignedRequest(user, requestParams)

return Oauth1Signing(signedRequest.parameters).signRequest(request)
Oauth1Signing(signedRequest.parameters).signRequest(request)
}
}

fun getParams(url: HttpUrl): Map<String, String> {
Expand All @@ -51,6 +58,7 @@ abstract class GarminRoute(

override fun generateRequests(
user: User,
isOauth2Flow: Boolean,
start: Instant,
end: Instant,
max: Int
Expand All @@ -61,9 +69,9 @@ abstract class GarminRoute(
.map { startRange ->
val endRange = (startRange + maxIntervalPerRequest).coerceAtMost(end)
val request = createRequest(
user, "$GARMIN_BACKFILL_BASE_URL/${subPath()}",
user, isOauth2Flow, "$GARMIN_BACKFILL_BASE_URL/${subPath()}",
"?summaryStartTimeInSeconds=${startRange.epochSecond}" +
"&summaryEndTimeInSeconds=${endRange.epochSecond}"
"&summaryEndTimeInSeconds=${endRange.epochSecond}"
)
RestRequest(request, user, this, startRange, endRange)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.radarbase.push.integration.common.auth.SignRequestParams
import org.radarbase.push.integration.common.inject.ObjectReaderFactory
import org.radarbase.push.integration.common.user.User
import org.radarbase.push.integration.common.user.Users
import org.radarbase.push.integration.garmin.util.OAuth2TokenCache
import org.slf4j.LoggerFactory
import java.io.IOException
import java.net.URL
Expand All @@ -33,8 +34,12 @@ class GarminServiceUserRepository(
@Context private val objectReaderFactory: ObjectReaderFactory,
) : GarminUserRepository(config) {
private val garminConfig: GarminConfig = config.pushIntegration.garmin
private val cachedCredentials: ConcurrentHashMap<String, OAuth1UserCredentials> =
private val oAuth1CachedCredentials: ConcurrentHashMap<String, OAuth1UserCredentials> by lazy {
ConcurrentHashMap<String, OAuth1UserCredentials>()
}
private val oAuth2CachedCredentials: OAuth2TokenCache by lazy {
OAuth2TokenCache(Duration.ofMinutes(30))
}
private var nextFetch = MIN_INSTANT

private val baseUrl: HttpUrl
Expand All @@ -48,8 +53,9 @@ class GarminServiceUserRepository(

private val userListReader: ObjectReader by lazy { objectReaderFactory.readerFor(Users::class) }
private val userReader: ObjectReader by lazy { objectReaderFactory.readerFor(GarminUser::class) }
private val oauthReader: ObjectReader by lazy { objectReaderFactory.readerFor(OAuth1UserCredentials::class) }
private val oAuth1ResponseReader: ObjectReader by lazy { objectReaderFactory.readerFor(OAuth1UserCredentials::class) }
private val signedRequestReader: ObjectReader by lazy { objectReaderFactory.readerFor(SignRequestParams::class) }
private val oAuth2ResponseReader: ObjectReader by lazy { objectReaderFactory.readerFor(OAuth2UserCredentials::class) }

init {
baseUrl = garminConfig.userRepositoryUrl.toHttpUrl()
Expand Down Expand Up @@ -84,15 +90,15 @@ class GarminServiceUserRepository(

fun requestUserCredentials(user: User): OAuth1UserCredentials {
val request = requestFor("users/" + user.id + "/token").build()
val credentials = makeRequest(request, oauthReader) as OAuth1UserCredentials
cachedCredentials[user.id] = credentials
val credentials = makeRequest(request, oAuth1ResponseReader) as OAuth1UserCredentials
oAuth1CachedCredentials[user.id] = credentials
return credentials
}

@Throws(IOException::class, NotAuthorizedException::class)
override fun getAccessToken(user: User): String {
override fun getOAuth1AccessToken(user: User): String {
val credentials: OAuth1UserCredentials =
cachedCredentials[user.id] ?: requestUserCredentials(user)
oAuth1CachedCredentials[user.id] ?: requestUserCredentials(user)
return credentials.accessToken
}

Expand All @@ -101,6 +107,13 @@ class GarminServiceUserRepository(
throw HttpBadRequestException("", "Not available for source type")
}

override fun getOAuth2AccessToken(user: User): String {
return oAuth2CachedCredentials.getOrFetchToken(user.id) {
val request = requestFor("users/" + user.id + "/token").build()
makeRequest(request, oAuth2ResponseReader)
}
}

override fun getSignedRequest(user: User, payload: SignRequestParams): SignRequestParams {
val body = JSONObject(payload).toString().toRequestBody(JSON_MEDIA_TYPE)
val request = requestFor("users/" + user.id + "/token/sign").method("POST", body).build()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.radarbase.push.integration.garmin.user

import com.fasterxml.jackson.annotation.JsonIgnoreProperties
import com.fasterxml.jackson.annotation.JsonProperty
import java.time.Instant

@JsonIgnoreProperties(ignoreUnknown = true)
data class OAuth2UserCredentials(
@param:JsonProperty("accessToken") val accessToken: String,
@param:JsonProperty("expiresIn") val expiresAt: Instant
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package org.radarbase.push.integration.garmin.util

import org.radarbase.push.integration.garmin.user.OAuth2UserCredentials
import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap

@Suppress("unused")
class OAuth2TokenCache(private val removalAdvance: Duration = Duration.ofMinutes(30)) {
private val map = ConcurrentHashMap<String, OAuth2UserCredentials>()

fun put(userId: String, accessToken: String, expiry: Instant) {
map[userId] = OAuth2UserCredentials(accessToken, expiry)
}

fun remove(key: String) {
map.remove(key)
}

/**
* Get the value if it's still valid and not near expiry, otherwise call `fetch`,
* store a returned pair, and return the fresh value.
*
* @param fetch returns the new access token and its expiry time.
*/
fun getOrFetchToken(key: String, fetch: () -> OAuth2UserCredentials): String {
val now = Instant.now()
val entry = map.compute(key) { _, existing ->
val shouldFetch = when {
existing == null -> true
now.isAfter(existing.expiresAt) || now == existing.expiresAt -> true
!now.isBefore(existing.expiresAt.minus(removalAdvance)) -> true
else -> false
}

if (shouldFetch) {
fetch()
} else {
existing
}
}!!
return entry.accessToken
}

fun isValidAndNotNear(key: String): Boolean {
val e = map[key] ?: return false
val now = Instant.now()
return now.isBefore(e.expiresAt.minus(removalAdvance))
}
}
Loading