Skip to content

Commit 7771361

Browse files
Fix NPE
1 parent 59f8435 commit 7771361

File tree

1 file changed

+17
-16
lines changed

1 file changed

+17
-16
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,10 @@
3030
import java.util.Map;
3131
import java.util.NoSuchElementException;
3232
import java.util.Set;
33-
import java.util.concurrent.ConcurrentHashMap;
34-
import java.util.concurrent.ConcurrentMap;
3533
import java.util.concurrent.atomic.AtomicReference;
3634
import java.util.stream.Collectors;
3735
import java.util.stream.Stream;
3836
import javax.ws.rs.NotAuthorizedException;
39-
4037
import okhttp3.HttpUrl;
4138
import okhttp3.MediaType;
4239
import okhttp3.OkHttpClient;
@@ -56,15 +53,15 @@ public class ServiceUserRepository implements UserRepository {
5653
private static final ObjectReader USER_LIST_READER = JSON_READER.forType(Users.class);
5754
private static final ObjectReader USER_READER = JSON_READER.forType(User.class);
5855
private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class);
59-
private static final RequestBody EMPTY_BODY = RequestBody.create(
60-
MediaType.parse("application/json; charset=utf-8"), "");
56+
private static final RequestBody EMPTY_BODY =
57+
RequestBody.create(MediaType.parse("application/json; charset=utf-8"), "");
58+
private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L);
6159
private final OkHttpClient client;
62-
private HttpUrl baseUrl;
6360
private final Map<String, OAuth2UserCredentials> cachedCredentials;
64-
private HashSet<String> containedUsers;
65-
private Set< ? extends User> timedCachedUsers = new HashSet<>();
6661
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
67-
private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L);
62+
private HttpUrl baseUrl;
63+
private HashSet<String> containedUsers;
64+
private Set<? extends User> timedCachedUsers = new HashSet<>();
6865

6966
public ServiceUserRepository() {
7067
this.client = new OkHttpClient();
@@ -97,10 +94,16 @@ public Stream<? extends User> stream() throws IOException {
9794

9895
logger.info("Requesting user information from webservice");
9996
Request request = requestFor("users" + "?source-type=FitBit").build();
100-
this.timedCachedUsers = this.<Users>makeRequest(request, USER_LIST_READER)
101-
.getUsers().stream()
102-
.filter(u -> containedUsers.isEmpty() || containedUsers.contains(u.getId()))
103-
.collect(Collectors.toSet());
97+
this.timedCachedUsers =
98+
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
99+
.filter(u -> containedUsers.isEmpty() || containedUsers.contains(u.getId()))
100+
.filter( // Filter for when a partial update is ongoing
101+
(User u) ->
102+
u.getEndDate() != null
103+
&& u.getStartDate() != null
104+
&& u.getProjectId() != null
105+
&& u.getUserId() != null)
106+
.collect(Collectors.toSet());
104107

105108
return this.timedCachedUsers.stream();
106109
}
@@ -118,9 +121,7 @@ public String getAccessToken(User user) throws IOException, NotAuthorizedExcepti
118121

119122
@Override
120123
public String refreshAccessToken(User user) throws IOException, NotAuthorizedException {
121-
Request request = requestFor("users/" + user.getId() + "/token")
122-
.post(EMPTY_BODY)
123-
.build();
124+
Request request = requestFor("users/" + user.getId() + "/token").post(EMPTY_BODY).build();
124125
OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER);
125126
cachedCredentials.put(user.getId(), credentials);
126127
return credentials.getAccessToken();

0 commit comments

Comments
 (0)