Skip to content

Commit 97340a5

Browse files
Checks for authorization validity when filtering users
- Attempt to fix #69.
1 parent 56058d7 commit 97340a5

File tree

4 files changed

+31
-3
lines changed

4 files changed

+31
-3
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ protected Duration getLookbackTime() {
312312
*/
313313
protected Instant nextPoll(User user) {
314314
Instant offset = getOffset(user);
315-
if (offset.isAfter(user.getEndDate())) {
315+
if (offset.isAfter(user.getEndDate().minus(getEndDateThreshold()))) {
316316
return nearFuture();
317317
} else {
318318
Instant nextPoll = lastPollPerUser.getOrDefault(user.getId(), MIN_INSTANT)
@@ -321,6 +321,10 @@ protected Instant nextPoll(User user) {
321321
}
322322
}
323323

324+
private TemporalAmount getEndDateThreshold() {
325+
return Duration.ofHours(1);
326+
}
327+
324328
/**
325329
* Generate one date per day, using UTC time zone. The first date will have the time from the
326330
* given startDate. Following time stamps will start at 00:00. This will not up to the date of

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public class LocalUser implements User {
4545
@JsonProperty("oauth2")
4646
private OAuth2UserCredentials oauth2Credentials = new OAuth2UserCredentials();
4747

48+
@JsonProperty("authorized")
49+
private Boolean isAuthorized;
50+
4851
@JsonIgnore
4952
private SchemaAndValue observationKey;
5053

@@ -95,6 +98,15 @@ public void setFitbitUserId(String id) {
9598
this.externalUserId = id;
9699
}
97100

101+
@Override
102+
public boolean isAuthorized() {
103+
if(isAuthorized == null) {
104+
return !oauth2Credentials.isAccessTokenExpired()
105+
|| oauth2Credentials.hasRefreshToken();
106+
}
107+
return isAuthorized;
108+
}
109+
98110
@Override
99111
public String getVersion() {
100112
return version;
@@ -115,6 +127,7 @@ public LocalUser copy() {
115127
copy.endDate = endDate;
116128
copy.sourceId = sourceId;
117129
copy.oauth2Credentials = oauth2Credentials;
130+
copy.isAuthorized = isAuthorized;
118131
return copy;
119132
}
120133

@@ -133,6 +146,7 @@ public String toString() {
133146
+ ", projectId='" + projectId + '\''
134147
+ ", userId='" + userId + '\''
135148
+ ", sourceId='" + sourceId + '\''
149+
+ ", isAuthorized='" + isAuthorized() + '\''
136150
+ '}';
137151
}
138152

@@ -152,7 +166,8 @@ public boolean equals(Object o) {
152166
&& Objects.equals(userId, localUser.userId)
153167
&& Objects.equals(sourceId, localUser.sourceId)
154168
&& Objects.equals(startDate, localUser.startDate)
155-
&& Objects.equals(endDate, localUser.endDate);
169+
&& Objects.equals(endDate, localUser.endDate)
170+
&& Objects.equals(isAuthorized(), localUser.isAuthorized());
156171
}
157172

158173
@Override

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) {
4545

4646
String getSourceId();
4747

48+
boolean isAuthorized();
49+
4850
default String getVersionedId() {
4951
String version = getVersion();
5052
if (version == null) {
@@ -60,6 +62,7 @@ default Boolean isComplete() {
6062
return getEndDate() != null
6163
&& getStartDate() != null
6264
&& getProjectId() != null
63-
&& getUserId() != null;
65+
&& getUserId() != null
66+
&& isAuthorized();
6467
}
6568
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/FirebaseUser.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public String getSourceId() {
7070
return fitbitAuthDetails.getSourceId();
7171
}
7272

73+
@Override
74+
public boolean isAuthorized() {
75+
return !fitbitAuthDetails.getOauth2Credentials().isAccessTokenExpired()
76+
|| fitbitAuthDetails.getOauth2Credentials().hasRefreshToken();
77+
}
78+
7379
public FirebaseUserDetails getFirebaseUserDetails() {
7480
return firebaseUserDetails;
7581
}

0 commit comments

Comments
 (0)