Skip to content

Commit d2edc17

Browse files
Merge pull request #50 from RADAR-base/reset-user-id
Add version property to user so it can be reset
2 parents ea8621b + 7dc0368 commit d2edc17

File tree

7 files changed

+38
-17
lines changed

7 files changed

+38
-17
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private List<Map<String, String>> configureTasks(int maxTasks) {
9898
try {
9999

100100
List<Map<String, String>> userTasks = fitbitConfig.getUserRepository().stream()
101-
.map(User::getId)
101+
.map(User::getVersionedId)
102102
// group users based on their hashCode
103103
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
104104
// since that allows existing tasks to only handle small modifications users to handle.

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public OkHttpClient getClient(User user) {
105105
public Map<String, Map<String, Object>> getPartitions(String route) {
106106
try {
107107
return userRepository.stream()
108-
.collect(Collectors.toMap(User::getId, u -> getPartition(route, u)));
108+
.collect(Collectors.toMap(User::getVersionedId, u -> getPartition(route, u)));
109109
} catch (IOException e) {
110110
logger.warn("Failed to initialize user partitions for route {}: {}", route, e.toString());
111111
return Collections.emptyMap();
@@ -114,7 +114,7 @@ public Map<String, Map<String, Object>> getPartitions(String route) {
114114

115115
public Map<String, Object> getPartition(String route, User user) {
116116
Map<String, Object> partition = new HashMap<>(4);
117-
partition.put("user", user.getId());
117+
partition.put("user", user.getVersionedId());
118118
partition.put("route", route);
119119
return partition;
120120
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void initialize(RestSourceConnectorConfig config) {
143143
@Override
144144
public void requestSucceeded(RestRequest request, SourceRecord record) {
145145
lastPollPerUser.put(((FitbitRestRequest) request).getUser().getId(), lastPoll);
146-
String userKey = ((FitbitRestRequest) request).getUser().getId();
146+
String userKey = ((FitbitRestRequest) request).getUser().getVersionedId();
147147
Instant offset = Instant.ofEpochMilli((Long) record.sourceOffset().get(TIMESTAMP_OFFSET_KEY));
148148
offsets.put(userKey, offset);
149149
}
@@ -154,7 +154,7 @@ public void requestEmpty(RestRequest request) {
154154
FitbitRestRequest fitbitRequest = (FitbitRestRequest) request;
155155
Instant endOffset = fitbitRequest.getDateRange().end().toInstant();
156156
if (DAYS.between(endOffset, lastPoll) >= HISTORICAL_TIME_DAYS) {
157-
String key = fitbitRequest.getUser().getId();
157+
String key = fitbitRequest.getUser().getVersionedId();
158158
offsets.put(key, endOffset);
159159
}
160160
}
@@ -197,7 +197,7 @@ public Stream<FitbitRestRequest> requests() {
197197
return userRepository.stream()
198198
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
199199
.filter(u -> lastPoll.isAfter(u.getValue()))
200-
.sorted(Comparator.comparing(Map.Entry::getValue))
200+
.sorted(Map.Entry.comparingByValue())
201201
.flatMap(u -> this.createRequests(u.getKey()))
202202
.filter(Objects::nonNull);
203203
} catch (IOException e) {
@@ -216,7 +216,7 @@ public Instant getTimeOfNextRequest() {
216216
}
217217

218218
private Map<String, Object> getPartition(User user) {
219-
return partitions.computeIfAbsent(user.getId(),
219+
return partitions.computeIfAbsent(user.getVersionedId(),
220220
k -> generator.getPartition(routeName, user));
221221
}
222222

@@ -279,7 +279,7 @@ public Instant getLastPoll() {
279279
}
280280

281281
protected Instant getOffset(User user) {
282-
return offsets.getOrDefault(user.getId(), user.getStartDate().minus(ONE_NANO));
282+
return offsets.getOrDefault(user.getVersionedId(), user.getStartDate().minus(ONE_NANO));
283283
}
284284

285285
/**

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/LocalUser.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
public class LocalUser implements User {
3535
private static final Pattern ILLEGAL_CHARACTERS_PATTERN = Pattern.compile("[^a-zA-Z0-9_-]");
3636
private String id;
37+
private String version;
3738
private String externalUserId;
3839
private String projectId;
3940
private String userId;
@@ -94,9 +95,19 @@ public void setFitbitUserId(String id) {
9495
this.externalUserId = id;
9596
}
9697

98+
@Override
99+
public String getVersion() {
100+
return version;
101+
}
102+
103+
public void setVersion(String version) {
104+
this.version = version;
105+
}
106+
97107
public LocalUser copy() {
98108
LocalUser copy = new LocalUser();
99109
copy.id = id;
110+
copy.version = version;
100111
copy.externalUserId = externalUserId;
101112
copy.projectId = projectId;
102113
copy.userId = userId;
@@ -116,7 +127,8 @@ public synchronized SchemaAndValue getObservationKey(AvroData avroData) {
116127

117128
@Override
118129
public String toString() {
119-
return "LocalUser{" + "id='" + id + '\''
130+
return "LocalUser{id='" + id + '\''
131+
+ ", version='" + version + '\''
120132
+ ", externalUserId='" + externalUserId + '\''
121133
+ ", projectId='" + projectId + '\''
122134
+ ", userId='" + userId + '\''
@@ -134,19 +146,17 @@ public boolean equals(Object o) {
134146
}
135147
LocalUser localUser = (LocalUser) o;
136148
return Objects.equals(id, localUser.id)
149+
&& Objects.equals(version, localUser.version)
137150
&& Objects.equals(externalUserId, localUser.externalUserId)
138151
&& Objects.equals(projectId, localUser.projectId)
139152
&& Objects.equals(userId, localUser.userId)
140153
&& Objects.equals(sourceId, localUser.sourceId)
141154
&& Objects.equals(startDate, localUser.startDate)
142-
&& Objects.equals(endDate, localUser.endDate)
143-
&& Objects.equals(observationKey, localUser.observationKey);
155+
&& Objects.equals(endDate, localUser.endDate);
144156
}
145157

146158
@Override
147159
public int hashCode() {
148-
149-
return Objects
150-
.hash(id, externalUserId, projectId, userId, sourceId, startDate, endDate, observationKey);
160+
return Objects.hash(id, version);
151161
}
152162
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public Stream<? extends User> stream() throws IOException {
9999
this.timedCachedUsers =
100100
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
101101
.filter(u -> u.isComplete()
102-
&& (containedUsers.isEmpty() || containedUsers.contains(u.getId())))
102+
&& (containedUsers.isEmpty() || containedUsers.contains(u.getVersionedId())))
103103
.collect(Collectors.toSet());
104104

105105
return this.timedCachedUsers.stream();

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) {
3131

3232
String getId();
3333

34+
String getVersion();
35+
3436
String getExternalUserId();
3537

3638
String getProjectId();
@@ -43,6 +45,15 @@ static SchemaAndValue computeObservationKey(AvroData avroData, User user) {
4345

4446
String getSourceId();
4547

48+
default String getVersionedId() {
49+
String version = getVersion();
50+
if (version == null) {
51+
return getId();
52+
} else {
53+
return getId() + "#" + version;
54+
}
55+
}
56+
4657
SchemaAndValue getObservationKey(AvroData avroData);
4758

4859
default Boolean isComplete() {

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public Stream<LocalUser> stream() {
134134
Stream<LockedUser> users = this.users.values().stream()
135135
.filter(lockedTest(u -> u.getOAuth2Credentials().hasRefreshToken()));
136136
if (!configuredUsers.isEmpty()) {
137-
users = users.filter(lockedTest(u -> configuredUsers.contains(u.getId())));
137+
users = users.filter(lockedTest(u -> configuredUsers.contains(u.getVersionedId())));
138138
}
139139
return users.map(lockedApply(LocalUser::copy));
140140
}
@@ -297,7 +297,7 @@ private void store(Path path, LocalUser user) {
297297
* Local user that is protected by a multi-threading lock to avoid simultaneous IO
298298
* and modifications.
299299
*/
300-
private final class LockedUser {
300+
private static final class LockedUser {
301301
final Lock lock = new ReentrantLock();
302302
final LocalUser user;
303303
final Path path;

0 commit comments

Comments
 (0)