Skip to content

Commit db44b53

Browse files
authored
Merge pull request #147 from RADAR-base/feat/offset-times
Fix Oura offset times
2 parents dbcd7dc + 7f0dcb1 commit db44b53

File tree

2 files changed

+11
-14
lines changed

2 files changed

+11
-14
lines changed

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class OuraSourceTask extends SourceTask {
6363
private AvroData avroData = new AvroData(20);
6464
private KafkaOffsetManager offsetManager;
6565
String TIMESTAMP_OFFSET_KEY = "timestamp";
66+
long TIMEOUT = 60000L;
6667

6768
public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) {
6869
OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config;
@@ -144,6 +145,8 @@ public List<SourceRecord> poll() throws InterruptedException {
144145
List<SourceRecord> sourceRecords = Collections.emptyList();
145146

146147
do {
148+
Thread.sleep(TIMEOUT);
149+
147150
Map<String, String> configs = context.configs();
148151
Iterator<? extends RestRequest> requestIterator = this.requests()
149152
.iterator();

oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,7 @@ constructor(
9191
}
9292
val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now()
9393
if (Duration.between(startOffset, endDate) <= ONE_DAY) {
94-
logger.info("Interval between dates is too short. Backing off..")
95-
userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME)
94+
logger.info("Interval between dates is too short. Not requesting..")
9695
return emptySequence()
9796
}
9897
val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate)
@@ -130,19 +129,12 @@ constructor(
130129
ouraOffsetManager.updateOffsets(
131130
request.route,
132131
request.user,
133-
Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)),
132+
Instant.ofEpochSecond(offset).plus(ONE_DAY),
134133
)
135-
val currentNextRequestTime = userNextRequest[request.user.versionedId]
136134
val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME)
137-
userNextRequest[request.user.versionedId] =
138-
currentNextRequestTime?.let {
139-
if (currentNextRequestTime > nextRequestTime) {
140-
currentNextRequestTime
141-
} else {
142-
nextRequestTime
143-
}
144-
}
145-
?: nextRequestTime
135+
userNextRequest[request.user.versionedId] = userNextRequest[request.user.versionedId]?.let {
136+
if (it > nextRequestTime) it else nextRequestTime
137+
} ?: nextRequestTime
146138
} else {
147139
if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) {
148140
logger.info("No records found, updating offsets to end date..")
@@ -151,7 +143,9 @@ constructor(
151143
request.user,
152144
request.endDate,
153145
)
154-
userNextRequest[request.user.versionedId] = Instant.now().plus(ONE_DAY)
146+
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
147+
} else {
148+
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
155149
}
156150
}
157151
return records

0 commit comments

Comments
 (0)