Skip to content

Commit ac84fd9

Browse files
authored
Merge pull request #59 from RADAR-base/clientCredentialsSupport
Add OAuth 2.0 client credentials support for user repository
2 parents b4b8bd5 + 85a0d0b commit ac84fd9

File tree

5 files changed

+146
-8
lines changed

5 files changed

+146
-8
lines changed

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ your Fitbit App client ID and client secret. The following tables shows the poss
6363
<tr>
6464
<td>fitbit.user.repository.url</td></td><td>URL for webservice containing user credentials. Only used if a webservice-based user repository is configured.</td></td><td>string</td></td><td>""</td></td><td></td></td><td>low</td></td></tr>
6565
<tr>
66+
<td>fitbit.user.repository.client.id</td></td><td>Client ID for connecting to the service repository.</td></td><td>string</td></td><td>""</td></td><td></td></td><td>medium</td></td></tr>
67+
<tr>
68+
<td>fitbit.user.repository.client.secret</td></td><td>Client secret for connecting to the service repository.</td></td><td>string</td></td><td>""</td></td><td></td></td><td>medium</td></td></tr>
69+
<tr>
70+
<td>fitbit.user.repository.oauth2.token.url</td></td><td>OAuth 2.0 token url for retrieving client credentials.</td></td><td>string</td></td><td>""</td></td><td></td></td><td>medium</td></td></tr>
71+
<tr>
6672
<td>fitbit.intraday.steps.topic</td></td><td>Topic for Fitbit intraday steps</td></td><td>string</td></td><td>connect_fitbit_intraday_steps</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
6773
<tr>
6874
<td>fitbit.intraday.heart.rate.topic</td></td><td>Topic for Fitbit intraday heart_rate</td></td><td>string</td></td><td>connect_fitbit_intraday_heart_rate</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
@@ -76,8 +82,26 @@ your Fitbit App client ID and client secret. The following tables shows the poss
7682
<td>fitbit.activity.log.topic</td></td><td>Topic for Fitbit activity log.</td></td><td>string</td></td><td>connect_fitbit_activity_log</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
7783
<tr>
7884
<td>fitbit.intraday.calories.topic</td></td><td>Topic for Fitbit intraday calories</td></td><td>string</td></td><td>connect_fitbit_intraday_calories</td></td><td>non-empty string without control characters</td></td><td>low</td></td></tr>
85+
<tr>
86+
<td>fitbit.user.firebase.collection.fitbit.name</td></td><td>Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.</td></td><td>string</td></td><td>fitbit</td></td><td></td></td><td>low</td></td></tr>
87+
<tr>
88+
<td>fitbit.user.firebase.collection.user.name</td></td><td>Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used.</td></td><td>string</td></td><td>users</td></td><td></td></td><td>low</td></td></tr>
7989
</tbody></table>
8090

91+
If the ManagementPortal is used to authenticate against the user repository, please add an OAuth client to ManagementPortal with the following properties:
92+
93+
```
94+
Client ID: fitbit.user.repository.client.id
95+
Client Secret: fitbit.user.repository.client.secret
96+
Scope: SUBJECT.READ
97+
Resources: res_restAuthorizer
98+
Grant types: client_credentials
99+
Access Token validity: 600
100+
Refresh Token validity: 0
101+
```
102+
103+
Finally set the `fitbit.user.repository.oauth.token.url` to `http://managementportal-app:8080/managementportal/oauth/token`.
104+
81105
Now you can run a full Kafka stack using
82106

83107
```shell

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ subprojects {
2121
maven { url "https://packages.confluent.io/maven/" }
2222
maven { url "https://repo.maven.apache.org/maven2" }
2323
jcenter()
24+
maven { url "https://dl.bintray.com/radar-cns/org.radarcns" }
2425
maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local/' }
2526
}
2627
}

kafka-connect-fitbit-source/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ dependencies {
33
api group: 'io.confluent', name: 'kafka-connect-avro-converter', version: confluentVersion
44
api group: 'org.radarcns', name: 'radar-schemas-commons', version: '0.5.3'
55

6+
implementation group: 'org.radarcns', name: 'oauth-client-util', version: '0.5.8'
67

78
implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-yaml', version: jacksonVersion
89
implementation group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: jacksonVersion

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
2121

2222
import java.lang.reflect.InvocationTargetException;
23+
import java.net.MalformedURLException;
24+
import java.net.URL;
2325
import java.nio.charset.StandardCharsets;
2426
import java.nio.file.Path;
2527
import java.nio.file.Paths;
@@ -122,6 +124,19 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
122124
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DISPLAY = "Intraday calories topic";
123125
private static final String FITBIT_INTRADAY_CALORIES_TOPIC_DEFAULT = "connect_fitbit_intraday_calories";
124126

127+
128+
public static final String FITBIT_USER_REPOSITORY_CLIENT_ID_CONFIG = "fitbit.user.repository.client.id";
129+
private static final String FITBIT_USER_REPOSITORY_CLIENT_ID_DOC = "Client ID for connecting to the service repository.";
130+
private static final String FITBIT_USER_REPOSITORY_CLIENT_ID_DISPLAY = "Client ID for user repository.";
131+
132+
public static final String FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG = "fitbit.user.repository.client.secret";
133+
private static final String FITBIT_USER_REPOSITORY_CLIENT_SECRET_DOC = "Client secret for connecting to the service repository.";
134+
private static final String FITBIT_USER_REPOSITORY_CLIENT_SECRET_DISPLAY = "Client Secret for user repository.";
135+
136+
public static final String FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG = "fitbit.user.repository.oauth2.token.url";
137+
private static final String FITBIT_USER_REPOSITORY_TOKEN_URL_DOC = "OAuth 2.0 token url for retrieving client credentials.";
138+
private static final String FITBIT_USER_REPOSITORY_TOKEN_URL_DISPLAY = "OAuth 2.0 token URL.";
139+
125140
public static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_CONFIG = "fitbit.user.firebase.collection.fitbit.name";
126141
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DOC = "Firestore Collection for retrieving Fitbit Auth details. Only used when a Firebase based user repository is used.";
127142
private static final String FITBIT_USER_REPOSITORY_FIRESTORE_FITBIT_COLLECTION_DISPLAY = "Firebase Fitbit collection name.";
@@ -242,6 +257,37 @@ public String toString() {
242257
Width.SHORT,
243258
FITBIT_USER_REPOSITORY_URL_DISPLAY)
244259

260+
261+
.define(FITBIT_USER_REPOSITORY_CLIENT_ID_CONFIG,
262+
Type.STRING,
263+
"",
264+
Importance.MEDIUM,
265+
FITBIT_USER_REPOSITORY_CLIENT_ID_DOC,
266+
group,
267+
++orderInGroup,
268+
Width.SHORT,
269+
FITBIT_USER_REPOSITORY_CLIENT_ID_DISPLAY)
270+
271+
.define(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG,
272+
Type.STRING,
273+
"",
274+
Importance.MEDIUM,
275+
FITBIT_USER_REPOSITORY_CLIENT_SECRET_DOC,
276+
group,
277+
++orderInGroup,
278+
Width.SHORT,
279+
FITBIT_USER_REPOSITORY_CLIENT_SECRET_DISPLAY)
280+
281+
.define(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG,
282+
Type.STRING,
283+
"",
284+
Importance.MEDIUM,
285+
FITBIT_USER_REPOSITORY_TOKEN_URL_DOC,
286+
group,
287+
++orderInGroup,
288+
Width.SHORT,
289+
FITBIT_USER_REPOSITORY_TOKEN_URL_DISPLAY)
290+
245291
.define(FITBIT_INTRADAY_STEPS_TOPIC_CONFIG,
246292
Type.STRING,
247293
FITBIT_INTRADAY_STEPS_TOPIC_DEFAULT,
@@ -447,4 +493,25 @@ public String getFitbitUserRepositoryFirestoreFitbitCollection() {
447493
public String getFitbitUserRepositoryFirestoreUserCollection() {
448494
return getString(FITBIT_USER_REPOSITORY_FIRESTORE_USER_COLLECTION_CONFIG);
449495
}
496+
497+
public String getFitbitUserRepositoryClientId() {
498+
return getString(FITBIT_USER_REPOSITORY_CLIENT_ID_CONFIG);
499+
}
500+
501+
public String getFitbitUserRepositoryClientSecret() {
502+
return getString(FITBIT_USER_REPOSITORY_CLIENT_SECRET_CONFIG);
503+
}
504+
505+
public URL getFitbitUserRepositoryTokenUrl() {
506+
String value = getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG);
507+
if (value == null || value.isEmpty()) {
508+
return null;
509+
} else {
510+
try {
511+
return new URL(getString(FITBIT_USER_REPOSITORY_TOKEN_URL_CONFIG));
512+
} catch (MalformedURLException e) {
513+
throw new ConfigException("Fitbit user repository token URL is invalid.");
514+
}
515+
}
516+
}
450517
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.java

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@
2222

2323
import com.fasterxml.jackson.core.JsonProcessingException;
2424
import com.fasterxml.jackson.databind.ObjectReader;
25+
import io.confluent.common.config.ConfigException;
2526
import java.io.IOException;
27+
import java.net.MalformedURLException;
28+
import java.net.URL;
2629
import java.time.Duration;
2730
import java.time.Instant;
2831
import java.util.HashMap;
@@ -34,15 +37,19 @@
3437
import java.util.stream.Collectors;
3538
import java.util.stream.Stream;
3639
import javax.ws.rs.NotAuthorizedException;
40+
import okhttp3.Credentials;
3741
import okhttp3.HttpUrl;
3842
import okhttp3.MediaType;
3943
import okhttp3.OkHttpClient;
4044
import okhttp3.Request;
4145
import okhttp3.RequestBody;
4246
import okhttp3.Response;
4347
import okhttp3.ResponseBody;
48+
import org.apache.kafka.connect.errors.ConnectException;
4449
import org.radarbase.connect.rest.RestSourceConnectorConfig;
4550
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
51+
import org.radarcns.exception.TokenException;
52+
import org.radarcns.oauth.OAuth2Client;
4653
import org.slf4j.Logger;
4754
import org.slf4j.LoggerFactory;
4855

@@ -62,8 +69,10 @@ public class ServiceUserRepository implements UserRepository {
6269
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
6370

6471
private HttpUrl baseUrl;
65-
private HashSet<String> containedUsers;
72+
private final HashSet<String> containedUsers;
6673
private Set<? extends User> timedCachedUsers = new HashSet<>();
74+
private OAuth2Client repositoryClient;
75+
private String basicCredentials;
6776

6877
public ServiceUserRepository() {
6978
this.client = new OkHttpClient();
@@ -82,6 +91,24 @@ public void initialize(RestSourceConnectorConfig config) {
8291
FitbitRestSourceConnectorConfig fitbitConfig = (FitbitRestSourceConnectorConfig) config;
8392
this.baseUrl = fitbitConfig.getFitbitUserRepositoryUrl();
8493
this.containedUsers.addAll(fitbitConfig.getFitbitUsers());
94+
95+
URL tokenUrl = fitbitConfig.getFitbitUserRepositoryTokenUrl();
96+
String clientId = fitbitConfig.getFitbitUserRepositoryClientId();
97+
String clientSecret = fitbitConfig.getFitbitUserRepositoryClientSecret();
98+
99+
if (tokenUrl != null) {
100+
if (clientId.isEmpty()) {
101+
throw new ConfigException("Client ID for user repository is not set.");
102+
}
103+
this.repositoryClient = new OAuth2Client.Builder()
104+
.credentials(clientId, clientSecret)
105+
.endpoint(tokenUrl)
106+
.scopes("SUBJECT.READ")
107+
.httpClient(client)
108+
.build();
109+
} else if (clientId != null) {
110+
basicCredentials = Credentials.basic(clientId, clientSecret);
111+
}
85112
}
86113

87114
@Override
@@ -121,21 +148,39 @@ public void applyPendingUpdates() throws IOException {
121148
Request request = requestFor("users?source-type=FitBit").build();
122149
this.timedCachedUsers =
123150
this.<Users>makeRequest(request, USER_LIST_READER).getUsers().stream()
124-
.filter(
125-
u ->
126-
u.isComplete()
127-
&& (containedUsers.isEmpty()
128-
|| containedUsers.contains(u.getVersionedId())))
151+
.filter(u -> u.isComplete()
152+
&& (containedUsers.isEmpty()
153+
|| containedUsers.contains(u.getVersionedId())))
129154
.collect(Collectors.toSet());
130155
nextFetch.set(Instant.now().plus(FETCH_THRESHOLD));
131156
}
132157

133-
private Request.Builder requestFor(String relativeUrl) {
158+
private Request.Builder requestFor(String relativeUrl) throws IOException {
134159
HttpUrl url = baseUrl.resolve(relativeUrl);
135160
if (url == null) {
136161
throw new IllegalArgumentException("Relative URL is invalid");
137162
}
138-
return new Request.Builder().url(url);
163+
Request.Builder builder = new Request.Builder().url(url);
164+
String authorization = requestAuthorization();
165+
if (authorization != null) {
166+
builder.addHeader("Authorization", authorization);
167+
}
168+
169+
return builder;
170+
}
171+
172+
private String requestAuthorization() throws IOException {
173+
if (repositoryClient != null) {
174+
try {
175+
return "Bearer " + repositoryClient.getValidToken().getAccessToken();
176+
} catch (TokenException ex) {
177+
throw new IOException(ex);
178+
}
179+
} else if (basicCredentials != null) {
180+
return basicCredentials;
181+
} else {
182+
return null;
183+
}
139184
}
140185

141186
private <T> T makeRequest(Request request, ObjectReader reader) throws IOException {

0 commit comments

Comments
 (0)