Skip to content

Commit 59f8435

Browse files
committed
Revert multiple user repositories merge
1 parent 716f284 commit 59f8435

File tree

8 files changed

+79
-85
lines changed

8 files changed

+79
-85
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,3 @@ out/
66
.gradle/
77
docker/users
88
docker/source-fitbit.properties
9-
docker/rest-source-authorizer/etc/rest_source_clients_configs.yml

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ your Fitbit App client ID and client secret. The following tables shows the poss
5555
<tr>
5656
<td>fitbit.api.intraday</td></td><td>Set to true if the client has permissions to Fitbit Intraday API, false otherwise.</td></td><td>boolean</td></td><td>true</td></td><td></td></td><td>medium</td></td></tr>
5757
<tr>
58-
<td>fitbit.user.repository.classes</td></td><td>Class(es) for managing users and authentication. Comma separate for using multiple repositories</td></td><td>list</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlUserRepository</td></td><td>Classes extending org.radarbase.connect.rest.fitbit.user.UserRepository</td></td><td>medium</td></td></tr>
58+
<td>fitbit.user.repository.class</td></td><td>Class for managing users and authentication.</td></td><td>class</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlUserRepository</td></td><td>Class extending org.radarbase.connect.rest.fitbit.user.UserRepository</td></td><td>medium</td></td></tr>
5959
<tr>
6060
<td>fitbit.user.dir</td></td><td>Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.</td></td><td>string</td></td><td>/var/lib/kafka-connect-fitbit-source/users</td></td><td></td></td><td>low</td></td></tr>
6161
<tr>

docker-compose.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,6 @@ services:
163163
- kafka-2
164164
- kafka-3
165165
- schema-registry-1
166-
networks:
167-
- default
168166
environment:
169167
CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092
170168
CONNECT_REST_PORT: 8083
@@ -183,6 +181,5 @@ services:
183181
CONNECT_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
184182
CONNECTOR_PROPERTY_FILE_PREFIX: "source-fitbit"
185183
KAFKA_HEAP_OPTS: "-Xms256m -Xmx768m"
186-
CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
187184
KAFKA_BROKERS: 3
188185
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"

docker/source-fitbit.properties.template

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ rest.source.request.generator.class=org.radarbase.connect.rest.fitbit.request.Fi
77
fitbit.api.client=?
88
fitbit.api.secret=?
99
fitbit.max.users.per.poll=10
10-
fitbit.user.repository.classes=org.radarbase.connect.rest.fitbit.user.YamlUserRepository,org.radarbase.connect.rest.fitbit.user.ServiceUserRepository
11-
fitbit.user.repository.url=http://localhost:8080/
10+
fitbit.user.repository.class=org.radarbase.connect.rest.fitbit.user.ServiceUserRepository
11+
fitbit.user.repository.url=http://radar-device-auth-backend:8080/

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import java.nio.charset.StandardCharsets;
2424
import java.nio.file.Path;
2525
import java.nio.file.Paths;
26-
import java.util.*;
27-
26+
import java.time.Duration;
27+
import java.util.Base64;
28+
import java.util.Collections;
29+
import java.util.List;
30+
import java.util.Map;
2831
import okhttp3.Headers;
2932
import okhttp3.HttpUrl;
3033
import org.apache.kafka.common.config.ConfigDef;
@@ -37,6 +40,7 @@
3740
import org.apache.kafka.common.config.ConfigException;
3841
import org.apache.kafka.connect.errors.ConnectException;
3942
import org.radarbase.connect.rest.RestSourceConnectorConfig;
43+
import org.radarbase.connect.rest.config.ValidClass;
4044
import org.radarbase.connect.rest.fitbit.user.UserRepository;
4145
import org.radarbase.connect.rest.fitbit.user.YamlUserRepository;
4246

@@ -57,9 +61,9 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
5761
private static final String FITBIT_API_SECRET_DOC = "Secret for the Fitbit API client set in fitbit.api.client.";
5862
private static final String FITBIT_API_SECRET_DISPLAY = "Fitbit API client secret";
5963

60-
public static final String FITBIT_USER_REPOSITORIES_CONFIG = "fitbit.user.repository.classes";
61-
private static final String FITBIT_USER_REPOSITORIES_DOC = "Classes for managing users and authentication. Can be a Comma-separated list for multiple repositories.";
62-
private static final String FITBIT_USER_REPOSITORIES_DISPLAY = "User repository classes";
64+
public static final String FITBIT_USER_REPOSITORY_CONFIG = "fitbit.user.repository.class";
65+
private static final String FITBIT_USER_REPOSITORY_DOC = "Class for managing users and authentication.";
66+
private static final String FITBIT_USER_REPOSITORY_DISPLAY = "User repository class";
6367

6468
public static final String FITBIT_API_INTRADAY_ACCESS_CONFIG = "fitbit.api.intraday";
6569
private static final String FITBIT_API_INTRADAY_ACCESS_DOC = "Set to true if the client has permissions to Fitbit Intraday API, false otherwise.";
@@ -111,22 +115,19 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
111115
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DEFAULT = "connect_fitbit_activity_log";
112116
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DISPLAY = "Activity log topic";
113117

114-
private final List<UserRepository> userRepositories;
118+
private final UserRepository userRepository;
115119
private final Headers clientCredentials;
116120

117121
@SuppressWarnings("unchecked")
118122
public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig, boolean doLog) {
119123
super(config, parsedConfig, doLog);
120-
userRepositories = new ArrayList<>();
121124

122125
try {
123-
for(String c: getList(FITBIT_USER_REPOSITORIES_CONFIG)) {
124-
userRepositories.add(
125-
((Class<? extends UserRepository>) Class.forName(c)).getDeclaredConstructor().newInstance());
126-
}
126+
userRepository = ((Class<? extends UserRepository>)
127+
getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
127128
} catch (IllegalAccessException | InstantiationException
128-
| InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
129-
throw new ConnectException("Invalid class(es) for: " + FITBIT_USER_REPOSITORIES_CONFIG, e);
129+
| InvocationTargetException | NoSuchMethodException e) {
130+
throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e);
130131
}
131132

132133
String credentialString = getFitbitClient() + ":" + getFitbitClientSecret();
@@ -193,15 +194,16 @@ public String toString() {
193194
Width.SHORT,
194195
FITBIT_API_INTRADAY_ACCESS_DISPLAY)
195196

196-
.define(FITBIT_USER_REPOSITORIES_CONFIG,
197-
Type.LIST,
198-
YamlUserRepository.class.getName(),
197+
.define(FITBIT_USER_REPOSITORY_CONFIG,
198+
Type.CLASS,
199+
YamlUserRepository.class,
200+
ValidClass.isSubclassOf(UserRepository.class),
199201
Importance.MEDIUM,
200-
FITBIT_USER_REPOSITORIES_DOC,
202+
FITBIT_USER_REPOSITORY_DOC,
201203
group,
202204
++orderInGroup,
203205
Width.SHORT,
204-
FITBIT_USER_REPOSITORIES_DISPLAY)
206+
FITBIT_USER_REPOSITORY_DISPLAY)
205207

206208
.define(FITBIT_USER_CREDENTIALS_DIR_CONFIG,
207209
Type.STRING,
@@ -314,9 +316,9 @@ public String getFitbitClientSecret() {
314316
return getPassword(FITBIT_API_SECRET_CONFIG).value();
315317
}
316318

317-
public List<UserRepository> getUserRepositories() {
318-
userRepositories.forEach(u -> u.initialize(this));
319-
return userRepositories;
319+
public UserRepository getUserRepository() {
320+
userRepository.initialize(this);
321+
return userRepository;
320322
}
321323

322324
public String getFitbitIntradayStepsTopic() {

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,23 @@
1717

1818
package org.radarbase.connect.rest.fitbit;
1919

20-
import static java.util.stream.Collectors.toSet;
2120
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;
2221

2322
import java.io.IOException;
24-
import java.util.*;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
2527
import java.util.concurrent.Executors;
2628
import java.util.concurrent.ScheduledExecutorService;
2729
import java.util.concurrent.TimeUnit;
2830
import java.util.stream.Collectors;
31+
import java.util.stream.Stream;
2932

3033
import org.apache.kafka.common.config.ConfigDef;
3134
import org.apache.kafka.common.config.ConfigException;
3235
import org.radarbase.connect.rest.AbstractRestSourceConnector;
3336
import org.radarbase.connect.rest.fitbit.user.User;
34-
import org.radarbase.connect.rest.fitbit.user.UserRepository;
3537
import org.slf4j.Logger;
3638
import org.slf4j.LoggerFactory;
3739

@@ -48,13 +50,10 @@ public void start(Map<String, String> props) {
4850
executor = Executors.newSingleThreadScheduledExecutor();
4951

5052
executor.scheduleAtFixedRate(() -> {
51-
Set<? extends User> newUsers = new HashSet<>();
5253
try {
5354
logger.info("Requesting latest user details...");
54-
for(UserRepository u: getConfig(props, false).getUserRepositories()) {
55-
Set users = u.stream().collect(toSet());
56-
newUsers.addAll(users);
57-
}
55+
Set<? extends User> newUsers = getConfig(props, false).getUserRepository().stream()
56+
.collect(Collectors.toSet());
5857
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
5958
logger.info("User info mismatch found. Requesting reconfiguration...");
6059
reconfigure();
@@ -96,32 +95,26 @@ private List<Map<String, String>> configureTasks(int maxTasks) {
9695
Map<String, String> baseConfig = config.originalsStrings();
9796
FitbitRestSourceConnectorConfig fitbitConfig = getConfig(baseConfig);
9897
// Divide the users over tasks
99-
List<Map<String, String>> userTasks = new ArrayList<>();
100-
this.configuredUsers = new HashSet<>();
101-
10298
try {
103-
for(UserRepository ur: fitbitConfig.getUserRepositories()) {
104-
List<Map<String, String>> userTasksCurrent = ur.stream()
105-
.map(User::getId)
106-
// group users based on their hashCode
107-
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
108-
// since that allows existing tasks to only handle small modifications users to handle.
109-
.collect(Collectors.groupingBy(
110-
u -> Math.abs(u.hashCode()) % maxTasks,
111-
Collectors.joining(",")))
112-
.values().stream()
113-
.map(u -> {
114-
Map<String, String> config = new HashMap<>(baseConfig);
115-
config.put(FITBIT_USERS_CONFIG, u);
116-
return config;
117-
})
118-
.collect(Collectors.toList());
119-
userTasks.addAll(userTasksCurrent);
120-
Set currentUsers = ur.stream()
121-
.collect(toSet());
122-
this.configuredUsers.addAll(currentUsers);
123-
logger.info("Received userTask Configs {}", userTasks);
124-
}
99+
100+
List<Map<String, String>> userTasks = fitbitConfig.getUserRepository().stream()
101+
.map(User::getId)
102+
// group users based on their hashCode
103+
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
104+
// since that allows existing tasks to only handle small modifications users to handle.
105+
.collect(Collectors.groupingBy(
106+
u -> Math.abs(u.hashCode()) % maxTasks,
107+
Collectors.joining(",")))
108+
.values().stream()
109+
.map(u -> {
110+
Map<String, String> config = new HashMap<>(baseConfig);
111+
config.put(FITBIT_USERS_CONFIG, u);
112+
return config;
113+
})
114+
.collect(Collectors.toList());
115+
this.configuredUsers = fitbitConfig.getUserRepository().stream()
116+
.collect(Collectors.toSet());
117+
logger.info("Received userTask Configs {}" , userTasks);
125118
return userTasks;
126119
} catch (IOException ex) {
127120
throw new ConfigException("Cannot read users", ex);

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
2424
import io.confluent.connect.avro.AvroData;
2525
import java.io.IOException;
26-
import java.util.*;
26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
2732
import java.util.stream.Collectors;
2833
import java.util.stream.Stream;
2934
import okhttp3.OkHttpClient;
@@ -53,7 +58,7 @@ public class FitbitRequestGenerator extends RequestGeneratorRouter {
5358

5459
private OkHttpClient baseClient;
5560
private final Map<String, OkHttpClient> clients;
56-
private List<UserRepository> userRepositories;
61+
private UserRepository userRepository;
5762
private List<RequestRoute> routes;
5863

5964
public FitbitRequestGenerator() {
@@ -70,42 +75,35 @@ public void initialize(RestSourceConnectorConfig config) {
7075
FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config;
7176
this.baseClient = new OkHttpClient();
7277

73-
this.userRepositories = fitbitConfig.getUserRepositories();
78+
this.userRepository = fitbitConfig.getUserRepository();
7479
this.routes = getRoutes(fitbitConfig);
7580

7681
super.initialize(config);
7782
}
7883

7984
private List<RequestRoute> getRoutes(FitbitRestSourceConnectorConfig config) {
8085
AvroData avroData = new AvroData(20);
81-
List<RequestRoute> localRoutes = new ArrayList<>(5 * userRepositories.size());
82-
for(UserRepository userRepository : userRepositories) {
83-
localRoutes.add(new FitbitSleepRoute(this, userRepository, avroData));
84-
localRoutes.add(new FitbitTimeZoneRoute(this, userRepository, avroData));
85-
localRoutes.add(new FitbitActivityLogRoute(this, userRepository, avroData));
86-
if (config.hasIntradayAccess()) {
87-
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
88-
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
89-
}
86+
List<RequestRoute> localRoutes = new ArrayList<>(5);
87+
localRoutes.add(new FitbitSleepRoute(this, userRepository, avroData));
88+
localRoutes.add(new FitbitTimeZoneRoute(this, userRepository, avroData));
89+
localRoutes.add(new FitbitActivityLogRoute(this, userRepository, avroData));
90+
if (config.hasIntradayAccess()) {
91+
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
92+
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
9093
}
9194
return localRoutes;
9295
}
9396

94-
public OkHttpClient getClient(User user, UserRepository userRepository) {
95-
97+
public OkHttpClient getClient(User user) {
9698
return clients.computeIfAbsent(user.getId(), u -> baseClient.newBuilder()
97-
.authenticator(new TokenAuthenticator(user, userRepository))
98-
.build());
99+
.authenticator(new TokenAuthenticator(user, userRepository))
100+
.build());
99101
}
100102

101103
public Map<String, Map<String, Object>> getPartitions(String route) {
102104
try {
103-
Map<String, Map<String, Object>> partitions = new HashMap<>();
104-
for(UserRepository userRepository : userRepositories) {
105-
partitions.putAll(userRepository.stream()
106-
.collect(Collectors.toMap(User::getId, u -> getPartition(route, u))));
107-
}
108-
return partitions;
105+
return userRepository.stream()
106+
.collect(Collectors.toMap(User::getId, u -> getPartition(route, u)));
109107
} catch (IOException e) {
110108
logger.warn("Failed to initialize user partitions for route {}: {}", route, e.toString());
111109
return Collections.emptyMap();

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@
3131
import java.time.ZonedDateTime;
3232
import java.time.format.DateTimeFormatter;
3333
import java.time.temporal.TemporalAmount;
34-
import java.util.*;
34+
import java.util.AbstractMap;
35+
import java.util.Comparator;
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.Objects;
39+
import java.util.Set;
3540
import java.util.concurrent.ConcurrentHashMap;
3641
import java.util.stream.Collectors;
3742
import java.util.stream.Stream;
@@ -215,7 +220,7 @@ protected FitbitRestRequest newRequest(User user, DateRange dateRange,
215220
.header("Authorization", "Bearer " + userRepository.getAccessToken(user))
216221
.build();
217222
return new FitbitRestRequest(this, request, user, getPartition(user),
218-
generator.getClient(user, userRepository), dateRange,
223+
generator.getClient(user), dateRange,
219224
req -> !tooManyRequestsForUser.contains(((FitbitRestRequest)req).getUser()));
220225
} catch (NotAuthorizedException | IOException ex) {
221226
logger.warn("User {} does not have a configured access token: {}. Skipping.",

0 commit comments

Comments
 (0)