Skip to content

Commit 44908d3

Browse files
committed
Merge remote-tracking branch 'origin/master' into release-0.5.4
# Conflicts: # buildSrc/src/main/kotlin/Versions.kt
2 parents a88b53c + 6e29fd5 commit 44908d3

File tree

5 files changed

+17
-18
lines changed

5 files changed

+17
-18
lines changed

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ object Versions {
77
const val wrapper = "8.4"
88

99
const val radarCommons = "1.1.2"
10-
const val confluent = "7.6.0"
10+
const val confluent = "7.7.0"
1111
const val kafka = "$confluent-ce"
12-
const val avro = "1.11.4"
12+
const val avro = "1.12.0"
1313

1414
const val managementPortal = "2.1.5"
1515

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class OuraSourceTask extends SourceTask {
6363
private AvroData avroData = new AvroData(20);
6464
private KafkaOffsetManager offsetManager;
6565
String TIMESTAMP_OFFSET_KEY = "timestamp";
66+
long TIMEOUT = 60000L;
6667

6768
public void initialize(OuraRestSourceConnectorConfig config, OffsetStorageReader offsetStorageReader) {
6869
OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config;
@@ -144,6 +145,8 @@ public List<SourceRecord> poll() throws InterruptedException {
144145
List<SourceRecord> sourceRecords = Collections.emptyList();
145146

146147
do {
148+
Thread.sleep(TIMEOUT);
149+
147150
Map<String, String> configs = context.configs();
148151
Iterator<? extends RestRequest> requestIterator = this.requests()
149152
.iterator();

oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,9 @@ constructor(
8989
logger.info("Offsets found in persistence: " + offsetTime.toString())
9090
offsetTime.coerceAtLeast(startDate)
9191
}
92-
val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate
92+
val endDate = user.endDate?.coerceAtMost(Instant.now()) ?: Instant.now()
9393
if (Duration.between(startOffset, endDate) <= ONE_DAY) {
94-
logger.info("Interval between dates is too short. Backing off..")
95-
userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME)
94+
logger.info("Interval between dates is too short. Not requesting..")
9695
return emptySequence()
9796
}
9897
val endTime = (startOffset + defaultQueryRange).coerceAtMost(endDate)
@@ -130,19 +129,13 @@ constructor(
130129
ouraOffsetManager.updateOffsets(
131130
request.route,
132131
request.user,
133-
Instant.ofEpochSecond(offset).plus(Duration.ofMillis(500)),
132+
Instant.ofEpochSecond(offset).plus(ONE_DAY),
134133
)
135-
val currentNextRequestTime = userNextRequest[request.user.versionedId]
136134
val nextRequestTime = Instant.now().plus(SUCCESS_BACK_OFF_TIME)
137135
userNextRequest[request.user.versionedId] =
138-
currentNextRequestTime?.let {
139-
if (currentNextRequestTime > nextRequestTime) {
140-
currentNextRequestTime
141-
} else {
142-
nextRequestTime
143-
}
144-
}
145-
?: nextRequestTime
136+
userNextRequest[request.user.versionedId]?.let {
137+
if (it > nextRequestTime) it else nextRequestTime
138+
} ?: nextRequestTime
146139
} else {
147140
if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) {
148141
logger.info("No records found, updating offsets to end date..")
@@ -152,6 +145,8 @@ constructor(
152145
request.endDate,
153146
)
154147
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
148+
} else {
149+
userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME)
155150
}
156151
}
157152
return records

oura-library/src/main/kotlin/org/radarbase/oura/user/OuraUser.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ data class OuraUser(
1616
@JsonProperty("externalId") override val externalId: String?,
1717
@JsonProperty("isAuthorized") override val isAuthorized: Boolean,
1818
@JsonProperty("startDate") override val startDate: Instant,
19-
@JsonProperty("endDate") override val endDate: Instant,
19+
@JsonProperty("endDate") override val endDate: Instant? = null,
2020
@JsonProperty("version") override val version: String? = null,
2121
@JsonProperty("serviceUserId") override val serviceUserId: String? = null,
2222
) : User {
2323
override val observationKey: ObservationKey = ObservationKey(projectId, userId, sourceId)
2424
override val versionedId: String = "$id${version?.let { "#$it" } ?: ""}"
2525

26-
fun isComplete() = isAuthorized && startDate.isBefore(endDate) && serviceUserId != null
26+
fun isComplete() =
27+
isAuthorized && (endDate == null || startDate.isBefore(endDate)) && serviceUserId != null
2728
}

oura-library/src/main/kotlin/org/radarbase/oura/user/User.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ interface User {
1010
val sourceId: String
1111
val externalId: String?
1212
val startDate: Instant
13-
val endDate: Instant
13+
val endDate: Instant?
1414
val createdAt: Instant
1515
val humanReadableUserId: String?
1616
val serviceUserId: String?

0 commit comments

Comments
 (0)