Skip to content

Commit 5d58d56

Browse files
authored
Merge pull request #40 from RADAR-base/fix-npe
Fix NPE
2 parents 59f8435 + c1b6b66 commit 5d58d56

File tree

2 files changed

+32
-20
lines changed

2 files changed

+32
-20
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,10 @@
3030
import java.util.Map;
3131
import java.util.NoSuchElementException;
3232
import java.util.Set;
33-
import java.util.concurrent.ConcurrentHashMap;
34-
import java.util.concurrent.ConcurrentMap;
3533
import java.util.concurrent.atomic.AtomicReference;
3634
import java.util.stream.Collectors;
3735
import java.util.stream.Stream;
3836
import javax.ws.rs.NotAuthorizedException;
39-
4037
import okhttp3.HttpUrl;
4138
import okhttp3.MediaType;
4239
import okhttp3.OkHttpClient;
@@ -56,15 +53,17 @@ public class ServiceUserRepository implements UserRepository {
5653
private static final ObjectReader USER_LIST_READER = JSON_READER.forType(Users.class);
5754
private static final ObjectReader USER_READER = JSON_READER.forType(User.class);
5855
private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class);
59-
private static final RequestBody EMPTY_BODY = RequestBody.create(
60-
MediaType.parse("application/json; charset=utf-8"), "");
56+
private static final RequestBody EMPTY_BODY =
57+
RequestBody.create(MediaType.parse("application/json; charset=utf-8"), "");
58+
private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L);
59+
6160
private final OkHttpClient client;
62-
private HttpUrl baseUrl;
6361
private final Map<String, OAuth2UserCredentials> cachedCredentials;
64-
private HashSet<String> containedUsers;
65-
private Set< ? extends User> timedCachedUsers = new HashSet<>();
6662
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
67-
private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L);
63+
64+
private HttpUrl baseUrl;
65+
private HashSet<String> containedUsers;
66+
private Set<? extends User> timedCachedUsers = new HashSet<>();
6867

6968
public ServiceUserRepository() {
7069
this.client = new OkHttpClient();
@@ -97,10 +96,11 @@ public Stream<? extends User> stream() throws IOException {
9796

9897
logger.info("Requesting user information from webservice");
9998
Request request = requestFor("users" + "?source-type=FitBit").build();
100-
this.timedCachedUsers = this.<Users>makeRequest(request, USER_LIST_READER)
101-
.getUsers().stream()
102-
.filter(u -> containedUsers.isEmpty() || containedUsers.contains(u.getId()))
103-
.collect(Collectors.toSet());
99+
this.timedCachedUsers =
100+
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
101+
.filter(u -> u.isComplete()
102+
&& (containedUsers.isEmpty() || containedUsers.contains(u.getId())))
103+
.collect(Collectors.toSet());
104104

105105
return this.timedCachedUsers.stream();
106106
}
@@ -118,9 +118,7 @@ public String getAccessToken(User user) throws IOException, NotAuthorizedExcepti
118118

119119
@Override
120120
public String refreshAccessToken(User user) throws IOException, NotAuthorizedException {
121-
Request request = requestFor("users/" + user.getId() + "/token")
122-
.post(EMPTY_BODY)
123-
.build();
121+
Request request = requestFor("users/" + user.getId() + "/token").post(EMPTY_BODY).build();
124122
OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER);
125123
cachedCredentials.put(user.getId(), credentials);
126124
return credentials.getAccessToken();

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/User.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,32 @@
2323
import org.radarcns.kafka.ObservationKey;
2424

2525
public interface User {
26+
static SchemaAndValue computeObservationKey(AvroData avroData, User user) {
27+
return avroData.toConnectData(
28+
ObservationKey.getClassSchema(),
29+
new ObservationKey(user.getProjectId(), user.getUserId(), user.getSourceId()));
30+
}
31+
2632
String getId();
33+
2734
String getExternalUserId();
35+
2836
String getProjectId();
37+
2938
String getUserId();
39+
3040
Instant getStartDate();
41+
3142
Instant getEndDate();
43+
3244
String getSourceId();
45+
3346
SchemaAndValue getObservationKey(AvroData avroData);
3447

35-
static SchemaAndValue computeObservationKey(AvroData avroData, User user) {
36-
return avroData.toConnectData(
37-
ObservationKey.getClassSchema(),
38-
new ObservationKey(user.getProjectId(), user.getUserId(), user.getSourceId()));
48+
default Boolean isComplete() {
49+
return getEndDate() != null
50+
&& getStartDate() != null
51+
&& getProjectId() != null
52+
&& getUserId() != null;
3953
}
4054
}

0 commit comments

Comments
 (0)