Skip to content

Commit c797acc

Browse files
committed
Fix initial user poll
1 parent e8e7083 commit c797acc

File tree

4 files changed

+11
-3
lines changed

4 files changed

+11
-3
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT;
2626
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.TIMESTAMP_OFFSET_KEY;
2727
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.nearFuture;
28+
import static org.radarbase.connect.rest.request.PollingRequestRoute.max;
2829

2930
import java.io.IOException;
3031
import java.time.Duration;
@@ -316,7 +317,7 @@ protected Instant nextPoll(User user) {
316317
} else {
317318
Instant nextPoll = lastPollPerUser.getOrDefault(user.getId(), MIN_INSTANT)
318319
.plus(getPollIntervalPerUser());
319-
return PollingRequestRoute.max(offset.plus(getLookbackTime()), nextPoll);
320+
return max(offset.plus(getLookbackTime()), nextPoll);
320321
}
321322
}
322323

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,13 @@ public void initialize(RestSourceConnectorConfig config) {
113113

114114
@Override
115115
public Stream<? extends User> stream() {
116+
if (nextFetch.get().equals(Instant.MIN)) {
117+
try {
118+
applyPendingUpdates();
119+
} catch (IOException ex) {
120+
logger.error("Failed to initially get users from repository");
121+
}
122+
}
116123
return this.timedCachedUsers.stream();
117124
}
118125

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class YamlUserRepository implements UserRepository {
8686

8787
private Set<String> configuredUsers;
8888
private Headers headers;
89-
private ConcurrentMap<String, LockedUser> users = new ConcurrentHashMap<>();
89+
private final ConcurrentMap<String, LockedUser> users = new ConcurrentHashMap<>();
9090
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
9191
private Path credentialsDir;
9292

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/firebase/CovidCollabFirebaseUserRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class CovidCollabFirebaseUserRepository extends FirebaseUserRepository {
3737
private static final Logger logger =
3838
LoggerFactory.getLogger(CovidCollabFirebaseUserRepository.class);
3939

40-
private Map<String, FirebaseUser> cachedUsers = new HashMap<>();
40+
private final Map<String, FirebaseUser> cachedUsers = new HashMap<>();
4141
private CollectionReference userCollection;
4242
private CollectionReference fitbitCollection;
4343
private FitbitTokenService fitbitTokenService;

0 commit comments

Comments
 (0)