Skip to content

Commit 88dbcc4

Browse files
authored
Merge pull request #150 from RADAR-base/feature/sentry
Add support for Sentry monitoring
2 parents 39ca6dc + 17d0469 commit 88dbcc4

File tree

16 files changed

+225
-31
lines changed

16 files changed

+225
-31
lines changed

README.md

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,19 @@
11
# Kafka Connect REST Source and Fitbit Source
22

3-
This project contains a Kafka Connect source connector for a general REST API, and one for
4-
Fitbit in particular. The documentation of the Kafka Connect REST source still needs to be done.
3+
This project contains a Kafka Connect source connector for a general REST API, for
4+
specific Fitbit and Oura devices. The documentation of the Kafka Connect REST source still needs to
5+
be done.
6+
7+
<!-- TOC -->
8+
9+
* [Kafka Connect REST Source and Fitbit Source](#kafka-connect-rest-source-and-fitbit-source)
10+
* [Fitbit source connector](#fitbit-source-connector)
11+
* [Installation](#installation)
12+
* [Usage](#usage)
13+
* [Sentry monitoring](#sentry-monitoring)
14+
* [Contributing](#contributing)
15+
16+
<!-- TOC -->
517

618
## Fitbit source connector
719

@@ -12,7 +24,9 @@ of Java 17 or later.
1224

1325
### Usage
1426

15-
Generally, this component is installed with [RADAR-Kubernetes](https://github.com/RADAR-base/RADAR-Kubernetes). It uses Docker image [radarbase/kafka-connect-rest-fitbit-source](https://hub.docker.com/r/radarbase/kafka-connect-rest-fitbit-source).
27+
Generally, this component is installed
28+
with [RADAR-Kubernetes](https://github.com/RADAR-base/RADAR-Kubernetes). It uses Docker
29+
image [radarbase/kafka-connect-rest-fitbit-source](https://hub.docker.com/r/radarbase/kafka-connect-rest-fitbit-source).
1630

1731
First, [register a Fitbit App](https://dev.fitbit.com/apps) with Fitbit. It should be either a
1832
server app, for multiple users, or a personal app for a single user. With the server app, you need
@@ -22,7 +36,8 @@ For every Fitbit user you want access to, copy `docker/fitbit-user.yml.template`
2236
`docker/users/`. Get an access token and refresh token for the user using for example the
2337
[Fitbit OAuth 2.0 tutorial page](https://dev.fitbit.com/apps/oauthinteractivetutorial).
2438

25-
For automatic configuration for multiple users, please take a look at `scripts/REDCAP-FITBIT-AUTH-AUTO/README.md`.
39+
For automatic configuration for multiple users, please take a look at
40+
`scripts/REDCAP-FITBIT-AUTH-AUTO/README.md`.
2641

2742
Copy `docker/source-fitbit.properties.template` to `docker/source-fitbit.properties` and enter
2843
your Fitbit App client ID and client secret. The following tables shows the possible properties.
@@ -37,6 +52,10 @@ your Fitbit App client ID and client secret. The following tables shows the poss
3752
<th>Importance</th>
3853
</tr>
3954
<tr>
55+
<td>application.loop.interval.ms</td><td>How often to perform the main application loop (only controls how often to poll for new user registrations).></td><td>long</td><td>300000</td><td></td><td></td></tr>
56+
<tr>
57+
<td>user.cache.refresh.interval.ms</td><td>How often to invalidate the cache and poll for new user registrations.</td><td>long</td><td>3600000</td><td></td><td></td></tr>
58+
<tr>
4059
<td>rest.source.poll.interval.ms</td><td>How often to poll the source URL.</td><td>long</td><td>60000</td><td></td><td>low</td></tr>
4160
<tr>
4261
<td>rest.source.base.url</td><td>Base URL for REST source connector.</td><td>string</td><td></td><td></td><td>high</td></tr>
@@ -90,7 +109,8 @@ your Fitbit App client ID and client secret. The following tables shows the poss
90109
<td>fitbit.user.firebase.collection.user.name</td><td>Firestore Collection for retrieving User details. Only used when a Firebase based user repository is used.</td><td>string</td><td>users</td><td></td><td>low</td></tr>
91110
</tbody></table>
92111

93-
If the ManagementPortal is used to authenticate against the user repository, please add an OAuth client to ManagementPortal with the following properties:
112+
If the ManagementPortal is used to authenticate against the user repository, please add an OAuth
113+
client to ManagementPortal with the following properties:
94114

95115
```
96116
Client ID: fitbit.user.repository.client.id
@@ -102,7 +122,8 @@ Access Token validity: 600
102122
Refresh Token validity: 0
103123
```
104124

105-
Finally set the `fitbit.user.repository.oauth.token.url` to `http://managementportal-app:8080/managementportal/oauth/token`.
125+
Finally set the `fitbit.user.repository.oauth.token.url` to
126+
`http://managementportal-app:8080/managementportal/oauth/token`.
106127

107128
Now you can run a full Kafka stack using
108129

@@ -159,7 +180,29 @@ sequenceDiagram
159180
connector ->> connector: Update offset times
160181
```
161182

183+
## Sentry monitoring
184+
185+
To enable Sentry monitoring for the generic REST, Fitbit, or Oura source connector service:
186+
187+
1. Set a `SENTRY_DSN` environment variable that points to the desired Sentry DSN.
188+
2. (Optional) Set the `SENTRY_LOG_LEVEL` environment variable to control the minimum log level of
189+
events sent to Sentry.
190+
The default log level for Sentry is `WARN`. Possible values are `TRACE`, `DEBUG`, `INFO`, `WARN`,
191+
and `ERROR`.
192+
193+
For further configuration of Sentry via environmental variables see [here](https://docs.sentry.io/platforms/java/configuration/#configuration-via-the-runtime-environment). For instance:
194+
195+
```
196+
SENTRY_LOG_LEVEL: 'ERROR'
197+
SENTRY_DSN: 'https://000000000000.ingest.de.sentry.io/000000000000'
198+
SENTRY_ATTACHSTACKTRACE: true
199+
SENTRY_STACKTRACE_APP_PACKAGES: io.confluent.connect,org.radarbase.connect.rest
200+
```
201+
162202
## Contributing
163203

164-
Code should be formatted using the [Google Java Code Style Guide](https://google.github.io/styleguide/javaguide.html).
165-
If you want to contribute a feature or fix browse our [issues](https://github.com/RADAR-base/RADAR-REST-Connector/issues), and please make a pull request.
204+
Code should be formatted using
205+
the [Google Java Code Style Guide](https://google.github.io/styleguide/javaguide.html).
206+
If you want to contribute a feature or fix browse
207+
our [issues](https://github.com/RADAR-base/RADAR-REST-Connector/issues), and please make a pull
208+
request.

buildSrc/src/main/kotlin/Versions.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ object Versions {
66
const val kotlin = "1.9.22"
77
const val wrapper = "8.4"
88

9-
const val radarCommons = "1.1.2"
9+
const val radarCommons = "1.1.3"
1010
const val confluent = "7.7.0"
1111
const val kafka = "$confluent-ce"
1212
const val avro = "1.12.0"
@@ -18,6 +18,7 @@ object Versions {
1818

1919
const val log4j2 = "2.23.1"
2020
const val slf4j = "2.0.13"
21+
const val sentryLog4j = "1.7.30"
2122

2223
const val okhttp = "4.12.0"
2324

docker-compose.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ services:
180180
KAFKA_HEAP_OPTS: "-Xms256m -Xmx768m"
181181
KAFKA_BROKERS: 3
182182
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
183+
# SENTRY_LOG_LEVEL: 'ERROR'
184+
# SENTRY_DSN: 'https://000000000000.ingest.de.sentry.io/000000000000'
185+
# SENTRY_ATTACHSTACKTRACE: true
186+
# SENTRY_STACKTRACE_APP_PACKAGES: io.confluent.connect,org.radarbase.connect.rest
183187

184188
#---------------------------------------------------------------------------#
185189
# RADAR Oura connector #
@@ -221,3 +225,7 @@ services:
221225
KAFKA_HEAP_OPTS: "-Xms256m -Xmx768m"
222226
KAFKA_BROKERS: 3
223227
CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
228+
# SENTRY_LOG_LEVEL: 'ERROR'
229+
# SENTRY_DSN: 'https://000000000000.ingest.de.sentry.io/000000000000'
230+
# SENTRY_ATTACHSTACKTRACE: true
231+
# SENTRY_STACKTRACE_APP_PACKAGES: io.confluent.connect,org.radarbase.connect.rest

docker/kafka-wait renamed to docker/ensure

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ fi
77

88
max_timeout=32
99

10-
1110
IS_TEMP=0
1211

12+
echo "===> Wait for infrastructure ..."
13+
1314
if [ -z "$COMMAND_CONFIG_FILE_PATH" ]; then
1415
COMMAND_CONFIG_FILE_PATH="$(mktemp)"
1516
IS_TEMP=1

docker/launch

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,6 @@ if [ "$KAFKA_JMX_PORT" ]; then
3333
export JMX_PORT=$KAFKA_JMX_PORT
3434
export KAFKA_JMX_OPTS="$KAFKA_JMX_OPTS -Djava.rmi.server.hostname=$KAFKA_JMX_HOSTNAME -Dcom.sun.management.jmxremote.local.only=false -Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Dcom.sun.management.jmxremote.port=$JMX_PORT"
3535
fi
36-
#
37-
## Busy waiting loop that waits until all topic are available
38-
echo "===> Wait for infrastructure ..."
39-
kafka-wait
40-
radar_check=$?
41-
if [ "$radar_check" -ne 0 ]; then
42-
exit $radar_check
43-
fi
4436

4537
echo "===> Launching ${COMPONENT} ..."
4638
# Add our jar to the classpath so that the custom classes can be loaded first.

docker/log4j.properties.template

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# This template file was taken from the Confluent Platform distribution and modified to add Sentry support in Docker images.
2+
# See: https://docs.confluent.io/platform/current/installation/docker/development.html#log-to-external-volumes
3+
4+
log4j.rootLogger={{ env["CONNECT_LOG4J_ROOT_LOGLEVEL"] | default('INFO') }}, stdout{% if env['SENTRY_DSN'] %}, sentryAppender{% endif %}
5+
6+
7+
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
8+
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
9+
log4j.appender.stdout.layout.ConversionPattern ={{ env["CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN"] | default('[%d] %p %m (%c)%n') }}
10+
11+
# Appender for Sentry monitoring
12+
{% if env['SENTRY_DSN'] %}
13+
log4j.appender.sentryAppender=io.sentry.log4j.SentryAppender
14+
log4j.appender.sentryAppender.threshold={{ env['SENTRY_LOG_LEVEL'] | default('ERROR') }}
15+
{% endif %}
16+
17+
{% set default_loggers = {
18+
'org.reflections': 'ERROR',
19+
'org.apache.zookeeper': 'ERROR',
20+
'org.I0Itec.zkclient': 'ERROR'
21+
} -%}
22+
23+
{% if env['CONNECT_LOG4J_LOGGERS'] %}
24+
# loggers from CONNECT_LOG4J_LOGGERS env variable
25+
{% set loggers = parse_log4j_loggers(env['CONNECT_LOG4J_LOGGERS']) %}
26+
{% else %}
27+
# default log levels
28+
{% set loggers = default_loggers %}
29+
{% endif %}
30+
{% for logger,loglevel in loggers.items() %}
31+
log4j.logger.{{logger}}={{loglevel}}
32+
{% endfor %}

kafka-connect-fitbit-source/Dockerfile

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ COPY ./kafka-connect-fitbit-source/src/ /code/kafka-connect-fitbit-source/src
3232

3333
RUN gradle jar
3434

35-
FROM confluentinc/cp-kafka-connect-base:7.5.0
35+
FROM confluentinc/cp-kafka-connect-base:7.6.0
3636

3737
USER root
3838

@@ -41,7 +41,7 @@ RUN yum install -y zulu17-ca-jdk-headless && yum install -y zulu17-ca-jre-headle
4141

4242
USER appuser
4343

44-
MAINTAINER Joris Borgdorff <joris@thehyve.nl>
44+
MAINTAINER Pim van Nierop <pim@thehyve.nl>
4545

4646
LABEL description="Kafka REST API Source connector"
4747

@@ -57,7 +57,13 @@ COPY --from=builder /code/kafka-connect-rest-source/build/libs/*.jar ${CONNECT_P
5757
COPY --from=builder /code/kafka-connect-fitbit-source/build/libs/*.jar ${CONNECT_PLUGIN_PATH}/kafka-connect-fitbit-source/
5858

5959
# Load topics validator
60-
COPY --chown=appuser:appuser ./docker/kafka-wait /usr/bin/kafka-wait
60+
COPY --chown=appuser:appuser ./docker/ensure /etc/confluent/docker/ensure
6161

6262
# Load modified launcher
6363
COPY --chown=appuser:appuser ./docker/launch /etc/confluent/docker/launch
64+
65+
# Overwrite the log4j configuration to include Sentry monitoring.
66+
COPY ./docker/log4j.properties.template /etc/confluent/docker/log4j.properties.template
67+
# Copy Sentry monitoring jars.
68+
COPY --from=builder /code/kafka-connect-fitbit-source/build/third-party/sentry-* /etc/kafka-connect/jars
69+

kafka-connect-fitbit-source/build.gradle.kts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,12 @@ dependencies {
2727
compileOnly(platform("com.fasterxml.jackson:jackson-bom:${Versions.jackson}"))
2828
compileOnly("com.fasterxml.jackson.core:jackson-databind")
2929

30+
// Application monitoring
31+
// This dependency is not used by the REST connector, but copied into the Docker image (Dockerfile)
32+
compileOnly("io.sentry:sentry-log4j:${Versions.sentryLog4j}") {
33+
// Exclude log4j with security vulnerability (safe version is provided by docker image).
34+
exclude(group = "log4j", module = "log4j")
35+
}
36+
3037
testImplementation("org.apache.kafka:connect-api:${Versions.kafka}")
3138
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;
2121

2222
import java.io.IOException;
23+
import java.time.Duration;
2324
import java.util.HashMap;
2425
import java.util.List;
2526
import java.util.Map;
@@ -45,9 +46,12 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {
4546

4647
@Override
4748
public void start(Map<String, String> props) {
49+
logger.info("Starting Fitbit source connector");
4850
super.start(props);
4951
executor = Executors.newSingleThreadScheduledExecutor();
5052

53+
Duration applicationLoopInterval = config.getApplicationLoopInterval();
54+
5155
executor.scheduleAtFixedRate(() -> {
5256
if (repository.hasPendingUpdates()) {
5357
try {
@@ -66,7 +70,7 @@ public void start(Map<String, String> props) {
6670
} else {
6771
logger.info("No pending updates found. Not attempting to refresh users.");
6872
}
69-
}, 0, 5, TimeUnit.MINUTES);
73+
}, 0, applicationLoopInterval.toSeconds(), TimeUnit.SECONDS);
7074
}
7175

7276
@Override

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
100100
protected static final TemporalAmount ONE_SECOND = SECONDS.getDuration();
101101
protected static final TemporalAmount ONE_MINUTE = MINUTES.getDuration();
102102

103-
private static final Logger logger = LoggerFactory.getLogger(FitbitSleepRoute.class);
103+
private static final Logger logger = LoggerFactory.getLogger(FitbitPollingRoute.class);
104104

105105
/** Committed offsets. */
106106
private Map<String, Instant> offsets;
@@ -164,7 +164,7 @@ public void requestEmpty(RestRequest request) {
164164
@Override
165165
public void requestFailed(RestRequest request, Response response) {
166166
if (response != null && response.code() == 429) {
167-
User user = ((FitbitRestRequest)request).getUser();
167+
User user = ((FitbitRestRequest) request).getUser();
168168
tooManyRequestsForUser.add(user);
169169
String cooldownString = response.header("Retry-After");
170170
Duration cooldown = getTooManyRequestsCooldown();
@@ -179,6 +179,8 @@ public void requestFailed(RestRequest request, Response response) {
179179
lastPollPerUser.put(user.getId(), backOff);
180180
logger.info("Too many requests for user {}. Backing off until {}",
181181
user, backOff.plus(getPollIntervalPerUser()));
182+
} else if (response != null) {
183+
logger.warn("Failed to make request {}. Response is: {}", request, response);
182184
} else {
183185
logger.warn("Failed to make request {}", request);
184186
}
@@ -197,8 +199,11 @@ public Stream<FitbitRestRequest> requests() {
197199
lastPoll = Instant.now();
198200
try {
199201
return userRepository.stream()
202+
// Collect Instant of nextPoll for each user
200203
.map(u -> new AbstractMap.SimpleImmutableEntry<>(u, nextPoll(u)))
204+
// Keep users where the lastPoll is later than the nextPoll for the user (i.e., user needs to be polled)
201205
.filter(u -> lastPoll.isAfter(u.getValue()))
206+
// Sort users by nextPoll (old to new?)
202207
.sorted(Map.Entry.comparingByValue())
203208
.flatMap(u -> this.createRequests(u.getKey()))
204209
.filter(Objects::nonNull);

0 commit comments

Comments
 (0)