Skip to content

Commit 111eb92

Browse files
committed
Add OuraUserRepository abstract class and extend from this
1 parent fec341c commit 111eb92

File tree

7 files changed

+57
-15
lines changed

7 files changed

+57
-15
lines changed

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraRestSourceConnectorConfig.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
import org.apache.kafka.common.config.ConfigDef.Width;
4444
import org.apache.kafka.common.config.ConfigException;
4545
import org.apache.kafka.connect.errors.ConnectException;
46+
import org.radarbase.connect.rest.oura.user.OuraUserRepository;
4647
import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository;
47-
import org.radarbase.connect.rest.oura.user.OuraServiceUserRepositoryLegacy;
4848

4949
public class OuraRestSourceConnectorConfig extends AbstractConfig {
5050
public static final Pattern COLON_PATTERN = Pattern.compile(":");
@@ -102,7 +102,7 @@ public class OuraRestSourceConnectorConfig extends AbstractConfig {
102102
private static final String OURA_USER_REPOSITORY_TOKEN_URL_DOC = "OAuth 2.0 token url for retrieving client credentials.";
103103
private static final String OURA_USER_REPOSITORY_TOKEN_URL_DISPLAY = "OAuth 2.0 token URL.";
104104

105-
private OuraServiceUserRepository userRepository;
105+
private OuraUserRepository userRepository;
106106
private final Headers clientCredentials;
107107

108108
public OuraRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig, boolean doLog) {
@@ -193,7 +193,7 @@ public String toString() {
193193

194194
.define(OURA_USER_REPOSITORY_CONFIG,
195195
Type.CLASS,
196-
OuraServiceUserRepositoryLegacy.class,
196+
OuraServiceUserRepository.class,
197197
Importance.MEDIUM,
198198
OURA_USER_REPOSITORY_DOC,
199199
group,
@@ -255,7 +255,7 @@ public String getOuraClientSecret() {
255255
return getPassword(OURA_API_SECRET_CONFIG).value();
256256
}
257257

258-
public OuraServiceUserRepository getUserRepository(OuraServiceUserRepository reuse) {
258+
public OuraUserRepository getUserRepository(OuraUserRepository reuse) {
259259
if (reuse != null && reuse.getClass().equals(getClass(OURA_USER_REPOSITORY_CONFIG))) {
260260
userRepository = reuse;
261261
} else {
@@ -265,15 +265,15 @@ public OuraServiceUserRepository getUserRepository(OuraServiceUserRepository reu
265265
return userRepository;
266266
}
267267

268-
public OuraServiceUserRepository getUserRepository() {
268+
public OuraUserRepository getUserRepository() {
269269
userRepository.initialize(this);
270270
return userRepository;
271271
}
272272

273273
@SuppressWarnings("unchecked")
274-
public OuraServiceUserRepository createUserRepository() {
274+
public OuraUserRepository createUserRepository() {
275275
try {
276-
return ((Class<? extends OuraServiceUserRepository>)
276+
return ((Class<? extends OuraUserRepository>)
277277
getClass(OURA_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
278278
} catch (IllegalAccessException | InstantiationException
279279
| InvocationTargetException | NoSuchMethodException e) {

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.kafka.common.config.ConfigDef;
3030
import org.apache.kafka.common.config.ConfigException;
3131
import org.radarbase.oura.user.User;
32-
import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository;
32+
import org.radarbase.connect.rest.oura.user.OuraUserRepository;
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

@@ -45,7 +45,7 @@ public class OuraSourceConnector extends AbstractRestSourceConnector {
4545
private static final Logger logger = LoggerFactory.getLogger(OuraSourceConnector.class);
4646
private ScheduledExecutorService executor;
4747
private Set<? extends User> configuredUsers;
48-
private OuraServiceUserRepository repository;
48+
private OuraUserRepository repository;
4949

5050
@Override
5151
public void start(Map<String, String> props) {

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/OuraSourceTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
import org.apache.kafka.connect.source.SourceTask;
3636
import org.apache.kafka.connect.storage.OffsetStorageReader;
3737
import org.radarbase.connect.rest.oura.offset.KafkaOffsetManager;
38-
import org.radarbase.connect.rest.oura.user.OuraServiceUserRepository;
38+
import org.radarbase.connect.rest.oura.user.OuraUserRepository;
3939
import org.radarbase.connect.rest.oura.util.VersionUtil;
4040
import org.radarbase.oura.converter.TopicData;
4141
import org.radarbase.oura.request.OuraRequestGenerator;
@@ -57,7 +57,7 @@ public class OuraSourceTask extends SourceTask {
5757
private static final Logger logger = LoggerFactory.getLogger(OuraSourceTask.class);
5858

5959
private OkHttpClient baseClient;
60-
private OuraServiceUserRepository userRepository;
60+
private OuraUserRepository userRepository;
6161
private List<Route> routes;
6262
private OuraRequestGenerator ouraRequestGenerator;
6363
private AvroData avroData = new AvroData(20);

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepository.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ import kotlin.time.Duration.Companion.minutes
6767
import kotlin.time.Duration.Companion.seconds
6868

6969
@Suppress("unused")
70-
class OuraServiceUserRepository : OuraServiceUserRepositoryLegacy() {
70+
class OuraServiceUserRepository : OuraUserRepository() {
7171
private lateinit var userCache: CachedSet<OuraUser>
7272
private lateinit var client: HttpClient
7373
private val credentialCaches = ConcurrentHashMap<String, CachedValue<OAuth2UserCredentials>>()

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/OuraServiceUserRepositoryLegacy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@
6060
import static kotlin.sequences.SequencesKt.*;
6161

6262
@SuppressWarnings("unused")
63-
public class OuraServiceUserRepositoryLegacy implements UserRepository {
63+
public class OuraServiceUserRepositoryLegacy extends OuraUserRepository {
6464
Instant MIN_INSTANT = Instant.EPOCH;
6565

6666
public static final JsonFactory JSON_FACTORY = new JsonFactory();
6767
public static final ObjectReader JSON_READER = new ObjectMapper(JSON_FACTORY)
6868
.registerModule(new JavaTimeModule())
6969
.reader();
70-
private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepository.class);
70+
private static final Logger logger = LoggerFactory.getLogger(OuraServiceUserRepositoryLegacy.class);
7171

7272
private static final ObjectReader USER_LIST_READER = JSON_READER.forType(OuraUsers.class);
7373
private static final ObjectReader USER_READER = JSON_READER.forType(User.class);
@@ -153,6 +153,7 @@ public String getAccessToken(User user) throws IOException, UserNotAuthorizedExc
153153
}
154154
}
155155

156+
@Override
156157
public String refreshAccessToken(User user) throws IOException, UserNotAuthorizedException {
157158
if (!user.isAuthorized()) {
158159
throw new UserNotAuthorizedException("User is not authorized");
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2018 The Hyve
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package org.radarbase.connect.rest.oura.user
18+
19+
import org.radarbase.connect.rest.oura.OuraRestSourceConnectorConfig
20+
import org.radarbase.oura.user.User
21+
import org.radarbase.oura.user.UserNotAuthorizedException
22+
import org.radarbase.oura.user.UserRepository
23+
import org.slf4j.LoggerFactory
24+
import java.io.IOException
25+
26+
@Suppress("unused")
27+
abstract class OuraUserRepository : UserRepository {
28+
abstract fun initialize(config: OuraRestSourceConnectorConfig)
29+
30+
@Throws(IOException::class, UserNotAuthorizedException::class)
31+
abstract fun refreshAccessToken(user: User): String
32+
33+
@Throws(IOException::class)
34+
abstract fun applyPendingUpdates()
35+
36+
abstract fun hasPendingUpdates(): Boolean
37+
38+
companion object {
39+
private val logger = LoggerFactory.getLogger(OuraUserRepository::class.java)
40+
}
41+
}

kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/user/UserNotAuthorizedException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.radarbase.connect.rest.oura.user;
1919

20-
public class UserNotAuthorizedException extends Exception {
20+
public class UserNotAuthorizedException extends RuntimeException {
2121
public UserNotAuthorizedException(String message) {
2222
super(message);
2323
}

0 commit comments

Comments
 (0)