Skip to content

Commit 716f284

Browse files
committed
Merge branch 'dev' of github.com:RADAR-base/RADAR-REST-Connector into dev
2 parents 1f9ac9d + 4e52267 commit 716f284

File tree

10 files changed

+126
-79
lines changed

10 files changed

+126
-79
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ out/
66
.gradle/
77
docker/users
88
docker/source-fitbit.properties
9+
docker/rest-source-authorizer/etc/rest_source_clients_configs.yml

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ your Fitbit App client ID and client secret. The following tables shows the poss
5555
<tr>
5656
<td>fitbit.api.intraday</td></td><td>Set to true if the client has permissions to Fitbit Intraday API, false otherwise.</td></td><td>boolean</td></td><td>true</td></td><td></td></td><td>medium</td></td></tr>
5757
<tr>
58-
<td>fitbit.user.repository.class</td></td><td>Class for managing users and authentication.</td></td><td>class</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlUserRepository</td></td><td>Class extending org.radarbase.connect.rest.fitbit.user.UserRepository</td></td><td>medium</td></td></tr>
58+
<td>fitbit.user.repository.classes</td></td><td>Class(es) for managing users and authentication. Comma separate for using multiple repositories</td></td><td>list</td></td><td>org.radarbase.connect.rest.fitbit.user.YamlUserRepository</td></td><td>Classes extending org.radarbase.connect.rest.fitbit.user.UserRepository</td></td><td>medium</td></td></tr>
5959
<tr>
6060
<td>fitbit.user.dir</td></td><td>Directory containing Fitbit user information and credentials. Only used if a file-based user repository is configured.</td></td><td>string</td></td><td>/var/lib/kafka-connect-fitbit-source/users</td></td><td></td></td><td>low</td></td></tr>
6161
<tr>

docker-compose.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ services:
163163
- kafka-2
164164
- kafka-3
165165
- schema-registry-1
166+
networks:
167+
- default
166168
environment:
167169
CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092
168170
CONNECT_REST_PORT: 8083
@@ -181,5 +183,6 @@ services:
181183
CONNECT_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
182184
CONNECTOR_PROPERTY_FILE_PREFIX: "source-fitbit"
183185
KAFKA_HEAP_OPTS: "-Xms256m -Xmx768m"
186+
CONNECT_LOG4J_ROOT_LOGLEVEL: DEBUG
184187
KAFKA_BROKERS: 3
185188
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
---
2+
version: '2.4'
3+
4+
services:
5+
radar-rest-source-auth-backend:
6+
image: radarbase/radar-rest-source-auth-backend:1.0.0
7+
depends_on:
8+
- auth-postgresql
9+
environment:
10+
- SPRING_DATASOURCE_URL=jdbc:postgresql://auth-postgresql:5432/restsourceauth
11+
- SPRING_DATASOURCE_USERNAME=radarcns
12+
- SPRING_DATASOURCE_PASSWORD=radarcns
13+
- REST_SOURCE_AUTHORIZER_SOURCE_CLIENTS_FILE_PATH=app-includes/rest_source_clients_configs.yml
14+
- LOGGING_LEVEL_ORG_SPRINGFRAMEWORK_WEB=DEBUG
15+
- APP_SLEEP=10 # gives time for the database to boot before the application
16+
ports:
17+
- "8080:8080"
18+
volumes:
19+
- ./etc:/app-includes
20+
21+
auth-postgresql:
22+
image: postgres:10.4-alpine
23+
environment:
24+
- POSTGRES_USER=radarcns
25+
- POSTGRES_PASSWORD=radarcns
26+
- POSTGRES_DB=restsourceauth
27+
28+
radar-rest-source-authorizer:
29+
image: radarbase/radar-rest-source-authorizer:1.0.0
30+
depends_on:
31+
- radar-rest-source-auth-backend
32+
- auth-postgresql
33+
ports:
34+
- "3000:80"
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
rest_source_clients:
2+
- source_type: FitBit
3+
authorization_endpoint: https://www.fitbit.com/oauth2/authorize
4+
token_endpoint: https://api.fitbit.com/oauth2/token
5+
client_id: FITBITIT
6+
client_secret: FITBITSECRET
7+
scope: activity heartrate sleep profile

docker/source-fitbit.properties.template

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ rest.source.request.generator.class=org.radarbase.connect.rest.fitbit.request.Fi
77
fitbit.api.client=?
88
fitbit.api.secret=?
99
fitbit.max.users.per.poll=10
10-
fitbit.user.repository.class=org.radarbase.connect.rest.fitbit.user.ServiceUserRepository
11-
fitbit.user.repository.url=http://radar-device-auth-backend:8080/
10+
fitbit.user.repository.classes=org.radarbase.connect.rest.fitbit.user.YamlUserRepository,org.radarbase.connect.rest.fitbit.user.ServiceUserRepository
11+
fitbit.user.repository.url=http://localhost:8080/

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,8 @@
2323
import java.nio.charset.StandardCharsets;
2424
import java.nio.file.Path;
2525
import java.nio.file.Paths;
26-
import java.time.Duration;
27-
import java.util.Base64;
28-
import java.util.Collections;
29-
import java.util.List;
30-
import java.util.Map;
26+
import java.util.*;
27+
3128
import okhttp3.Headers;
3229
import okhttp3.HttpUrl;
3330
import org.apache.kafka.common.config.ConfigDef;
@@ -40,7 +37,6 @@
4037
import org.apache.kafka.common.config.ConfigException;
4138
import org.apache.kafka.connect.errors.ConnectException;
4239
import org.radarbase.connect.rest.RestSourceConnectorConfig;
43-
import org.radarbase.connect.rest.config.ValidClass;
4440
import org.radarbase.connect.rest.fitbit.user.UserRepository;
4541
import org.radarbase.connect.rest.fitbit.user.YamlUserRepository;
4642

@@ -61,9 +57,9 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
6157
private static final String FITBIT_API_SECRET_DOC = "Secret for the Fitbit API client set in fitbit.api.client.";
6258
private static final String FITBIT_API_SECRET_DISPLAY = "Fitbit API client secret";
6359

64-
public static final String FITBIT_USER_REPOSITORY_CONFIG = "fitbit.user.repository.class";
65-
private static final String FITBIT_USER_REPOSITORY_DOC = "Class for managing users and authentication.";
66-
private static final String FITBIT_USER_REPOSITORY_DISPLAY = "User repository class";
60+
public static final String FITBIT_USER_REPOSITORIES_CONFIG = "fitbit.user.repository.classes";
61+
private static final String FITBIT_USER_REPOSITORIES_DOC = "Classes for managing users and authentication. Can be a Comma-separated list for multiple repositories.";
62+
private static final String FITBIT_USER_REPOSITORIES_DISPLAY = "User repository classes";
6763

6864
public static final String FITBIT_API_INTRADAY_ACCESS_CONFIG = "fitbit.api.intraday";
6965
private static final String FITBIT_API_INTRADAY_ACCESS_DOC = "Set to true if the client has permissions to Fitbit Intraday API, false otherwise.";
@@ -115,19 +111,22 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
115111
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DEFAULT = "connect_fitbit_activity_log";
116112
private static final String FITBIT_ACTIVITY_LOG_TOPIC_DISPLAY = "Activity log topic";
117113

118-
private final UserRepository userRepository;
114+
private final List<UserRepository> userRepositories;
119115
private final Headers clientCredentials;
120116

121117
@SuppressWarnings("unchecked")
122118
public FitbitRestSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig, boolean doLog) {
123119
super(config, parsedConfig, doLog);
120+
userRepositories = new ArrayList<>();
124121

125122
try {
126-
userRepository = ((Class<? extends UserRepository>)
127-
getClass(FITBIT_USER_REPOSITORY_CONFIG)).getDeclaredConstructor().newInstance();
123+
for(String c: getList(FITBIT_USER_REPOSITORIES_CONFIG)) {
124+
userRepositories.add(
125+
((Class<? extends UserRepository>) Class.forName(c)).getDeclaredConstructor().newInstance());
126+
}
128127
} catch (IllegalAccessException | InstantiationException
129-
| InvocationTargetException | NoSuchMethodException e) {
130-
throw new ConnectException("Invalid class for: " + SOURCE_PAYLOAD_CONVERTER_CONFIG, e);
128+
| InvocationTargetException | NoSuchMethodException | ClassNotFoundException e) {
129+
throw new ConnectException("Invalid class(es) for: " + FITBIT_USER_REPOSITORIES_CONFIG, e);
131130
}
132131

133132
String credentialString = getFitbitClient() + ":" + getFitbitClientSecret();
@@ -194,16 +193,15 @@ public String toString() {
194193
Width.SHORT,
195194
FITBIT_API_INTRADAY_ACCESS_DISPLAY)
196195

197-
.define(FITBIT_USER_REPOSITORY_CONFIG,
198-
Type.CLASS,
199-
YamlUserRepository.class,
200-
ValidClass.isSubclassOf(UserRepository.class),
196+
.define(FITBIT_USER_REPOSITORIES_CONFIG,
197+
Type.LIST,
198+
YamlUserRepository.class.getName(),
201199
Importance.MEDIUM,
202-
FITBIT_USER_REPOSITORY_DOC,
200+
FITBIT_USER_REPOSITORIES_DOC,
203201
group,
204202
++orderInGroup,
205203
Width.SHORT,
206-
FITBIT_USER_REPOSITORY_DISPLAY)
204+
FITBIT_USER_REPOSITORIES_DISPLAY)
207205

208206
.define(FITBIT_USER_CREDENTIALS_DIR_CONFIG,
209207
Type.STRING,
@@ -316,9 +314,9 @@ public String getFitbitClientSecret() {
316314
return getPassword(FITBIT_API_SECRET_CONFIG).value();
317315
}
318316

319-
public UserRepository getUserRepository() {
320-
userRepository.initialize(this);
321-
return userRepository;
317+
public List<UserRepository> getUserRepositories() {
318+
userRepositories.forEach(u -> u.initialize(this));
319+
return userRepositories;
322320
}
323321

324322
public String getFitbitIntradayStepsTopic() {

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,21 @@
1717

1818
package org.radarbase.connect.rest.fitbit;
1919

20+
import static java.util.stream.Collectors.toSet;
2021
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;
2122

2223
import java.io.IOException;
23-
import java.util.HashMap;
24-
import java.util.List;
25-
import java.util.Map;
26-
import java.util.Set;
24+
import java.util.*;
2725
import java.util.concurrent.Executors;
2826
import java.util.concurrent.ScheduledExecutorService;
2927
import java.util.concurrent.TimeUnit;
3028
import java.util.stream.Collectors;
31-
import java.util.stream.Stream;
3229

3330
import org.apache.kafka.common.config.ConfigDef;
3431
import org.apache.kafka.common.config.ConfigException;
3532
import org.radarbase.connect.rest.AbstractRestSourceConnector;
3633
import org.radarbase.connect.rest.fitbit.user.User;
34+
import org.radarbase.connect.rest.fitbit.user.UserRepository;
3735
import org.slf4j.Logger;
3836
import org.slf4j.LoggerFactory;
3937

@@ -50,10 +48,13 @@ public void start(Map<String, String> props) {
5048
executor = Executors.newSingleThreadScheduledExecutor();
5149

5250
executor.scheduleAtFixedRate(() -> {
51+
Set<? extends User> newUsers = new HashSet<>();
5352
try {
5453
logger.info("Requesting latest user details...");
55-
Set<? extends User> newUsers = getConfig(props, false).getUserRepository().stream()
56-
.collect(Collectors.toSet());
54+
for(UserRepository u: getConfig(props, false).getUserRepositories()) {
55+
Set users = u.stream().collect(toSet());
56+
newUsers.addAll(users);
57+
}
5758
if (configuredUsers != null && !newUsers.equals(configuredUsers)) {
5859
logger.info("User info mismatch found. Requesting reconfiguration...");
5960
reconfigure();
@@ -95,26 +96,32 @@ private List<Map<String, String>> configureTasks(int maxTasks) {
9596
Map<String, String> baseConfig = config.originalsStrings();
9697
FitbitRestSourceConnectorConfig fitbitConfig = getConfig(baseConfig);
9798
// Divide the users over tasks
98-
try {
99+
List<Map<String, String>> userTasks = new ArrayList<>();
100+
this.configuredUsers = new HashSet<>();
99101

100-
List<Map<String, String>> userTasks = fitbitConfig.getUserRepository().stream()
101-
.map(User::getId)
102-
// group users based on their hashCode
103-
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
104-
// since that allows existing tasks to only handle small modifications users to handle.
105-
.collect(Collectors.groupingBy(
106-
u -> Math.abs(u.hashCode()) % maxTasks,
107-
Collectors.joining(",")))
108-
.values().stream()
109-
.map(u -> {
110-
Map<String, String> config = new HashMap<>(baseConfig);
111-
config.put(FITBIT_USERS_CONFIG, u);
112-
return config;
113-
})
114-
.collect(Collectors.toList());
115-
this.configuredUsers = fitbitConfig.getUserRepository().stream()
116-
.collect(Collectors.toSet());
117-
logger.info("Received userTask Configs {}" , userTasks);
102+
try {
103+
for(UserRepository ur: fitbitConfig.getUserRepositories()) {
104+
List<Map<String, String>> userTasksCurrent = ur.stream()
105+
.map(User::getId)
106+
// group users based on their hashCode
107+
// in principle this allows for more efficient reconfigurations for a fixed number of tasks,
108+
// since that allows existing tasks to only handle small modifications users to handle.
109+
.collect(Collectors.groupingBy(
110+
u -> Math.abs(u.hashCode()) % maxTasks,
111+
Collectors.joining(",")))
112+
.values().stream()
113+
.map(u -> {
114+
Map<String, String> config = new HashMap<>(baseConfig);
115+
config.put(FITBIT_USERS_CONFIG, u);
116+
return config;
117+
})
118+
.collect(Collectors.toList());
119+
userTasks.addAll(userTasksCurrent);
120+
Set currentUsers = ur.stream()
121+
.collect(toSet());
122+
this.configuredUsers.addAll(currentUsers);
123+
logger.info("Received userTask Configs {}", userTasks);
124+
}
118125
return userTasks;
119126
} catch (IOException ex) {
120127
throw new ConfigException("Cannot read users", ex);

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/request/FitbitRequestGenerator.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,7 @@
2323
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
2424
import io.confluent.connect.avro.AvroData;
2525
import java.io.IOException;
26-
import java.util.ArrayList;
27-
import java.util.Arrays;
28-
import java.util.Collections;
29-
import java.util.HashMap;
30-
import java.util.List;
31-
import java.util.Map;
26+
import java.util.*;
3227
import java.util.stream.Collectors;
3328
import java.util.stream.Stream;
3429
import okhttp3.OkHttpClient;
@@ -58,7 +53,7 @@ public class FitbitRequestGenerator extends RequestGeneratorRouter {
5853

5954
private OkHttpClient baseClient;
6055
private final Map<String, OkHttpClient> clients;
61-
private UserRepository userRepository;
56+
private List<UserRepository> userRepositories;
6257
private List<RequestRoute> routes;
6358

6459
public FitbitRequestGenerator() {
@@ -75,35 +70,42 @@ public void initialize(RestSourceConnectorConfig config) {
7570
FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config;
7671
this.baseClient = new OkHttpClient();
7772

78-
this.userRepository = fitbitConfig.getUserRepository();
73+
this.userRepositories = fitbitConfig.getUserRepositories();
7974
this.routes = getRoutes(fitbitConfig);
8075

8176
super.initialize(config);
8277
}
8378

8479
private List<RequestRoute> getRoutes(FitbitRestSourceConnectorConfig config) {
8580
AvroData avroData = new AvroData(20);
86-
List<RequestRoute> localRoutes = new ArrayList<>(5);
87-
localRoutes.add(new FitbitSleepRoute(this, userRepository, avroData));
88-
localRoutes.add(new FitbitTimeZoneRoute(this, userRepository, avroData));
89-
localRoutes.add(new FitbitActivityLogRoute(this, userRepository, avroData));
90-
if (config.hasIntradayAccess()) {
91-
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
92-
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
81+
List<RequestRoute> localRoutes = new ArrayList<>(5 * userRepositories.size());
82+
for(UserRepository userRepository : userRepositories) {
83+
localRoutes.add(new FitbitSleepRoute(this, userRepository, avroData));
84+
localRoutes.add(new FitbitTimeZoneRoute(this, userRepository, avroData));
85+
localRoutes.add(new FitbitActivityLogRoute(this, userRepository, avroData));
86+
if (config.hasIntradayAccess()) {
87+
localRoutes.add(new FitbitIntradayStepsRoute(this, userRepository, avroData));
88+
localRoutes.add(new FitbitIntradayHeartRateRoute(this, userRepository, avroData));
89+
}
9390
}
9491
return localRoutes;
9592
}
9693

97-
public OkHttpClient getClient(User user) {
94+
public OkHttpClient getClient(User user, UserRepository userRepository) {
95+
9896
return clients.computeIfAbsent(user.getId(), u -> baseClient.newBuilder()
99-
.authenticator(new TokenAuthenticator(user, userRepository))
100-
.build());
97+
.authenticator(new TokenAuthenticator(user, userRepository))
98+
.build());
10199
}
102100

103101
public Map<String, Map<String, Object>> getPartitions(String route) {
104102
try {
105-
return userRepository.stream()
106-
.collect(Collectors.toMap(User::getId, u -> getPartition(route, u)));
103+
Map<String, Map<String, Object>> partitions = new HashMap<>();
104+
for(UserRepository userRepository : userRepositories) {
105+
partitions.putAll(userRepository.stream()
106+
.collect(Collectors.toMap(User::getId, u -> getPartition(route, u))));
107+
}
108+
return partitions;
107109
} catch (IOException e) {
108110
logger.warn("Failed to initialize user partitions for route {}: {}", route, e.toString());
109111
return Collections.emptyMap();

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,7 @@
3131
import java.time.ZonedDateTime;
3232
import java.time.format.DateTimeFormatter;
3333
import java.time.temporal.TemporalAmount;
34-
import java.util.AbstractMap;
35-
import java.util.Comparator;
36-
import java.util.HashMap;
37-
import java.util.Map;
38-
import java.util.Objects;
39-
import java.util.Set;
34+
import java.util.*;
4035
import java.util.concurrent.ConcurrentHashMap;
4136
import java.util.stream.Collectors;
4237
import java.util.stream.Stream;
@@ -220,7 +215,7 @@ protected FitbitRestRequest newRequest(User user, DateRange dateRange,
220215
.header("Authorization", "Bearer " + userRepository.getAccessToken(user))
221216
.build();
222217
return new FitbitRestRequest(this, request, user, getPartition(user),
223-
generator.getClient(user), dateRange,
218+
generator.getClient(user, userRepository), dateRange,
224219
req -> !tooManyRequestsForUser.contains(((FitbitRestRequest)req).getUser()));
225220
} catch (NotAuthorizedException | IOException ex) {
226221
logger.warn("User {} does not have a configured access token: {}. Skipping.",

0 commit comments

Comments
 (0)