Skip to content

Commit 1aebe6a

Browse files
authored
Merge pull request #141 from RADAR-base/fix/offset-manager
Fix Oura offset issues
2 parents ef2f2f8 + c24c89e commit 1aebe6a

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void initialize(List<Map<String, Object>> partitions) {
3636
.filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY))
3737
.collect(Collectors.toMap(
3838
e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"),
39-
e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue())));
39+
e -> Instant.ofEpochSecond(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue())));
4040
} else {
4141
logger.warn("Offset storage reader is null, will resume from an empty state.");
4242
}

oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ constructor(
9090
offsetTime.coerceAtLeast(startDate)
9191
}
9292
val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate
93-
if (Duration.between(startOffset, endDate).toDays() <= ONE_DAY) {
93+
if (Duration.between(startOffset, endDate) <= ONE_DAY) {
9494
logger.info("Interval between dates is too short. Backing off..")
9595
userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME)
9696
return emptySequence()
@@ -145,11 +145,13 @@ constructor(
145145
?: nextRequestTime
146146
} else {
147147
if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) {
148+
logger.info("No records found, updating offsets to end date..")
148149
ouraOffsetManager.updateOffsets(
149150
request.route,
150151
request.user,
151152
request.endDate,
152153
)
154+
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
153155
}
154156
}
155157
return records
@@ -240,7 +242,7 @@ constructor(
240242
companion object {
241243
private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java)
242244
private val BACK_OFF_TIME = Duration.ofMinutes(10L)
243-
private val ONE_DAY = 1L
245+
private val ONE_DAY = Duration.ofDays(1L)
244246
private val TIME_AFTER_REQUEST = Duration.ofDays(30)
245247
private val USER_BACK_OFF_TIME = Duration.ofHours(12L)
246248
private val SUCCESS_BACK_OFF_TIME = Duration.ofMinutes(1L)

0 commit comments

Comments
 (0)