Skip to content

Commit f4ff9e2

Browse files
committed
More Java-like approach
1 parent 3b7cd26 commit f4ff9e2

File tree

1 file changed

+57
-53
lines changed

1 file changed

+57
-53
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java

Lines changed: 57 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,61 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
209209
throw new NoSuchElementException("User " + user + " is not present in this user repository.");
210210
}
211211
String refreshToken = actualUser.apply(u -> u.getOAuth2Credentials().getRefreshToken());
212+
213+
JsonNode node;
214+
215+
try {
216+
node = requestAccessToken(refreshToken);
217+
} catch (IOException ex) {
218+
if (retry > 0) {
219+
logger.warn("Failed to read OAuth 2.0 response: {}", ex.toString());
220+
return refreshAccessToken(user, retry - 1);
221+
} else {
222+
throw ex;
223+
}
224+
} catch (NotAuthorizedException ex) {
225+
actualUser.accept((u, p) -> {
226+
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
227+
// it was updated already by another thread.
228+
return;
229+
}
230+
u.setOauth2Credentials(new OAuth2UserCredentials());
231+
store(p, u);
232+
});
233+
throw ex;
234+
}
235+
236+
JsonNode expiresInNode = node.get("expires_in");
237+
Long expiresIn = expiresInNode != null
238+
? expiresInNode.asLong()
239+
: null;
240+
241+
JsonNode accessTokenNode = node.get("access_token");
242+
JsonNode refreshTokenNode = node.get("refresh_token");
243+
if (accessTokenNode == null || refreshTokenNode == null) {
244+
if (retry > 0) {
245+
logger.warn("Failed to get access token in successful OAuth 2.0 request:"
246+
+ " access token or refresh token are missing");
247+
return refreshAccessToken(user, retry - 1);
248+
} else {
249+
throw new NotAuthorizedException("Did not get an access token");
250+
}
251+
}
252+
253+
actualUser.accept((u, p) -> {
254+
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
255+
// it was updated already by another thread.
256+
return;
257+
}
258+
u.setOauth2Credentials(new OAuth2UserCredentials(
259+
refreshTokenNode.asText(), accessTokenNode.asText(), expiresIn));
260+
store(p, u);
261+
});
262+
263+
return actualUser.apply(u -> u.getOAuth2Credentials().getAccessToken());
264+
}
265+
266+
private JsonNode requestAccessToken(String refreshToken) throws IOException {
212267
if (refreshToken == null || refreshToken.isEmpty()) {
213268
throw new NotAuthorizedException("Refresh token is not set");
214269
}
@@ -227,13 +282,9 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
227282
ResponseBody responseBody = response.body();
228283

229284
if (response.isSuccessful() && responseBody != null) {
230-
try {
231-
node = JSON_READER.readTree(responseBody.charStream());
232-
} catch (IOException ex) {
233-
exception = ex;
234-
}
285+
return JSON_READER.readTree(responseBody.charStream());
235286
} else if (response.code() == 400 || response.code() == 401) {
236-
exception = new NotAuthorizedException("Refresh token is no longer valid.");
287+
throw new NotAuthorizedException("Refresh token is no longer valid.");
237288
} else {
238289
String message = "Failed to request refresh token, with response HTTP status code "
239290
+ response.code();
@@ -243,53 +294,6 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
243294
throw new IOException(message);
244295
}
245296
}
246-
247-
if (node != null) {
248-
JsonNode expiresInNode = node.get("expires_in");
249-
Long expiresIn = expiresInNode != null
250-
? expiresInNode.asLong()
251-
: null;
252-
253-
JsonNode accessTokenNode = node.get("access_token");
254-
JsonNode refreshTokenNode = node.get("refresh_token");
255-
if (accessTokenNode == null || refreshTokenNode == null) {
256-
if (retry > 0) {
257-
logger.warn("Failed to get access token in successful OAuth 2.0 request:"
258-
+ " access token or refresh token are missing");
259-
return refreshAccessToken(user, retry - 1);
260-
} else {
261-
throw new NotAuthorizedException("Did not get an access token");
262-
}
263-
}
264-
265-
actualUser.accept((u, p) -> {
266-
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
267-
// it was updated already by another thread.
268-
return;
269-
}
270-
u.setOauth2Credentials(new OAuth2UserCredentials(
271-
refreshTokenNode.asText(), accessTokenNode.asText(), expiresIn));
272-
store(p, u);
273-
});
274-
} else if (exception instanceof IOException) {
275-
if (retry > 0) {
276-
logger.warn("Failed to read OAuth 2.0 response: {}", exception.toString());
277-
return refreshAccessToken(user, retry - 1);
278-
}
279-
throw (IOException) exception;
280-
} else if (exception instanceof NotAuthorizedException) {
281-
actualUser.accept((u, p) -> {
282-
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
283-
// it was updated already by another thread.
284-
return;
285-
}
286-
u.setOauth2Credentials(new OAuth2UserCredentials());
287-
store(p, u);
288-
});
289-
throw (NotAuthorizedException) exception;
290-
}
291-
292-
return actualUser.apply(u -> u.getOAuth2Credentials().getAccessToken());
293297
}
294298

295299
/**

0 commit comments

Comments
 (0)