Skip to content

Commit 4c74569

Browse files
committed
Add client auth fixes to Fitbit ServiceUserRepository and delete legacy implementation
1 parent ec003bd commit 4c74569

File tree

2 files changed

+97
-292
lines changed

2 files changed

+97
-292
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt

Lines changed: 97 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import io.ktor.client.plugins.auth.providers.basic
2929
import io.ktor.client.plugins.contentnegotiation.ContentNegotiation
3030
import io.ktor.client.plugins.defaultRequest
3131
import io.ktor.client.request.HttpRequestBuilder
32+
import io.ktor.client.request.header
3233
import io.ktor.client.request.request
3334
import io.ktor.client.request.setBody
3435
import io.ktor.client.request.url
@@ -49,6 +50,7 @@ import kotlinx.coroutines.Dispatchers
4950
import kotlinx.coroutines.runBlocking
5051
import kotlinx.coroutines.withContext
5152
import kotlinx.serialization.json.Json
53+
import kotlinx.serialization.json.jsonObject
5254
import org.radarbase.connect.rest.RestSourceConnectorConfig
5355
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig
5456
import org.radarbase.kotlin.coroutines.CacheConfig
@@ -73,6 +75,13 @@ class ServiceUserRepository : UserRepository {
7375
private val credentialCacheConfig =
7476
CacheConfig(refreshDuration = 1.days, retryDuration = 1.minutes)
7577
private val mapper = ObjectMapper().registerKotlinModule().registerModule(JavaTimeModule())
78+
79+
// User repository service token cache for OAuth2 client credentials authentication
80+
private var userRepositoryTokenCache: CachedValue<String>? = null
81+
private val userRepositoryCacheConfig = CacheConfig(
82+
refreshDuration = 50.minutes,
83+
retryDuration = 1.minutes,
84+
)
7685

7786
@Throws(IOException::class)
7887
override fun get(key: String): User = runBlocking(Dispatchers.Default) {
@@ -90,10 +99,20 @@ class ServiceUserRepository : UserRepository {
9099
tokenUrl = URLBuilder(config.fitbitUserRepositoryTokenUrl.toString()).build(),
91100
clientId = config.fitbitUserRepositoryClientId,
92101
clientSecret = config.fitbitUserRepositoryClientSecret,
93-
scope = "SUBJECT.READ MEASUREMENT.CREATE",
94-
audience = "res_restAuthorizer",
95102
)
96103

104+
if (config.fitbitUserRepositoryTokenUrl.toString().isNotBlank()) {
105+
userRepositoryTokenCache = CachedValue(userRepositoryCacheConfig) {
106+
requestUserRepositoryToken(
107+
tokenUrl = URLBuilder(config.fitbitUserRepositoryTokenUrl.toString()).build(),
108+
clientId = config.fitbitUserRepositoryClientId,
109+
clientSecret = config.fitbitUserRepositoryClientSecret,
110+
scope = "SUBJECT.READ MEASUREMENT.CREATE",
111+
audience = "res_restAuthorizer",
112+
)
113+
}
114+
}
115+
97116
val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
98117
userCache = CachedSet(
99118
CacheConfig(
@@ -115,30 +134,9 @@ class ServiceUserRepository : UserRepository {
115134
tokenUrl: Url?,
116135
clientId: String?,
117136
clientSecret: String?,
118-
scope: String?,
119-
audience: String?,
120137
): HttpClient = HttpClient(CIO) {
121-
if (tokenUrl != null) {
122-
install(Auth) {
123-
clientCredentials(
124-
ClientCredentialsConfig(
125-
tokenUrl.toString(),
126-
clientId,
127-
clientSecret,
128-
scope,
129-
audience,
130-
).copyWithEnv("MANAGEMENT_PORTAL"),
131-
baseUrl.host,
132-
)
133-
}
134-
install(ContentNegotiation) {
135-
json(
136-
Json {
137-
ignoreUnknownKeys = true
138-
},
139-
)
140-
}
141-
} else if (clientId != null && clientSecret != null) {
138+
// Only add basic auth if no token URL is provided (fallback authentication)
139+
if (tokenUrl == null && clientId != null && clientSecret != null) {
142140
install(Auth) {
143141
basic {
144142
credentials {
@@ -168,6 +166,74 @@ class ServiceUserRepository : UserRepository {
168166
}
169167
}
170168

169+
private suspend fun requestUserRepositoryToken(
170+
tokenUrl: Url,
171+
clientId: String?,
172+
clientSecret: String?,
173+
scope: String?,
174+
audience: String?,
175+
): String {
176+
return try {
177+
val authClient = HttpClient(CIO) {
178+
install(Auth) {
179+
clientCredentials(
180+
ClientCredentialsConfig(
181+
tokenUrl.toString(),
182+
clientId,
183+
clientSecret,
184+
scope,
185+
audience,
186+
).copyWithEnv("MANAGEMENT_PORTAL"),
187+
tokenUrl.host,
188+
)
189+
}
190+
install(ContentNegotiation) {
191+
json(
192+
Json {
193+
ignoreUnknownKeys = true
194+
},
195+
)
196+
}
197+
install(HttpTimeout) {
198+
connectTimeoutMillis = 60.seconds.inWholeMilliseconds
199+
requestTimeoutMillis = 90.seconds.inWholeMilliseconds
200+
}
201+
}
202+
203+
val response = authClient.request {
204+
url(tokenUrl)
205+
method = HttpMethod.Post
206+
setBody("grant_type=client_credentials&scope=${scope ?: ""}&audience=${audience ?: ""}")
207+
contentType(ContentType.Application.FormUrlEncoded)
208+
}
209+
210+
authClient.close()
211+
212+
if (!response.status.isSuccess()) {
213+
throw HttpResponseException(
214+
"Failed to get user repository token: ${response.status}",
215+
response.status.value,
216+
)
217+
}
218+
219+
val tokenResponse = Json.parseToJsonElement(response.bodyAsText()).jsonObject
220+
tokenResponse["access_token"]?.toString()?.removeSurrounding("\"")
221+
?: throw IllegalStateException("No access token in response")
222+
} catch (e: Exception) {
223+
logger.error("Failed to get user repository token", e)
224+
throw e
225+
}
226+
}
227+
228+
private suspend fun getAuthorizationHeader(): String {
229+
return try {
230+
userRepositoryTokenCache?.get() ?: throw IllegalStateException("User repository token cache not initialized")
231+
} catch (e: Exception) {
232+
logger.error("Failed to get authorization header", e)
233+
throw e
234+
}
235+
}
236+
171237
override fun stream(): Stream<out User> = runBlocking(Dispatchers.Default) {
172238
val valueInCache = userCache.getFromCache()
173239
.takeIf { it is CachedValue.CacheValue }
@@ -248,7 +314,12 @@ class ServiceUserRepository : UserRepository {
248314
private suspend inline fun <reified T> makeRequest(
249315
crossinline builder: HttpRequestBuilder.() -> Unit,
250316
): T = withContext(Dispatchers.IO) {
251-
val response = client.request(builder)
317+
val response = client.request {
318+
builder()
319+
userRepositoryTokenCache?.let { tokenCache ->
320+
header("Authorization", "Bearer ${getAuthorizationHeader()}")
321+
}
322+
}
252323
val contentLength = response.contentLength()
253324
// if Transfer-Encoding: chunked, then the request has data but contentLength will be null.
254325
val transferEncoding = response.headers["Transfer-Encoding"]

0 commit comments

Comments
 (0)