Skip to content

Commit 57e0297

Browse files
committed
Add back previous version of OuraServiceRepository
1 parent 5750bc0 commit 57e0297

File tree

7 files changed

+398
-119
lines changed

7 files changed

+398
-119
lines changed

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,15 @@
11
@Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate")
22
object Versions {
3+
4+
object Plugins {
5+
const val kotlin = "1.9.10"
6+
const val kotlinSerialization = kotlin
7+
const val kotlinAllOpen = kotlin
8+
const val avro = "1.8.0"
9+
const val gradle = "8.3"
10+
}
11+
12+
313
const val project = "0.4.2-SNAPSHOT"
414

515
const val java = 11
@@ -11,6 +21,8 @@ object Versions {
1121
const val kafka = "$confluent-ce"
1222
const val avro = "1.11.0"
1323

24+
const val managementPortal = "2.0.0"
25+
1426
// From image
1527
const val jackson = "2.14.2"
1628

kafka-connect-oura-source/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ dependencies {
44
api(project(":oura-library"))
55
api("io.confluent:kafka-connect-avro-converter:${Versions.confluent}")
66
api("org.radarbase:radar-schemas-commons:${Versions.radarSchemas}")
7-
implementation("org.radarbase:radar-commons-kotlin:${Versions.radarCommons}")
7+
implementation("org.radarbase:radar-commons-kotlin:1.1.1")
8+
implementation("org.radarbase:oauth-client-util:${Versions.managementPortal}")
89

910
api("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
1011
implementation(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}"))

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
2121

22-
import static io.ktor.http.URLUtilsKt.URLBuilder;
2322
import java.lang.reflect.InvocationTargetException;
2423
import java.net.MalformedURLException;
2524
import java.net.URL;
@@ -45,8 +44,6 @@
4544
import org.apache.kafka.common.config.ConfigException;
4645
import org.apache.kafka.connect.errors.ConnectException;
4746
import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository;
48-
import io.ktor.http.URLParserException;
49-
import io.ktor.http.Url;
5047

5148
public class OuraRestSourceConnectorConfig extends AbstractConfig {
5249
public static final Pattern COLON_PATTERN = Pattern.compile(":");
@@ -283,18 +280,18 @@ public OuraServiceUserRepository createUserRepository() {
283280
}
284281
}
285282

286-
public Url getOuraUserRepositoryUrl() {
283+
public HttpUrl getOuraUserRepositoryUrl() {
287284
String urlString = getString(OURA_USER_REPOSITORY_URL_CONFIG).trim();
288285
if (urlString.charAt(urlString.length() - 1) != '/') {
289286
urlString += '/';
290287
}
291-
try {
292-
return URLBuilder(urlString).build();
293-
} catch (URLParserException ex) {
288+
HttpUrl url = HttpUrl.parse(urlString);
289+
if (url == null) {
294290
throw new ConfigException(OURA_USER_REPOSITORY_URL_CONFIG,
295291
getString(OURA_USER_REPOSITORY_URL_CONFIG),
296-
"User repository URL " + urlString + " cannot be parsed as URL: " + ex);
292+
"User repository URL " + urlString + " cannot be parsed as URL.");
297293
}
294+
return url;
298295
}
299296

300297
public Headers getClientCredentials() {
@@ -317,18 +314,16 @@ public String getOuraUserRepositoryClientSecret() {
317314
return getPassword(OURA_USER_REPOSITORY_CLIENT_SECRET_CONFIG).value();
318315
}
319316

320-
public Url getOuraUserRepositoryTokenUrl() {
317+
public URL getOuraUserRepositoryTokenUrl() {
321318
String value = getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG);
322319
if (value == null || value.isEmpty()) {
323320
return null;
324321
} else {
325322
try {
326-
return URLBuilder(value).build();
327-
} catch (URLParserException ex) {
328-
throw new ConfigException(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG,
329-
getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG),
330-
"Oura user repository token URL " + value + " cannot be parsed as URL: " + ex);
323+
return new URL(getString(OURA_USER_REPOSITORY_TOKEN_URL_CONFIG));
324+
} catch (MalformedURLException e) {
325+
throw new ConfigException("Oura user repository token URL is invalid.");
331326
}
332327
}
333328
}
334-
}
329+
}

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,15 @@ public KafkaOffsetManager(
3131
}
3232

3333
public void initialize(List<Map<String, Object>> partitions) {
34-
this.offsets = this.offsetStorageReader.offsets(partitions).entrySet().stream()
34+
if (this.offsetStorageReader != null) {
35+
this.offsets = this.offsetStorageReader.offsets(partitions).entrySet().stream()
3536
.filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY))
3637
.collect(Collectors.toMap(
3738
e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"),
3839
e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue())));
40+
} else {
41+
logger.warn("Offset storage reader is null, will resume from an empty state.");
42+
}
3943
}
4044

4145
@Override
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package org.radarbase.connect.rest.oura.user;
19+
20+
import com.fasterxml.jackson.core.JsonFactory;
21+
import com.fasterxml.jackson.core.JsonProcessingException;
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.fasterxml.jackson.databind.ObjectReader;
24+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
25+
26+
import kotlin.sequences.*;
27+
28+
import java.io.IOException;
29+
import java.net.ProtocolException;
30+
import java.net.URL;
31+
import java.time.Duration;
32+
import java.time.Instant;
33+
import java.util.HashMap;
34+
import java.util.HashSet;
35+
import java.util.Map;
36+
import java.util.NoSuchElementException;
37+
import java.util.Set;
38+
import java.util.List;
39+
import java.util.concurrent.atomic.AtomicReference;
40+
import java.util.stream.Collectors;
41+
import java.util.stream.Stream;
42+
import okhttp3.Credentials;
43+
import okhttp3.HttpUrl;
44+
import okhttp3.MediaType;
45+
import okhttp3.OkHttpClient;
46+
import okhttp3.Request;
47+
import okhttp3.RequestBody;
48+
import okhttp3.Response;
49+
import okhttp3.ResponseBody;
50+
import org.apache.kafka.common.config.ConfigException;
51+
import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig;
52+
import org.radarbase.exception.TokenException;
53+
import org.radarbase.oauth.OAuth2Client;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
56+
import org.radarbase.oura.user.UserRepository;
57+
import org.radarbase.oura.user.User;
58+
import org.radarbase.oura.user.OuraUser;
59+
import org.radarbase.oura.user.UserNotAuthorizedException;
60+
import static kotlin.sequences.SequencesKt.*;
61+
62+
@SuppressWarnings("unused")
63+
public class OuraServiceUserRepository implements UserRepository {
64+
Instant MIN_INSTANT = Instant.EPOCH;
65+
66+
public static final JsonFactory JSON_FACTORY = new JsonFactory();
67+
public static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY)
68+
.registerModule(new JavaTimeModule())
69+
.reader();
70+
private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepository.class);
71+
72+
private static final ObjectReader USER_LIST_READER = JSON_READER.forType(OuraUsers.class);
73+
private static final ObjectReader USER_READER = JSON_READER.forType(User.class);
74+
private static final ObjectReader OAUTH_READER = JSON_READER.forType(OAuth2UserCredentials.class);
75+
private static final RequestBody EMPTY_BODY =
76+
RequestBody.create("", MediaType.parse("application/json; charset=utf-8"));
77+
private static final Duration FETCH_THRESHOLD = Duration.ofMinutes(1L);
78+
private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60);
79+
private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90);
80+
81+
private final OkHttpClient client;
82+
private final Map<String, OAuth2UserCredentials> cachedCredentials;
83+
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
84+
85+
private HttpUrl baseUrl;
86+
private final HashSet<String> containedUsers;
87+
private Set<? extends User> timedCachedUsers = new HashSet<>();
88+
private OAuth2Client repositoryClient;
89+
private String basicCredentials;
90+
91+
public OuraServiceUserRepository() {
92+
this.client = new OkHttpClient.Builder()
93+
.connectTimeout(CONNECTION_TIMEOUT)
94+
.readTimeout(CONNECTION_READ_TIMEOUT)
95+
.build();
96+
this.cachedCredentials = new HashMap<>();
97+
this.containedUsers = new HashSet<>();
98+
}
99+
100+
@Override
101+
public User get(String key) throws IOException {
102+
Request request = requestFor("users/" + key).build();
103+
return makeRequest(request, USER_READER);
104+
}
105+
106+
public void initialize(OuraRestSourceConnectorConfig config) {
107+
OuraRestSourceConnectorConfig ouraConfig = (OuraRestSourceConnectorConfig) config;
108+
this.baseUrl = ouraConfig.getOuraUserRepositoryUrl();
109+
this.containedUsers.addAll(ouraConfig.getOuraUsers());
110+
111+
URL tokenUrl = ouraConfig.getOuraUserRepositoryTokenUrl();
112+
String clientId = ouraConfig.getOuraUserRepositoryClientId();
113+
String clientSecret = ouraConfig.getOuraUserRepositoryClientSecret();
114+
115+
if (tokenUrl != null) {
116+
if (clientId.isEmpty()) {
117+
throw new ConfigException("Client ID for user repository is not set.");
118+
}
119+
this.repositoryClient = new OAuth2Client.Builder()
120+
.credentials(clientId, clientSecret)
121+
.endpoint(tokenUrl)
122+
.scopes("SUBJECT.READ MEASUREMENT.CREATE")
123+
.httpClient(client)
124+
.build();
125+
} else if (clientId != null) {
126+
basicCredentials = Credentials.basic(clientId, clientSecret);
127+
}
128+
}
129+
130+
@Override
131+
public Sequence<User> stream() {
132+
if (nextFetch.get().equals(MIN_INSTANT)) {
133+
try {
134+
applyPendingUpdates();
135+
} catch (IOException ex) {
136+
logger.error("Failed to initially get users from repository", ex);
137+
}
138+
}
139+
return SequencesKt.asSequence(this.timedCachedUsers.stream().iterator());
140+
}
141+
142+
@Override
143+
public String getAccessToken(User user) throws IOException, UserNotAuthorizedException {
144+
if (!user.isAuthorized()) {
145+
throw new UserNotAuthorizedException("User is not authorized");
146+
}
147+
OAuth2UserCredentials credentials = cachedCredentials.get(user.getId());
148+
if (credentials != null && !credentials.isAccessTokenExpired()) {
149+
return credentials.getAccessToken();
150+
} else {
151+
Request request = requestFor("users/" + user.getId() + "/token").build();
152+
return requestAccessToken(user, request);
153+
}
154+
}
155+
156+
public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException {
157+
if (!user.isAuthorized()) {
158+
throw new UserNotAuthorizedException("User is not authorized");
159+
}
160+
Request request = requestFor("users/" + user.getId() + "/token")
161+
.post(EMPTY_BODY)
162+
.build();
163+
return requestAccessToken(user, request);
164+
}
165+
166+
private String requestAccessToken(User user, Request request)
167+
throws UserNotAuthorizedException, IOException {
168+
try {
169+
OAuth2UserCredentials credentials = makeRequest(request, OAUTH_READER);
170+
cachedCredentials.put(user.getId(), credentials);
171+
return credentials.getAccessToken();
172+
} catch (HttpResponseException ex) {
173+
if (ex.getStatusCode() == 407) {
174+
cachedCredentials.remove(user.getId());
175+
if (user instanceof User) {
176+
// ((User) user).setIsAuthorized(false);
177+
}
178+
throw new UserNotAuthorizedException(ex.getMessage());
179+
}
180+
throw ex;
181+
}
182+
}
183+
184+
public boolean hasPendingUpdates() {
185+
Instant nextFetchTime = nextFetch.get();
186+
Instant now = Instant.now();
187+
return now.isAfter(nextFetchTime);
188+
}
189+
190+
public void applyPendingUpdates() throws IOException {
191+
logger.info("Requesting user information from webservice");
192+
Request request = requestFor("users?source-type=Oura").build();
193+
this.timedCachedUsers =
194+
this.<OuraUsers>makeRequest(request, USER_LIST_READER).getUsers().stream()
195+
.filter(u -> u.isComplete() && (containedUsers.isEmpty()
196+
|| containedUsers.contains(u.getVersionedId())))
197+
.collect(Collectors.toSet());
198+
nextFetch.set(Instant.now().plus(FETCH_THRESHOLD));
199+
}
200+
201+
private Request.Builder requestFor(String relativeUrl) throws IOException {
202+
HttpUrl url = baseUrl.resolve(relativeUrl);
203+
if (url == null) {
204+
throw new IllegalArgumentException("Relative URL is invalid");
205+
}
206+
Request.Builder builder = new Request.Builder().url(url);
207+
String authorization = requestAuthorization();
208+
if (authorization != null) {
209+
builder.addHeader("Authorization", authorization);
210+
}
211+
212+
return builder;
213+
}
214+
215+
private String requestAuthorization() throws IOException {
216+
if (repositoryClient != null) {
217+
try {
218+
return "Bearer " + repositoryClient.getValidToken().getAccessToken();
219+
} catch (TokenException ex) {
220+
throw new IOException(ex);
221+
}
222+
} else if (basicCredentials != null) {
223+
return basicCredentials;
224+
} else {
225+
return null;
226+
}
227+
}
228+
229+
private <T> T makeRequest(Request request, ObjectReader reader) throws IOException {
230+
logger.info("Requesting info from {}", request.url());
231+
try (Response response = client.newCall(request).execute()) {
232+
ResponseBody body = response.body();
233+
234+
if (response.code() == 404) {
235+
throw new NoSuchElementException("URL " + request.url() + " does not exist");
236+
} else if (!response.isSuccessful() || body == null) {
237+
String message = "Failed to make request";
238+
if (response.code() > 0) {
239+
message += " (HTTP status code " + response.code() + ')';
240+
}
241+
if (body != null) {
242+
message += body.string();
243+
}
244+
throw new HttpResponseException(message, response.code());
245+
}
246+
String bodyString = body.string();
247+
try {
248+
return reader.readValue(bodyString);
249+
} catch (JsonProcessingException ex) {
250+
logger.error("Failed to parse JSON: {}\n{}", ex, bodyString);
251+
throw ex;
252+
}
253+
} catch (ProtocolException ex) {
254+
throw new IOException("Failed to make request to user repository", ex);
255+
}
256+
}
257+
}

0 commit comments

Comments
 (0)