Skip to content

Commit 1106a31

Browse files
committed
Store backoff request time per route instead of per user
1 parent 83b782d commit 1106a31

File tree

1 file changed

+88
-24
lines changed

1 file changed

+88
-24
lines changed

oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt

Lines changed: 88 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ constructor(
2525
public val routes: List<Route> = OuraRouteFactory.getRoutes(userRepository),
2626
) : RequestGenerator {
2727
private val userNextRequest: MutableMap<String, Instant> = mutableMapOf()
28+
private val routeNextRequest: MutableMap<String, Instant> = mutableMapOf()
2829

2930
public var nextRequestTime: Instant = Instant.MIN
3031

@@ -38,9 +39,18 @@ constructor(
3839
return if (user.ready()) {
3940
routes.asSequence()
4041
.flatMap { route ->
41-
return@flatMap generateRequests(route, user)
42+
if (routeReady(user, route)) {
43+
return@flatMap generateRequests(route, user)
44+
} else {
45+
logger.info(
46+
"Skip {} for {}: route in backoff until {}",
47+
route,
48+
user.versionedId,
49+
routeNextRequest[routeKey(route, user)],
50+
)
51+
return@flatMap emptySequence()
52+
}
4253
}
43-
.takeWhile { !shouldBackoff }
4454
} else {
4555
emptySequence()
4656
}
@@ -54,12 +64,28 @@ constructor(
5464
.stream()
5565
.flatMap { user ->
5666
if (user.ready()) {
57-
generateRequests(route, user)
67+
if (routeReady(user, route)) {
68+
generateRequests(route, user)
69+
} else {
70+
logger.info(
71+
"Skip {} for {}: route in backoff until {}",
72+
route,
73+
user.versionedId,
74+
routeNextRequest[routeKey(route, user)],
75+
)
76+
emptySequence()
77+
}
5878
} else {
79+
logger.info(
80+
"Skip {} for {}: user in backoff until {}",
81+
route,
82+
user.versionedId,
83+
userNextRequest[user.versionedId]
84+
)
5985
emptySequence()
6086
}
6187
}
62-
.takeWhile { !shouldBackoff }
88+
6389
}
6490

6591
override fun requests(
@@ -68,7 +94,17 @@ constructor(
6894
max: Int,
6995
): Sequence<RestRequest> {
7096
return if (user.ready()) {
71-
return generateRequests(route, user).takeWhile { !shouldBackoff }
97+
return if (routeReady(user, route)) {
98+
generateRequests(route, user)
99+
} else {
100+
logger.info(
101+
"Skip {} for {}: route in backoff until {}",
102+
route,
103+
user.versionedId,
104+
routeNextRequest[routeKey(route, user)],
105+
)
106+
emptySequence()
107+
}
72108
} else {
73109
emptySequence()
74110
}
@@ -90,8 +126,31 @@ constructor(
90126
offsetTime.coerceAtLeast(startDate)
91127
}
92128
val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now()
93-
if (Duration.between(startOffset, endDate) <= ONE_DAY) {
94-
logger.info("Interval between dates is too short. Not requesting..")
129+
if (!startOffset.isBefore(endDate)) {
130+
// If the user's configured endDate is in the past (>30d ago),
131+
// and we've reached or surpassed it (startOffset >= endDate),
132+
// permanently disable future requests for this user+route.
133+
val userEnd = user.endDate
134+
if (userEnd != null && endDate == userEnd && Duration.between(userEnd, Instant.now()) > Duration.ofDays(30)) {
135+
val key = routeKey(route, user)
136+
routeNextRequest[key] = Instant.MAX
137+
logger.info(
138+
"Disable future requests for {}: user={}, endDate={} (>30d ago), startOffset={}",
139+
route,
140+
user.versionedId,
141+
userEnd,
142+
startOffset,
143+
)
144+
}
145+
logger.info(
146+
"Skip {} for {}: interval empty (startOffset={} >= endDate={}), persistedOffset={}, userStartDate={}",
147+
route,
148+
user.versionedId,
149+
startOffset,
150+
endDate,
151+
offset?.offset,
152+
startDate,
153+
)
95154
return emptySequence()
96155
}
97156
val timeSinceStart = Duration.between(startOffset, Instant.now())
@@ -139,10 +198,9 @@ constructor(
139198
Instant.ofEpochSecond(offset).plus(OFFSET_BUFFER),
140199
)
141200
val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME)
142-
userNextRequest[request.user.versionedId] =
143-
userNextRequest[request.user.versionedId]?.let {
144-
if (it > nextRequestTime) it else nextRequestTime
145-
} ?: nextRequestTime
201+
val key = routeKey(request.route, request.user)
202+
routeNextRequest[key] =
203+
routeNextRequest[key]?.let { if (it > nextRequestTime) it else nextRequestTime } ?: nextRequestTime
146204
} else {
147205
if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) {
148206
logger.info("No records found, updating offsets to end date..")
@@ -151,10 +209,11 @@ constructor(
151209
request.user,
152210
request.endDate,
153211
)
154-
userNextRequest[request.user.versionedId] =
155-
Instant.now().plus(SUCCESS_BACK_OFF_TIME)
212+
val key = routeKey(request.route, request.user)
213+
routeNextRequest[key] = Instant.now().plus(SUCCESS_BACK_OFF_TIME)
156214
} else {
157-
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
215+
val key = routeKey(request.route, request.user)
216+
routeNextRequest[key] = Instant.now().plus(BACK_OFF_TIME)
158217
}
159218
}
160219
return records
@@ -174,11 +233,8 @@ constructor(
174233
logger.warn(
175234
"User ${request.user} has expired." + "Please renew the subscription...",
176235
)
177-
userNextRequest[request.user.versionedId] =
178-
Instant.now()
179-
.plus(
180-
USER_BACK_OFF_TIME,
181-
)
236+
routeNextRequest[routeKey(request.route, request.user)] =
237+
Instant.now().plus(USER_BACK_OFF_TIME)
182238
OuraAccessForbiddenError(
183239
"Oura subscription has expired or API data not available..",
184240
IOException("Unauthorized"),
@@ -191,11 +247,8 @@ constructor(
191247
" expired, malformed, or revoked. " +
192248
response.body?.string(),
193249
)
194-
userNextRequest[request.user.versionedId] =
195-
Instant.now()
196-
.plus(
197-
USER_BACK_OFF_TIME,
198-
)
250+
routeNextRequest[routeKey(request.route, request.user)] =
251+
Instant.now().plus(USER_BACK_OFF_TIME)
199252
OuraUnauthorizedAccessError(
200253
"Access token expired or revoked..",
201254
IOException("Unauthorized"),
@@ -205,6 +258,7 @@ constructor(
205258
400 -> {
206259
logger.warn("Client exception..")
207260
nextRequestTime = Instant.now() + BACK_OFF_TIME
261+
routeNextRequest[routeKey(request.route, request.user)] = Instant.now().plus(BACK_OFF_TIME)
208262
OuraClientException(
209263
"Client unsupported or unauthorized..",
210264
IOException("Invalid client"),
@@ -213,6 +267,7 @@ constructor(
213267
}
214268
422 -> {
215269
logger.warn("Request Failed: {}, {}", request, response)
270+
routeNextRequest[routeKey(request.route, request.user)] = Instant.now().plus(BACK_OFF_TIME)
216271
OuraValidationError(
217272
response.body!!.string(),
218273
IOException("Validation error"),
@@ -221,6 +276,7 @@ constructor(
221276
}
222277
404 -> {
223278
logger.warn("Not found..")
279+
routeNextRequest[routeKey(request.route, request.user)] = Instant.now().plus(BACK_OFF_TIME)
224280
OuraNotFoundError(
225281
response.body!!.string(),
226282
IOException("Data not found"),
@@ -229,6 +285,7 @@ constructor(
229285
}
230286
else -> {
231287
logger.warn("Request Failed: {}, {}", request, response)
288+
routeNextRequest[routeKey(request.route, request.user)] = Instant.now().plus(BACK_OFF_TIME)
232289
OuraGenericError(response.body!!.string(), IOException("Unknown error"), "500")
233290
}
234291
}
@@ -242,6 +299,13 @@ constructor(
242299
}
243300
}
244301

302+
private fun routeReady(user: User, route: Route): Boolean {
303+
val key = routeKey(route, user)
304+
return routeNextRequest[key]?.let { Instant.now() > it } ?: true
305+
}
306+
307+
private fun routeKey(route: Route, user: User): String = "${user.versionedId}#${route}"
308+
245309
companion object {
246310
private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java)
247311
private val BACK_OFF_TIME = Duration.ofMinutes(10L)

0 commit comments

Comments
 (0)