Skip to content

Commit b96beec

Browse files
Make forbidden backoff time configurable
1 parent 05e285e commit b96beec

File tree

2 files changed

+41
-18
lines changed

2 files changed

+41
-18
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.radarbase.connect.rest.fitbit;
1919

20-
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
21-
2220
import java.lang.reflect.InvocationTargetException;
2321
import java.net.MalformedURLException;
2422
import java.net.URL;
@@ -31,11 +29,9 @@
3129
import java.util.List;
3230
import java.util.Map;
3331

34-
import okhttp3.Headers;
35-
import okhttp3.HttpUrl;
36-
3732
import org.apache.kafka.common.config.ConfigDef;
3833
import org.apache.kafka.common.config.ConfigDef.Importance;
34+
import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
3935
import org.apache.kafka.common.config.ConfigDef.NonEmptyString;
4036
import org.apache.kafka.common.config.ConfigDef.Type;
4137
import org.apache.kafka.common.config.ConfigDef.Validator;
@@ -47,6 +43,9 @@
4743
import org.radarbase.connect.rest.fitbit.user.UserRepository;
4844
import org.radarbase.connect.rest.fitbit.user.YamlUserRepository;
4945

46+
import okhttp3.Headers;
47+
import okhttp3.HttpUrl;
48+
5049
public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
5150
public static final String FITBIT_USERS_CONFIG = "fitbit.users";
5251
private static final String FITBIT_USERS_DOC =
@@ -250,6 +249,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
250249
private static final String FITBIT_MAX_FORBIDDEN_DISPLAY = "Max forbidden responses";
251250
private static final int FITBIT_MAX_FORBIDDEN_DEFAULT = 3;
252251

252+
private static final String FITBIT_FORBIDDEN_BACKOFF_CONFIG = "fitbit.request.forbidden.backoff.s";
253+
private static final String FITBIT_FORBIDDEN_BACKOFF_DOC = "Backoff time in seconds between forbidden requests.";
254+
private static final String FITBIT_FORBIDDEN_BACKOFF_DISPLAY = "Forbidden backoff time (s)";
255+
private static final int FITBIT_FORBIDDEN_BACKOFF_DEFAULT = 86400; // 24 hours
256+
253257

254258
private UserRepository userRepository;
255259
private final Headers clientCredentials;
@@ -699,6 +703,18 @@ public String toString() {
699703
++orderInGroup,
700704
Width.SHORT,
701705
FITBIT_MAX_FORBIDDEN_DISPLAY)
706+
707+
.define(FITBIT_FORBIDDEN_BACKOFF_CONFIG,
708+
Type.INT,
709+
FITBIT_FORBIDDEN_BACKOFF_DEFAULT,
710+
Importance.MEDIUM,
711+
FITBIT_FORBIDDEN_BACKOFF_DOC,
712+
group,
713+
++orderInGroup,
714+
Width.SHORT,
715+
FITBIT_FORBIDDEN_BACKOFF_DISPLAY)
716+
717+
702718
;
703719
}
704720

@@ -902,4 +918,8 @@ public URL getFitbitUserRepositoryTokenUrl() {
902918
public int getMaxForbidden() {
903919
return getInt(FITBIT_MAX_FORBIDDEN_CONFIG);
904920
}
921+
922+
public int getForbiddenBackoff() {
923+
return getInt(FITBIT_FORBIDDEN_BACKOFF_CONFIG);
924+
}
905925
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,16 @@
1717

1818
package org.radarbase.connect.rest.fitbit.route;
1919

20-
import static java.time.ZoneOffset.UTC;
21-
import static java.time.temporal.ChronoUnit.DAYS;
22-
import static java.time.temporal.ChronoUnit.MINUTES;
23-
import static java.time.temporal.ChronoUnit.NANOS;
24-
import static java.time.temporal.ChronoUnit.SECONDS;
25-
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT;
26-
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.TIMESTAMP_OFFSET_KEY;
27-
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.nearFuture;
28-
import static org.radarbase.connect.rest.request.PollingRequestRoute.max;
29-
3020
import java.io.IOException;
3121
import java.time.Duration;
3222
import java.time.Instant;
23+
import static java.time.ZoneOffset.UTC;
3324
import java.time.ZonedDateTime;
3425
import java.time.format.DateTimeFormatter;
26+
import static java.time.temporal.ChronoUnit.DAYS;
27+
import static java.time.temporal.ChronoUnit.MINUTES;
28+
import static java.time.temporal.ChronoUnit.NANOS;
29+
import static java.time.temporal.ChronoUnit.SECONDS;
3530
import java.time.temporal.TemporalAmount;
3631
import java.util.AbstractMap;
3732
import java.util.Comparator;
@@ -42,11 +37,13 @@
4237
import java.util.concurrent.ConcurrentHashMap;
4338
import java.util.stream.Collectors;
4439
import java.util.stream.Stream;
45-
import okhttp3.Request;
46-
import okhttp3.Response;
40+
4741
import org.apache.kafka.connect.source.SourceRecord;
4842
import org.apache.kafka.connect.storage.OffsetStorageReader;
4943
import org.radarbase.connect.rest.RestSourceConnectorConfig;
44+
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.MIN_INSTANT;
45+
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.TIMESTAMP_OFFSET_KEY;
46+
import static org.radarbase.connect.rest.converter.PayloadToSourceRecordConverter.nearFuture;
5047
import org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig;
5148
import org.radarbase.connect.rest.fitbit.request.FitbitRequestGenerator;
5249
import org.radarbase.connect.rest.fitbit.request.FitbitRestRequest;
@@ -55,10 +52,14 @@
5552
import org.radarbase.connect.rest.fitbit.user.UserRepository;
5653
import org.radarbase.connect.rest.fitbit.util.DateRange;
5754
import org.radarbase.connect.rest.request.PollingRequestRoute;
55+
import static org.radarbase.connect.rest.request.PollingRequestRoute.max;
5856
import org.radarbase.connect.rest.request.RestRequest;
5957
import org.slf4j.Logger;
6058
import org.slf4j.LoggerFactory;
6159

60+
import okhttp3.Request;
61+
import okhttp3.Response;
62+
6263
/**
6364
* Route for regular polling.
6465
*
@@ -118,6 +119,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
118119
private Duration tooManyRequestsCooldown;
119120
private final Map<String, Integer> forbidden403Counter;
120121
private int maxForbiddenResponses;
122+
private Duration forbidden403Cooldown;
121123

122124
public FitbitPollingRoute(
123125
FitbitRequestGenerator generator,
@@ -144,6 +146,7 @@ public void initialize(RestSourceConnectorConfig config) {
144146
.minus(getPollIntervalPerUser());
145147
this.maxForbiddenResponses = fitbitConfig.getMaxForbidden();
146148
this.converter().initialize(fitbitConfig);
149+
this.forbidden403Cooldown = Duration.ofSeconds(fitbitConfig.getForbiddenBackoff());
147150
}
148151

149152
@Override
@@ -190,7 +193,7 @@ public void requestFailed(RestRequest request, Response response) {
190193
String userId = user.getId();
191194
int count = forbidden403Counter.compute(userId, (k, v) -> v == null ? 1 : v + 1);
192195
if (count >= maxForbiddenResponses) {
193-
Instant backOff = lastPoll.plus(ONE_DAY);
196+
Instant backOff = lastPoll.plus(this.forbidden403Cooldown);
194197
lastPollPerUser.put(userId, backOff);
195198
forbidden403Counter.remove(userId);
196199
logger.warn("User {} reached max 403 responses for route {}. Backing off until {}",

0 commit comments

Comments
 (0)