Skip to content

Commit 9bc26ae

Browse files
authored
Merge pull request #114 from RADAR-base/fitbit-fixes
Add back legacy UserRepository
2 parents f123db2 + 6b6055b commit 9bc26ae

File tree

4 files changed

+267
-16
lines changed

4 files changed

+267
-16
lines changed

kafka-connect-fitbit-source/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ dependencies {
66
api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}")
77
api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}")
88
implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}")
9+
implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}")
910

11+
api("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
1012
implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}"))
1113
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
1214
implementation("com.fasterxml.jackson.datatype:jackson-datatype-jsr310")

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package org.radarbase.connect.rest.fitbit;
1919

20-
import static io.ktor.http.URLUtilsKt.URLBuilder;
2120
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
2221

2322
import java.lang.reflect.InvocationTargetException;
23+
import java.net.MalformedURLException;
24+
import java.net.URL;
2425
import java.nio.charset.StandardCharsets;
2526
import java.nio.file.Path;
2627
import java.nio.file.Paths;
@@ -30,9 +31,9 @@
3031
import java.util.List;
3132
import java.util.Map;
3233

33-
import io.ktor.http.URLParserException;
34-
import io.ktor.http.Url;
3534
import okhttp3.Headers;
35+
import okhttp3.HttpUrl;
36+
3637
import org.apache.kafka.common.config.ConfigDef;
3738
import org.apache.kafka.common.config.ConfigDef.Importance;
3839
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
@@ -530,18 +531,18 @@ public Path getFitbitUserCredentialsPath() {
530531
return Paths.get(getString(FITBIT_USER_CREDENTIALS_DIR_CONFIG));
531532
}
532533

533-
public Url getFitbitUserRepositoryUrl() {
534+
public HttpUrl getFitbitUserRepositoryUrl() {
534535
String urlString = getString(FITBIT_USER_REPOSITORY_URL_CONFIG).trim();
535536
if (urlString.charAt(urlString.length() - 1) != '/') {
536537
urlString += '/';
537538
}
538-
try {
539-
return URLBuilder(urlString).build();
540-
} catch (URLParserException ex) {
539+
HttpUrl url = HttpUrl.parse(urlString);
540+
if (url == null) {
541541
throw new ConfigException(FITBIT_USER_REPOSITORY_URL_CONFIG,
542542
getString(FITBIT_USER_REPOSITORY_URL_CONFIG),
543-
"User repository URL " + urlString + " cannot be parsed as URL: " + ex);
543+
"User repository URL " + urlString + " cannot be parsed as URL.");
544544
}
545+
return url;
545546
}
546547

547548
public Headers getClientCredentials() {
@@ -584,17 +585,15 @@ public String getFitbitUserRepositoryClientSecret() {
584585
return getPassword(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value();
585586
}
586587

587-
public Url getFitbitUserRepositoryTokenUrl() {
588+
public URL getFitbitUserRepositoryTokenUrl() {
588589
String value = getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG);
589590
if (value == null || value.isEmpty()) {
590591
return null;
591592
} else {
592593
try {
593-
return URLBuilder(value).build();
594-
} catch (URLParserException ex) {
595-
throw new ConfigException(FITBIT_USER_REPOSITORY_URL_CONFIG,
596-
getString(FITBIT_USER_REPOSITORY_URL_CONFIG),
597-
"Fitbit user repository token URL " + value + " cannot be parsed as URL: " + ex);
594+
return new URL(getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG));
595+
} catch (MalformedURLException ex) {
596+
throw new ConfigException("Fitbit user repository token URL is invalid.");
598597
}
599598
}
600599
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import io.ktor.client.statement.request
3737
import io.ktor.http.ContentType
3838
import io.ktor.http.HttpMethod
3939
import io.ktor.http.HttpStatusCode
40+
import io.ktor.http.URLBuilder
4041
import io.ktor.http.Url
4142
import io.ktor.http.contentLength
4243
import io.ktor.http.contentType
@@ -85,8 +86,8 @@ class ServiceUserRepository : UserRepository {
8586
val containedUsers = config.fitbitUsers.toHashSet()
8687

8788
client = createClient(
88-
baseUrl = config.fitbitUserRepositoryUrl,
89-
tokenUrl = config.fitbitUserRepositoryTokenUrl,
89+
baseUrl = URLBuilder(config.fitbitUserRepositoryUrl.toString()).build(),
90+
tokenUrl = URLBuilder(config.fitbitUserRepositoryTokenUrl.toString()).build(),
9091
clientId = config.fitbitUserRepositoryClientId,
9192
clientSecret = config.fitbitUserRepositoryClientSecret,
9293
)
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.fitbit.user;
19+
20+
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT;
21+
import static org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator.JSON_READER;
22+
23+
import com.fasterxml.jackson.core.JsonProcessingException;
24+
import com.fasterxml.jackson.databind.ObjectReader;
25+
import java.io.IOException;
26+
import java.net.ProtocolException;
27+
import java.net.URL;
28+
import java.time.Duration;
29+
import java.time.Instant;
30+
import java.util.HashMap;
31+
import java.util.HashSet;
32+
import java.util.Map;
33+
import java.util.NoSuchElementException;
34+
import java.util.Set;
35+
import java.util.concurrent.atomic.AtomicReference;
36+
import java.util.stream.Collectors;
37+
import java.util.stream.Stream;
38+
import okhttp3.Credentials;
39+
import okhttp3.HttpUrl;
40+
import okhttp3.MediaType;
41+
import okhttp3.OkHttpClient;
42+
import okhttp3.Request;
43+
import okhttp3.RequestBody;
44+
import okhttp3.Response;
45+
import okhttp3.ResponseBody;
46+
import org.apache.kafka.common.config.ConfigException;
47+
import org.radarbase.connect.rest.RestSourceConnectorConfig;
48+
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
49+
import org.radarbase.exception.TokenException;
50+
import org.radarbase.oauth.OAuth2Client;
51+
import org.slf4j.Logger;
52+
import org.slf4j.LoggerFactory;
53+
54+
@SuppressWarnings("unused")
55+
public class ServiceUserRepositoryLegacy implements UserRepository {
56+
private static final Logger logger = LoggerFactory.getLogger(ServiceUserRepositoryLegacy.class);
57+
58+
private static final ObjectReader USER_LIST_READER = JSON_READER.forType(Users.class);
59+
private static final ObjectReader USER_READER = JSON_READER.forType(User.class);
60+
private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class);
61+
private static final RequestBody EMPTY_BODY =
62+
RequestBody.create("", MediaType.parse("application/json; charset=utf-8"));
63+
private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L);
64+
private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60);
65+
private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90);
66+
67+
private final OkHttpClient client;
68+
private final Map<String, OAuth2UserCredentials> cachedCredentials;
69+
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
70+
71+
private HttpUrl baseUrl;
72+
private final HashSet<String> containedUsers;
73+
private Set<? extends User> timedCachedUsers = new HashSet<>();
74+
private OAuth2Client repositoryClient;
75+
private String basicCredentials;
76+
77+
public ServiceUserRepositoryLegacy() {
78+
this.client = new OkHttpClient.Builder()
79+
.connectTimeout(CONNECTION_TIMEOUT)
80+
.readTimeout(CONNECTION_READ_TIMEOUT)
81+
.build();
82+
this.cachedCredentials = new HashMap<>();
83+
this.containedUsers = new HashSet<>();
84+
}
85+
86+
@Override
87+
public User get(String key) throws IOException {
88+
Request request = requestFor("users/" + key).build();
89+
return makeRequest(request, USER_READER);
90+
}
91+
92+
@Override
93+
public void initialize(RestSourceConnectorConfig config) {
94+
FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config;
95+
this.baseUrl = fitbitConfig.getFitbitUserRepositoryUrl();
96+
this.containedUsers.addAll(fitbitConfig.getFitbitUsers());
97+
98+
URL tokenUrl = fitbitConfig.getFitbitUserRepositoryTokenUrl();
99+
String clientId = fitbitConfig.getFitbitUserRepositoryClientId();
100+
String clientSecret = fitbitConfig.getFitbitUserRepositoryClientSecret();
101+
102+
if (tokenUrl != null) {
103+
if (clientId.isEmpty()) {
104+
throw new ConfigException("Client ID for user repository is not set.");
105+
}
106+
this.repositoryClient = new OAuth2Client.Builder()
107+
.credentials(clientId, clientSecret)
108+
.endpoint(tokenUrl)
109+
.scopes("SUBJECT.READ MEASUREMENT.CREATE")
110+
.httpClient(client)
111+
.build();
112+
} else if (clientId != null) {
113+
basicCredentials = Credentials.basic(clientId, clientSecret);
114+
}
115+
}
116+
117+
@Override
118+
public Stream<? extends User> stream() {
119+
if (nextFetch.get().equals(MIN_INSTANT)) {
120+
try {
121+
applyPendingUpdates();
122+
} catch (IOException ex) {
123+
logger.error("Failed to initially get users from repository", ex);
124+
}
125+
}
126+
return this.timedCachedUsers.stream()
127+
.filter(User::isComplete);
128+
}
129+
130+
@Override
131+
public String getAccessToken(User user) throws IOException, UserNotAuthorizedException {
132+
if (!user.isAuthorized()) {
133+
throw new UserNotAuthorizedException("User is not authorized");
134+
}
135+
OAuth2UserCredentials credentials = cachedCredentials.get(user.getId());
136+
if (credentials != null && !credentials.isAccessTokenExpired()) {
137+
return credentials.getAccessToken();
138+
} else {
139+
Request request = requestFor("users/" + user.getId() + "/token").build();
140+
return requestAccessToken(user, request);
141+
}
142+
}
143+
144+
@Override
145+
public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException {
146+
if (!user.isAuthorized()) {
147+
throw new UserNotAuthorizedException("User is not authorized");
148+
}
149+
Request request = requestFor("users/" + user.getId() + "/token")
150+
.post(EMPTY_BODY)
151+
.build();
152+
return requestAccessToken(user, request);
153+
}
154+
155+
private String requestAccessToken(User user, Request request)
156+
throws UserNotAuthorizedException, IOException {
157+
try {
158+
OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER);
159+
cachedCredentials.put(user.getId(), credentials);
160+
return credentials.getAccessToken();
161+
} catch (HttpResponseException ex) {
162+
if (ex.getStatusCode() == 407) {
163+
cachedCredentials.remove(user.getId());
164+
if (user instanceof LocalUser) {
165+
((LocalUser) user).setIsAuthorized(false);
166+
}
167+
throw new UserNotAuthorizedException(ex.getMessage());
168+
}
169+
throw ex;
170+
}
171+
}
172+
173+
@Override
174+
public boolean hasPendingUpdates() {
175+
Instant nextFetchTime = nextFetch.get();
176+
Instant now = Instant.now();
177+
return now.isAfter(nextFetchTime);
178+
}
179+
180+
@Override
181+
public void applyPendingUpdates() throws IOException {
182+
logger.info("Requesting user information from webservice");
183+
Request request = requestFor("users?source-type=FitBit").build();
184+
this.timedCachedUsers =
185+
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
186+
.filter(u -> u.isComplete()
187+
&& (containedUsers.isEmpty()
188+
|| containedUsers.contains(u.getVersionedId())))
189+
.collect(Collectors.toSet());
190+
nextFetch.set(Instant.now().plus(FETCH_THRESHOLD));
191+
}
192+
193+
private Request.Builder requestFor(String relativeUrl) throws IOException {
194+
HttpUrl url = baseUrl.resolve(relativeUrl);
195+
if (url == null) {
196+
throw new IllegalArgumentException("Relative URL is invalid");
197+
}
198+
Request.Builder builder = new Request.Builder().url(url);
199+
String authorization = requestAuthorization();
200+
if (authorization != null) {
201+
builder.addHeader("Authorization", authorization);
202+
}
203+
204+
return builder;
205+
}
206+
207+
private String requestAuthorization() throws IOException {
208+
if (repositoryClient != null) {
209+
try {
210+
return "Bearer " + repositoryClient.getValidToken().getAccessToken();
211+
} catch (TokenException ex) {
212+
throw new IOException(ex);
213+
}
214+
} else if (basicCredentials != null) {
215+
return basicCredentials;
216+
} else {
217+
return null;
218+
}
219+
}
220+
221+
private <T> T makeRequest(Request request, ObjectReader reader) throws IOException {
222+
logger.info("Requesting info from {}", request.url());
223+
try (Response response = client.newCall(request).execute()) {
224+
ResponseBody body = response.body();
225+
226+
if (response.code() == 404) {
227+
throw new NoSuchElementException("URL " + request.url() + " does not exist");
228+
} else if (!response.isSuccessful() || body == null) {
229+
String message = "Failed to make request";
230+
if (response.code() > 0) {
231+
message += " (HTTP status code " + response.code() + ')';
232+
}
233+
if (body != null) {
234+
message += body.string();
235+
}
236+
throw new HttpResponseException(message, response.code());
237+
}
238+
String bodyString = body.string();
239+
try {
240+
return reader.readValue(bodyString);
241+
} catch (JsonProcessingException ex) {
242+
logger.error("Failed to parse JSON: {}\n{}", ex, bodyString);
243+
throw ex;
244+
}
245+
} catch (ProtocolException ex) {
246+
throw new IOException("Failed to make request to user repository", ex);
247+
}
248+
}
249+
}

0 commit comments

Comments
 (0)