Skip to content

Commit 787b87c

Browse files
committed
Fix updating of offsets
1 parent f0ad26a commit 787b87c

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

oura-library/src/main/kotlin/org/radarbase/oura/converter/TopicData.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ data class TopicData(
77
val topic: String,
88
val key: SpecificRecord,
99
val value: SpecificRecord,
10-
val offset: Long?,
10+
val offset: Long,
1111
)

oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ constructor(
4848
.stream()
4949
.flatMap { user ->
5050
if (user.ready()) {
51-
return@flatMap generateRequests(route, user)
51+
generateRequests(route, user)
5252
} else {
5353
emptySequence()
5454
}
@@ -74,11 +74,10 @@ constructor(
7474
logger.debug("Offsets found in persistence.")
7575
offset.offset.coerceAtLeast(startDate)
7676
}
77-
val endDate = user.endDate
77+
val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate
7878
if (endDate <= startOffset) return emptySequence()
79-
if (endDate >= Instant.now()) return emptySequence()
8079
val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate)
81-
return route.generateRequests(user, startOffset, endTime)
80+
return route.generateRequests(user, startOffset, endTime, USER_MAX_REQUESTS)
8281
}
8382

8483
fun handleResponse(req: RestRequest, response: Response): OuraResult<List<TopicData>> {
@@ -94,13 +93,21 @@ constructor(
9493
}
9594

9695
override fun requestSuccessful(request: RestRequest, response: Response): List<TopicData> {
97-
logger.debug("Request successful: {}. Writing to offsets...", request.request)
96+
logger.debug("Request successful: {}..", request.request)
9897
val body: ResponseBody? = response.body
9998
val data = body?.bytes()!!
10099
val records = request.route.converters.flatMap {
101100
it.convert(request, response.headers, data)
102101
}
103-
ouraOffsetManager.updateOffsets(request.route, request.user, request.endDate)
102+
val offset = records.maxByOrNull { it -> it.offset }?.offset
103+
if (offset != null) {
104+
logger.info("Writing ${records.size} records to offsets...")
105+
ouraOffsetManager.updateOffsets(
106+
request.route,
107+
request.user,
108+
Instant.ofEpochMilli(offset),
109+
)
110+
}
104111
return records
105112
}
106113

@@ -172,8 +179,9 @@ constructor(
172179

173180
companion object {
174181
private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java)
175-
private val BACK_OFF_TIME = Duration.ofMinutes(1L)
182+
private val BACK_OFF_TIME = Duration.ofMinutes(5L)
176183
private val USER_BACK_OFF_TIME = Duration.ofDays(1L)
184+
private val USER_MAX_REQUESTS = 20
177185
val JSON_FACTORY = JsonFactory()
178186
val JSON_READER = ObjectMapper(JSON_FACTORY).registerModule(JavaTimeModule()).reader()
179187
}

0 commit comments

Comments
 (0)