Skip to content

Commit fb42fc4

Browse files
authored
Merge pull request #62 from RADAR-base/close_requests
Close requests
2 parents 28d0325 + a20069e commit fb42fc4

File tree

3 files changed

+67
-54
lines changed

3 files changed

+67
-54
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ subprojects {
1111
apply plugin: 'java-library'
1212

1313
group = 'org.radarbase'
14-
version = '0.3.1'
14+
version = '0.3.2-SNAPSHOT'
1515

1616
sourceCompatibility = 1.8
1717
targetCompatibility = 1.8

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/YamlUserRepository.java

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,61 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
209209
throw new NoSuchElementException("User " + user + " is not present in this user repository.");
210210
}
211211
String refreshToken = actualUser.apply(u -> u.getOAuth2Credentials().getRefreshToken());
212+
213+
JsonNode node;
214+
215+
try {
216+
node = requestAccessToken(refreshToken);
217+
} catch (IOException ex) {
218+
if (retry > 0) {
219+
logger.warn("Failed to read OAuth 2.0 response: {}", ex.toString());
220+
return refreshAccessToken(user, retry - 1);
221+
} else {
222+
throw ex;
223+
}
224+
} catch (NotAuthorizedException ex) {
225+
actualUser.accept((u, p) -> {
226+
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
227+
// it was updated already by another thread.
228+
return;
229+
}
230+
u.setOauth2Credentials(new OAuth2UserCredentials());
231+
store(p, u);
232+
});
233+
throw ex;
234+
}
235+
236+
JsonNode expiresInNode = node.get("expires_in");
237+
Long expiresIn = expiresInNode != null
238+
? expiresInNode.asLong()
239+
: null;
240+
241+
JsonNode accessTokenNode = node.get("access_token");
242+
JsonNode refreshTokenNode = node.get("refresh_token");
243+
if (accessTokenNode == null || refreshTokenNode == null) {
244+
if (retry > 0) {
245+
logger.warn("Failed to get access token in successful OAuth 2.0 request:"
246+
+ " access token or refresh token are missing");
247+
return refreshAccessToken(user, retry - 1);
248+
} else {
249+
throw new NotAuthorizedException("Did not get an access token");
250+
}
251+
}
252+
253+
actualUser.accept((u, p) -> {
254+
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
255+
// it was updated already by another thread.
256+
return;
257+
}
258+
u.setOauth2Credentials(new OAuth2UserCredentials(
259+
refreshTokenNode.asText(), accessTokenNode.asText(), expiresIn));
260+
store(p, u);
261+
});
262+
263+
return actualUser.apply(u -> u.getOAuth2Credentials().getAccessToken());
264+
}
265+
266+
private JsonNode requestAccessToken(String refreshToken) throws IOException {
212267
if (refreshToken == null || refreshToken.isEmpty()) {
213268
throw new NotAuthorizedException("Refresh token is not set");
214269
}
@@ -225,52 +280,8 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
225280
ResponseBody responseBody = response.body();
226281

227282
if (response.isSuccessful() && responseBody != null) {
228-
JsonNode node;
229-
try {
230-
node = JSON_READER.readTree(responseBody.charStream());
231-
} catch (IOException ex) {
232-
if (retry > 0) {
233-
logger.warn("Failed to read OAuth 2.0 response: {}", ex.toString());
234-
return refreshAccessToken(user, retry - 1);
235-
}
236-
throw ex;
237-
}
238-
239-
JsonNode expiresInNode = node.get("expires_in");
240-
Long expiresIn = expiresInNode != null
241-
? expiresInNode.asLong()
242-
: null;
243-
244-
JsonNode accessTokenNode = node.get("access_token");
245-
JsonNode refreshTokenNode = node.get("refresh_token");
246-
if (accessTokenNode == null || refreshTokenNode == null) {
247-
if (retry > 0) {
248-
logger.warn("Failed to get access token in successful OAuth 2.0 request:"
249-
+ " access token or refresh token are missing");
250-
return refreshAccessToken(user, retry - 1);
251-
} else {
252-
throw new NotAuthorizedException("Did not get an access token");
253-
}
254-
}
255-
256-
actualUser.accept((u, p) -> {
257-
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
258-
// it was updated already by another thread.
259-
return;
260-
}
261-
u.setOauth2Credentials(new OAuth2UserCredentials(
262-
refreshTokenNode.asText(), accessTokenNode.asText(), expiresIn));
263-
store(p, u);
264-
});
283+
return JSON_READER.readTree(responseBody.charStream());
265284
} else if (response.code() == 400 || response.code() == 401) {
266-
actualUser.accept((u, p) -> {
267-
if (!refreshToken.equals(u.getOAuth2Credentials().getRefreshToken())) {
268-
// it was updated already by another thread.
269-
return;
270-
}
271-
u.setOauth2Credentials(new OAuth2UserCredentials());
272-
store(p, u);
273-
});
274285
throw new NotAuthorizedException("Refresh token is no longer valid.");
275286
} else {
276287
String message = "Failed to request refresh token, with response HTTP status code "
@@ -281,7 +292,6 @@ public synchronized String refreshAccessToken(User user, int retry) throws IOExc
281292
throw new IOException(message);
282293
}
283294
}
284-
return actualUser.apply(u -> u.getOAuth2Credentials().getAccessToken());
285295
}
286296

287297
/**

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/request/RestRequest.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,25 @@ public Stream<SourceRecord> handleRequest() throws IOException {
8080
return Stream.empty();
8181
}
8282

83+
Collection<SourceRecord> records;
84+
8385
try (Response response = client.newCall(request).execute()) {
8486
if (!response.isSuccessful()) {
8587
route.requestFailed(this, response);
8688
return Stream.empty();
8789
}
8890

89-
Collection<SourceRecord> records = route.converter().convert(this, response);
90-
if (records.isEmpty()) {
91-
route.requestEmpty(this);
92-
} else {
93-
records.forEach(r -> route.requestSucceeded(this, r));
94-
}
95-
return records.stream();
91+
records = route.converter().convert(this, response);
9692
} catch (IOException ex) {
9793
route.requestFailed(this, null);
9894
throw ex;
9995
}
96+
97+
if (records.isEmpty()) {
98+
route.requestEmpty(this);
99+
} else {
100+
records.forEach(r -> route.requestSucceeded(this, r));
101+
}
102+
return records.stream();
100103
}
101104
}

0 commit comments

Comments
 (0)