Skip to content

Commit f6624b3

Browse files
authored
Merge pull request #163 from RADAR-base/release-0.6.2
Release 0.6.2
2 parents 520f316 + 90976ea commit f6624b3

File tree

15 files changed

+206
-22
lines changed

15 files changed

+206
-22
lines changed

README.md

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
11
# Kafka Connect REST Source and Fitbit Source
22

3-
This project contains a Kafka Connect source connector for a general REST API, and one for
4-
Fitbit in particular. The documentation of the Kafka Connect REST source still needs to be done.
3+
This project contains a Kafka Connect source connector for a general REST API, for
4+
specific Fitbit and Oura devices. The documentation of the Kafka Connect REST source still needs to
5+
be done.
6+
7+
<!-- TOC -->
8+
9+
* [Kafka Connect REST Source and Fitbit Source](#kafka-connect-rest-source-and-fitbit-source)
10+
* [Fitbit source connector](#fitbit-source-connector)
11+
* [Installation](#installation)
12+
* [Usage](#usage)
13+
* [Sentry monitoring](#sentry-monitoring)
14+
* [Contributing](#contributing)
15+
16+
<!-- TOC -->
517

618
## Fitbit source connector
719

@@ -12,7 +24,9 @@ of Java 17 or later.
1224

1325
### Usage
1426

15-
Generally, this component is installed with [RADAR-Kubernetes](https://github.com/RADAR-base/RADAR-Kubernetes). It uses Docker image [radarbase/kafka-connect-rest-fitbit-source](https://hub.docker.com/r/radarbase/kafka-connect-rest-fitbit-source).
27+
Generally, this component is installed
28+
with [RADAR-Kubernetes](https://github.com/RADAR-base/RADAR-Kubernetes). It uses Docker
29+
image [radarbase/kafka-connect-rest-fitbit-source](https://hub.docker.com/r/radarbase/kafka-connect-rest-fitbit-source).
1630

1731
First, [register a Fitbit App](https://dev.fitbit.com/apps) with Fitbit. It should be either a
1832
server app, for multiple users, or a personal app for a single user. With the server app, you need
@@ -22,7 +36,8 @@ For every Fitbit user you want access to, copy `docker/fitbit-user.yml.template`
2236
`docker/users/`. Get an access token and refresh token for the user using for example the
2337
[Fitbit OAuth 2.0 tutorial page](https://dev.fitbit.com/apps/oauthinteractivetutorial).
2438

25-
For automatic configuration for multiple users, please take a look at `scripts/REDCAP-FITBIT-AUTH-AUTO/README.md`.
39+
For automatic configuration for multiple users, please take a look at
40+
`scripts/REDCAP-FITBIT-AUTH-AUTO/README.md`.
2641

2742
Copy `docker/source-fitbit.properties.template` to `docker/source-fitbit.properties` and enter
2843
your Fitbit App client ID and client secret. The following tables shows the possible properties.
@@ -94,7 +109,8 @@ your Fitbit App client ID and client secret. The following tables shows the poss
94109
<td>fitbit.user.firebase.collection.user.name</td><td>Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used.</td><td>string</td><td>users</td><td></td><td>low</td></tr>
95110
</tbody></table>
96111

97-
If the ManagementPortal is used to authenticate against the user repository, please add an OAuth client to ManagementPortal with the following properties:
112+
If the ManagementPortal is used to authenticate against the user repository, please add an OAuth
113+
client to ManagementPortal with the following properties:
98114

99115
```
100116
Client ID: fitbit.user.repository.client.id
@@ -106,7 +122,8 @@ Access Token validity: 600
106122
Refresh Token validity: 0
107123
```
108124

109-
Finally set the `fitbit.user.repository.oauth.token.url` to `http://managementportal-app:8080/managementportal/oauth/token`.
125+
Finally set the `fitbit.user.repository.oauth.token.url` to
126+
`http://managementportal-app:8080/managementportal/oauth/token`.
110127

111128
Now you can run a full Kafka stack using
112129

@@ -163,7 +180,29 @@ sequenceDiagram
163180
connector ->> connector: Update offset times
164181
```
165182

183+
## Sentry monitoring
184+
185+
To enable Sentry monitoring for the generic REST, Fitbit, or Oura source connector service:
186+
187+
1. Set a `SENTRY_DSN` environment variable that points to the desired Sentry DSN.
188+
2. (Optional) Set the `SENTRY_LOG_LEVEL` environment variable to control the minimum log level of
189+
events sent to Sentry.
190+
The default log level for Sentry is `WARN`. Possible values are `TRACE`, `DEBUG`, `INFO`, `WARN`,
191+
and `ERROR`.
192+
193+
For further configuration of Sentry via environmental variables see [here](https://docs.sentry.io/platforms/java/configuration/#configuration-via-the-runtime-environment). For instance:
194+
195+
```
196+
SENTRY_LOG_LEVEL: 'ERROR'
197+
SENTRY_DSN: 'https://000000000000.ingest.de.sentry.io/000000000000'
198+
SENTRY_ATTACHSTACKTRACE: true
199+
SENTRY_STACKTRACE_APP_PACKAGES: io.confluent.connect,org.radarbase.connect.rest
200+
```
201+
166202
## Contributing
167203

168-
Code should be formatted using the [Google Java Code Style Guide](https://google.github.io/styleguide/javaguide.html).
169-
If you want to contribute a feature or fix browse our [issues](https://github.com/RADAR-base/RADAR-REST-Connector/issues), and please make a pull request.
204+
Code should be formatted using
205+
the [Google Java Code Style Guide](https://google.github.io/styleguide/javaguide.html).
206+
If you want to contribute a feature or fix browse
207+
our [issues](https://github.com/RADAR-base/RADAR-REST-Connector/issues), and please make a pull
208+
request.

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
@Suppress("ConstPropertyName", "MemberVisibilityCanBePrivate")
22
object Versions {
3-
const val project = "0.6.1"
3+
const val project = "0.6.2"
44

55
const val java = 17
66
const val kotlin = "1.9.22"
@@ -18,6 +18,7 @@ object Versions {
1818

1919
const val log4j2 = "2.23.1"
2020
const val slf4j = "2.0.13"
21+
const val sentryLog4j = "1.7.30"
2122

2223
const val okhttp = "4.12.0"
2324

docker-compose.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ services:
180180
KAFKA_HEAP_OPTS: "-Xms256m -Xmx768m"
181181
KAFKA_BROKERS: 3
182182
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
183+
# SENTRY_LOG_LEVEL: 'ERROR'
184+
# SENTRY_DSN: 'https://000000000000.ingest.de.sentry.io/000000000000'
185+
# SENTRY_ATTACHSTACKTRACE: true
186+
# SENTRY_STACKTRACE_APP_PACKAGES: io.confluent.connect,org.radarbase.connect.rest
183187

184188
#---------------------------------------------------------------------------#
185189
# RADAR Oura connector #
@@ -221,3 +225,7 @@ services:
221225
KAFKA_HEAP_OPTS: "-Xms256m -Xmx768m"
222226
KAFKA_BROKERS: 3
223227
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
228+
# SENTRY_LOG_LEVEL: 'ERROR'
229+
# SENTRY_DSN: 'https://000000000000.ingest.de.sentry.io/000000000000'
230+
# SENTRY_ATTACHSTACKTRACE: true
231+
# SENTRY_STACKTRACE_APP_PACKAGES: io.confluent.connect,org.radarbase.connect.rest

docker/kafka-wait renamed to docker/ensure

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ fi
77

88
max_timeout=32
99

10-
1110
IS_TEMP=0
1211

12+
echo "===> Wait for infrastructure ..."
13+
1314
if [ -z "$COMMAND_CONFIG_FILE_PATH" ]; then
1415
COMMAND_CONFIG_FILE_PATH="$(mktemp)"
1516
IS_TEMP=1

docker/launch

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,6 @@ if [ "$KAFKA_JMX_PORT" ]; then
3333
export JMX_PORT=$KAFKA_JMX_PORT
3434
export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Dcom.sun.management.jmxremote.port=$JMX_PORT"
3535
fi
36-
#
37-
## Busy waiting loop that waits until all topic are available
38-
echo "===> Wait for infrastructure ..."
39-
kafka-wait
40-
radar_check=$?
41-
if [ "$radar_check" -ne 0 ]; then
42-
exit $radar_check
43-
fi
4436

4537
echo "===> Launching ${COMPONENT} ..."
4638
# Add our jar to the classpath so that the custom classes can be loaded first.

docker/log4j.properties.template

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# This template file was taken from the Confluent Platform distribution and modified to add Sentry support in Docker images.
2+
# See: https://docs.confluent.io/platform/current/installation/docker/development.html#log-to-external-volumes
3+
4+
log4j.rootLogger={{ env["CONNECT_LOG4J_ROOT_LOGLEVEL"] | default('INFO') }}, stdout{% if env['SENTRY_DSN'] %}, sentryAppender{% endif %}
5+
6+
7+
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
8+
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
9+
log4j.appender.stdout.layout.ConversionPattern ={{ env["CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN"] | default('[%d] %p %m (%c)%n') }}
10+
11+
# Appender for Sentry monitoring
12+
{% if env['SENTRY_DSN'] %}
13+
log4j.appender.sentryAppender=io.sentry.log4j.SentryAppender
14+
log4j.appender.sentryAppender.threshold={{ env['SENTRY_LOG_LEVEL'] | default('ERROR') }}
15+
{% endif %}
16+
17+
{% set default_loggers = {
18+
'org.reflections': 'ERROR',
19+
'org.apache.zookeeper': 'ERROR',
20+
'org.I0Itec.zkclient': 'ERROR'
21+
} -%}
22+
23+
{% if env['CONNECT_LOG4J_LOGGERS'] %}
24+
# loggers from CONNECT_LOG4J_LOGGERS env variable
25+
{% set loggers = parse_log4j_loggers(env['CONNECT_LOG4J_LOGGERS']) %}
26+
{% else %}
27+
# default log levels
28+
{% set loggers = default_loggers %}
29+
{% endif %}
30+
{% for logger,loglevel in loggers.items() %}
31+
log4j.logger.{{logger}}={{loglevel}}
32+
{% endfor %}

kafka-connect-fitbit-source/Dockerfile

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,13 @@ COPY --from=builder /code/kafka-connect-rest-source/build/libs/*.jar ${CONNECT_P
5252
COPY --from=builder /code/kafka-connect-fitbit-source/build/libs/*.jar ${CONNECT_PLUGIN_PATH}/kafka-connect-fitbit-source/
5353

5454
# Load topics validator
55-
COPY --chown=appuser:appuser ./docker/kafka-wait /usr/bin/kafka-wait
55+
COPY --chown=appuser:appuser ./docker/ensure /etc/confluent/docker/ensure
5656

5757
# Load modified launcher
5858
COPY --chown=appuser:appuser ./docker/launch /etc/confluent/docker/launch
59+
60+
# Overwrite the log4j configuration to include Sentry monitoring.
61+
COPY ./docker/log4j.properties.template /etc/confluent/docker/log4j.properties.template
62+
# Copy Sentry monitoring jars.
63+
COPY --from=builder /code/kafka-connect-fitbit-source/build/third-party/sentry-* /etc/kafka-connect/jars
64+

kafka-connect-fitbit-source/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,5 +34,12 @@ dependencies {
3434
compileOnly(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}"))
3535
compileOnly("com.fasterxml.jackson.core:jackson-databind")
3636

37+
// Application monitoring
38+
// This dependency is not used by the REST connector, but copied into the Docker image (Dockerfile)
39+
compileOnly("io.sentry:sentry-log4j:${Versions.sentryLog4j}") {
40+
// Exclude log4j with security vulnerability (safe version is provided by docker image).
41+
exclude(group = "log4j", module = "log4j")
42+
}
43+
3744
testImplementation("org.apache.kafka:connect-api:${Versions.kafka}")
3845
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ class ServiceUserRepository : UserRepository {
9090
tokenUrl = URLBuilder(config.fitbitUserRepositoryTokenUrl.toString()).build(),
9191
clientId = config.fitbitUserRepositoryClientId,
9292
clientSecret = config.fitbitUserRepositoryClientSecret,
93+
scope = "SUBJECT.READ MEASUREMENT.CREATE",
94+
audience = "res_restAuthorizer",
9395
)
9496

9597
val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
@@ -113,6 +115,8 @@ class ServiceUserRepository : UserRepository {
113115
tokenUrl: Url?,
114116
clientId: String?,
115117
clientSecret: String?,
118+
scope: String?,
119+
audience: String?,
116120
): HttpClient = HttpClient(CIO) {
117121
if (tokenUrl != null) {
118122
install(Auth) {
@@ -121,6 +125,8 @@ class ServiceUserRepository : UserRepository {
121125
tokenUrl.toString(),
122126
clientId,
123127
clientSecret,
128+
scope,
129+
audience,
124130
).copyWithEnv("MANAGEMENT_PORTAL"),
125131
baseUrl.host,
126132
)

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
import java.util.stream.Collectors;
3737
import java.util.stream.Stream;
3838
import okhttp3.Credentials;
39-
import okhttp3.HttpUrl;
39+
import okhttp3.FormBody;
40+
import okhttp3.HttpUrl;
4041
import okhttp3.MediaType;
4142
import okhttp3.OkHttpClient;
4243
import okhttp3.Request;
@@ -64,6 +65,9 @@ public class ServiceUserRepositoryLegacy implements UserRepository {
6465
private static final Duration CONNECTION_TIMEOUT = Duration.ofSeconds(60);
6566
private static final Duration CONNECTION_READ_TIMEOUT = Duration.ofSeconds(90);
6667

68+
private static final String CLIENT_AUDIENCE = "res_restAuthorizer";
69+
private static final String CLIENT_AUDIENCE_KEY = "audience";
70+
6771
private final OkHttpClient client;
6872
private final Map<String, OAuth2UserCredentials> cachedCredentials;
6973
private final AtomicReference<Instant> nextFetch = new AtomicReference<>(MIN_INSTANT);
@@ -76,6 +80,19 @@ public class ServiceUserRepositoryLegacy implements UserRepository {
7680

7781
public ServiceUserRepositoryLegacy() {
7882
this.client = new OkHttpClient.Builder()
83+
.addInterceptor(chain -> {
84+
Request req = chain.request();
85+
if ("POST".equalsIgnoreCase(req.method()) && req.body() instanceof FormBody) {
86+
FormBody oldBody = (FormBody) req.body();
87+
FormBody.Builder newBody = new FormBody.Builder();
88+
for (int i = 0; i < oldBody.size(); i++) {
89+
newBody.addEncoded(oldBody.encodedName(i), oldBody.encodedValue(i));
90+
}
91+
newBody.add(CLIENT_AUDIENCE_KEY, CLIENT_AUDIENCE);
92+
req = req.newBuilder().post(newBody.build()).build();
93+
}
94+
return chain.proceed(req);
95+
})
7996
.connectTimeout(CONNECTION_TIMEOUT)
8097
.readTimeout(CONNECTION_READ_TIMEOUT)
8198
.build();

0 commit comments

Comments
 (0)