Skip to content

Commit e2eff49

Browse files
committed
Fix OuraServiceUserRepository serialization issues
1 parent 742db0e commit e2eff49

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

kafka-connect-oura-source/build.gradle.kts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,21 @@ dependencies {
55
api(project(":oura-library"))
66
api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}")
77
api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}")
8-
implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}")
8+
implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}")
99

1010
implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}"))
1111
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
1212
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")
1313
implementation("com.google.firebase:firebase-admin:${Versions.firebaseAdmin}")
1414
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.8.21")
1515

16+
implementation("io.ktor:ktor-client-auth:${Versions.ktor}")
17+
implementation("io.ktor:ktor-client-content-negotiation:${Versions.ktor}")
18+
implementation("io.ktor:ktor-serialization-jackson:${Versions.ktor}")
19+
implementation("io.ktor:ktor-client-cio-jvm:${Versions.ktor}")
20+
implementation("io.ktor:ktor-serialization-kotlinx-json:${Versions.ktor}")
21+
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:${Versions.jackson}")
22+
1623
// Included in connector runtime
1724
compileOnly("org.apache.kafka:connect-api:${Versions.kafka}")
1825
compileOnly(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}"))

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
*/
1717
package org.radarbase.connect.rest.oura.user
1818

19+
import com.fasterxml.jackson.databind.ObjectMapper
1920
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
21+
import com.fasterxml.jackson.module.kotlin.readValue
22+
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
2023
import io.ktor.client.HttpClient
21-
import io.ktor.client.call.body
2224
import io.ktor.client.engine.cio.CIO
2325
import io.ktor.client.plugins.HttpTimeout
2426
import io.ktor.client.plugins.auth.Auth
@@ -41,9 +43,11 @@ import io.ktor.http.contentType
4143
import io.ktor.http.isSuccess
4244
import io.ktor.http.takeFrom
4345
import io.ktor.serialization.jackson.jackson
46+
import io.ktor.serialization.kotlinx.json.json
4447
import kotlinx.coroutines.Dispatchers
4548
import kotlinx.coroutines.runBlocking
4649
import kotlinx.coroutines.withContext
50+
import kotlinx.serialization.json.Json
4751
import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig
4852
import org.radarbase.kotlin.coroutines.CacheConfig
4953
import org.radarbase.kotlin.coroutines.CachedSet
@@ -69,6 +73,7 @@ class OuraServiceUserRepository : UserRepository {
6973
private val credentialCaches = ConcurrentHashMap<String, CachedValue<OAuth2UserCredentials>>()
7074
private val credentialCacheConfig =
7175
CacheConfig(refreshDuration = 1.days, retryDuration = 1.minutes)
76+
private val mapper = ObjectMapper().registerKotlinModule().registerModule(JavaTimeModule())
7277

7378
@Throws(IOException::class)
7479
override fun get(key: String): User = runBlocking(Dispatchers.Default) {
@@ -86,18 +91,17 @@ class OuraServiceUserRepository : UserRepository {
8691
clientId = config.ouraUserRepositoryClientId,
8792
clientSecret = config.ouraUserRepositoryClientSecret,
8893
)
89-
9094

9195
userCache = CachedSet(
9296
CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes),
9397
) {
9498
makeRequest<OuraUsers> { url("users?source-type=Oura") }
9599
.users
96100
.toHashSet()
97-
// .filterTo(HashSet()) { u ->
98-
// u.isComplete() &&
99-
// (containedUsers.isEmpty() || u.versionedId in containedUsers)
100-
// }
101+
.filterTo(HashSet()) { u ->
102+
u.isComplete() &&
103+
(containedUsers.isEmpty() || u.versionedId in containedUsers)
104+
}
101105
}
102106
}
103107

@@ -118,6 +122,13 @@ class OuraServiceUserRepository : UserRepository {
118122
baseUrl.host,
119123
)
120124
}
125+
install(ContentNegotiation) {
126+
json(
127+
Json {
128+
ignoreUnknownKeys = true
129+
},
130+
)
131+
}
121132
} else if (clientId != null && clientSecret != null) {
122133
install(Auth) {
123134
basic {
@@ -244,7 +255,7 @@ class OuraServiceUserRepository : UserRepository {
244255
}
245256
throw HttpResponseException(message, response.status.value)
246257
}
247-
response.body<T>()
258+
mapper.readValue<T>(response.bodyAsText())
248259
}
249260

250261
companion object {

0 commit comments

Comments
 (0)