Skip to content

Commit 3b7cd26

Browse files
committed
Close requests as soon as possible
1 parent 9d7a22c commit 3b7cd26

File tree

2 files changed

+60
-49
lines changed

2 files changed

+60
-49
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java

Lines changed: 50 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -221,57 +221,19 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
221221
.build())
222222
.build();
223223

224+
Exception exception = null;
225+
JsonNode node = null;
224226
try (Response response = client.newCall(request).execute()) {
225227
ResponseBody responseBody = response.body();
226228

227229
if (response.isSuccessful() && responseBody != null) {
228-
JsonNode node;
229230
try {
230231
node = JSON_READER.readTree(responseBody.charStream());
231232
} catch (IOException ex) {
232-
if (retry > 0) {
233-
logger.warn("Failed to read OAuth 2.0 response: {}", ex.toString());
234-
return refreshAccessToken(user, retry - 1);
235-
}
236-
throw ex;
237-
}
238-
239-
JsonNode expiresInNode = node.get("expires_in");
240-
Long expiresIn = expiresInNode != null
241-
? expiresInNode.asLong()
242-
: null;
243-
244-
JsonNode accessTokenNode = node.get("access_token");
245-
JsonNode refreshTokenNode = node.get("refresh_token");
246-
if (accessTokenNode == null || refreshTokenNode == null) {
247-
if (retry > 0) {
248-
logger.warn("Failed to get access token in successful OAuth 2.0 request:"
249-
+ " access token or refresh token are missing");
250-
return refreshAccessToken(user, retry - 1);
251-
} else {
252-
throw new NotAuthorizedException("Did not get an access token");
253-
}
233+
exception = ex;
254234
}
255-
256-
actualUser.accept((u, p) -> {
257-
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
258-
// it was updated already by another thread.
259-
return;
260-
}
261-
u.setOauth2Credentials(new OAuth2UserCredentials(
262-
refreshTokenNode.asText(), accessTokenNode.asText(), expiresIn));
263-
store(p, u);
264-
});
265235
} else if (response.code() == 400 || response.code() == 401) {
266-
actualUser.accept((u, p) -> {
267-
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
268-
// it was updated already by another thread.
269-
return;
270-
}
271-
u.setOauth2Credentials(new OAuth2UserCredentials());
272-
store(p, u);
273-
});
274-
throw new NotAuthorizedException("Refresh token is no longer valid.");
236+
exception = new NotAuthorizedException("Refresh token is no longer valid.");
275237
} else {
276238
String message = "Failed to request refresh token, with response HTTP status code "
277239
+ response.code();
@@ -281,6 +243,52 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
281243
throw new IOException(message);
282244
}
283245
}
246+
247+
if (node != null) {
248+
JsonNode expiresInNode = node.get("expires_in");
249+
Long expiresIn = expiresInNode != null
250+
? expiresInNode.asLong()
251+
: null;
252+
253+
JsonNode accessTokenNode = node.get("access_token");
254+
JsonNode refreshTokenNode = node.get("refresh_token");
255+
if (accessTokenNode == null || refreshTokenNode == null) {
256+
if (retry > 0) {
257+
logger.warn("Failed to get access token in successful OAuth 2.0 request:"
258+
+ " access token or refresh token are missing");
259+
return refreshAccessToken(user, retry - 1);
260+
} else {
261+
throw new NotAuthorizedException("Did not get an access token");
262+
}
263+
}
264+
265+
actualUser.accept((u, p) -> {
266+
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
267+
// it was updated already by another thread.
268+
return;
269+
}
270+
u.setOauth2Credentials(new OAuth2UserCredentials(
271+
refreshTokenNode.asText(), accessTokenNode.asText(), expiresIn));
272+
store(p, u);
273+
});
274+
} else if (exception instanceof IOException) {
275+
if (retry > 0) {
276+
logger.warn("Failed to read OAuth 2.0 response: {}", exception.toString());
277+
return refreshAccessToken(user, retry - 1);
278+
}
279+
throw (IOException) exception;
280+
} else if (exception instanceof NotAuthorizedException) {
281+
actualUser.accept((u, p) -> {
282+
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
283+
// it was updated already by another thread.
284+
return;
285+
}
286+
u.setOauth2Credentials(new OAuth2UserCredentials());
287+
store(p, u);
288+
});
289+
throw (NotAuthorizedException) exception;
290+
}
291+
284292
return actualUser.apply(u -> u.getOAuth2Credentials().getAccessToken());
285293
}
286294

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,25 @@ public Stream<SourceRecord> handleRequest() throws IOException {
8080
return Stream.empty();
8181
}
8282

83+
Collection<SourceRecord> records;
84+
8385
try (Response response = client.newCall(request).execute()) {
8486
if (!response.isSuccessful()) {
8587
route.requestFailed(this, response);
8688
return Stream.empty();
8789
}
8890

89-
Collection<SourceRecord> records = route.converter().convert(this, response);
90-
if (records.isEmpty()) {
91-
route.requestEmpty(this);
92-
} else {
93-
records.forEach(r -> route.requestSucceeded(this, r));
94-
}
95-
return records.stream();
91+
records = route.converter().convert(this, response);
9692
} catch (IOException ex) {
9793
route.requestFailed(this, null);
9894
throw ex;
9995
}
96+
97+
if (records.isEmpty()) {
98+
route.requestEmpty(this);
99+
} else {
100+
records.forEach(r -> route.requestSucceeded(this, r));
101+
}
102+
return records.stream();
100103
}
101104
}

0 commit comments

Comments
 (0)