Skip to content

Commit 9fe5433

Browse files
top of the hour cooldown strategy for 429
1 parent 1c37e87 commit 9fe5433

File tree

2 files changed

+77
-11
lines changed

2 files changed

+77
-11
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitRestSourceConnectorConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,11 @@ public class FitbitRestSourceConnectorConfig extends RestSourceConnectorConfig {
259259
private static final String FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY = "Too many requests cooldown (s)";
260260
private static final int FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DEFAULT = 3600; // 1 hour
261261

262+
public static final String FITBIT_COOLDOWN_STRATEGY_CONFIG = "fitbit.cooldown.strategy";
263+
private static final String FITBIT_COOLDOWN_STRATEGY_DOC = "Strategy for handling too many requests cooldown. Options: ROLLING_WINDOW, TOP_OF_HOUR";
264+
private static final String FITBIT_COOLDOWN_STRATEGY_DISPLAY = "Cooldown strategy";
265+
private static final String FITBIT_COOLDOWN_STRATEGY_DEFAULT = "ROLLING_WINDOW";
266+
262267
private UserRepository userRepository;
263268
private final Headers clientCredentials;
264269

@@ -703,6 +708,16 @@ public String toString() {
703708
++orderInGroup,
704709
Width.SHORT,
705710
FITBIT_TOO_MANY_REQUESTS_COOLDOWN_DISPLAY)
711+
712+
.define(FITBIT_COOLDOWN_STRATEGY_CONFIG,
713+
Type.STRING,
714+
FITBIT_COOLDOWN_STRATEGY_DEFAULT,
715+
Importance.MEDIUM,
716+
FITBIT_COOLDOWN_STRATEGY_DOC,
717+
group,
718+
++orderInGroup,
719+
Width.SHORT,
720+
FITBIT_COOLDOWN_STRATEGY_DISPLAY)
706721
;
707722
}
708723

@@ -910,4 +925,12 @@ public int getMaxForbidden() {
910925
public int getForbiddenBackoff() {
911926
return getInt(FITBIT_FORBIDDEN_BACKOFF_CONFIG);
912927
}
928+
929+
public String getCooldownStrategy() {
930+
String value = getString(FITBIT_COOLDOWN_STRATEGY_CONFIG);
931+
if (!value.equalsIgnoreCase("ROLLING_WINDOW") && !value.equalsIgnoreCase("TOP_OF_HOUR")) {
932+
throw new ConfigException(FITBIT_COOLDOWN_STRATEGY_CONFIG, value, "Invalid cooldown strategy. Must be either ROLLING_WINDOW or TOP_OF_HOUR.");
933+
}
934+
return value;
935+
}
913936
}

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/route/FitbitPollingRoute.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.time.ZoneOffset.UTC;
2424
import java.time.ZonedDateTime;
2525
import java.time.format.DateTimeFormatter;
26+
import java.time.temporal.ChronoUnit;
2627
import static java.time.temporal.ChronoUnit.DAYS;
2728
import static java.time.temporal.ChronoUnit.MINUTES;
2829
import static java.time.temporal.ChronoUnit.NANOS;
@@ -120,6 +121,7 @@ public abstract class FitbitPollingRoute implements PollingRequestRoute {
120121
private final Map<String, Integer> forbidden403Counter;
121122
private int maxForbiddenResponses;
122123
private Duration forbidden403Cooldown;
124+
private String cooldownStrategy;
123125

124126
public FitbitPollingRoute(
125127
FitbitRequestGenerator generator,
@@ -147,6 +149,7 @@ public void initialize(RestSourceConnectorConfig config) {
147149
this.maxForbiddenResponses = fitbitConfig.getMaxForbidden();
148150
this.converter().initialize(fitbitConfig);
149151
this.forbidden403Cooldown = Duration.ofSeconds(fitbitConfig.getForbiddenBackoff());
152+
this.cooldownStrategy = fitbitConfig.getCooldownStrategy();
150153
}
151154

152155
@Override
@@ -175,19 +178,19 @@ public void requestFailed(RestRequest request, Response response) {
175178
if (response != null && response.code() == 429) {
176179
User user = ((FitbitRestRequest) request).getUser();
177180
tooManyRequestsForUser.add(user);
178-
String cooldownString = response.header("Retry-After");
179-
Duration cooldown = getTooManyRequestsCooldown();
180-
if (cooldownString != null) {
181-
try {
182-
cooldown = Duration.ofSeconds(Long.parseLong(cooldownString));
183-
} catch (NumberFormatException ex) {
184-
cooldown = getTooManyRequestsCooldown();
185-
}
181+
182+
Instant backOff;
183+
if ("TOP_OF_HOUR".equalsIgnoreCase(cooldownStrategy)) {
184+
backOff = calculateTopOfHourBackoff();
185+
logger.info("Too many requests for user {}. Using TOP_OF_HOUR strategy, backing off until: {}",
186+
user, backOff);
187+
} else {
188+
backOff = calculateRollingWindowBackoff(response);
189+
logger.info("Too many requests for user {}. Using ROLLING_WINDOW strategy, backing off until: {}",
190+
user, backOff.plus(getPollIntervalPerUser()));
186191
}
187-
Instant backOff = lastPoll.plus(cooldown);
192+
188193
lastPollPerUser.put(user.getId(), backOff);
189-
logger.info("Too many requests for user {}. Backing off until {}",
190-
user, backOff.plus(getPollIntervalPerUser()));
191194
} else if (response != null && response.code() == 403) {
192195
User user = ((FitbitRestRequest) request).getUser();
193196
String userId = user.getId();
@@ -206,6 +209,46 @@ public void requestFailed(RestRequest request, Response response) {
206209
}
207210
}
208211

212+
/**
213+
* Calculate backoff using top-of-hour strategy.
214+
* Waits until the top of the next hour to align with Fitbit's rate limit reset.
215+
*/
216+
private Instant calculateTopOfHourBackoff() {
217+
Instant now = Instant.now();
218+
Instant topOfNextHour = calculateTopOfNextHour(now).plus(ONE_SECOND);
219+
return topOfNextHour;
220+
}
221+
222+
/**
223+
* Calculate backoff using rolling window strategy.
224+
* Uses configured cooldown duration from when the error occurred.
225+
*/
226+
private Instant calculateRollingWindowBackoff(Response response) {
227+
String cooldownString = response.header("Retry-After");
228+
Duration cooldown = getTooManyRequestsCooldown();
229+
230+
if (cooldownString != null) {
231+
try {
232+
cooldown = Duration.ofSeconds(Long.parseLong(cooldownString));
233+
} catch (NumberFormatException ex) {
234+
cooldown = getTooManyRequestsCooldown();
235+
}
236+
}
237+
238+
return lastPoll.plus(cooldown);
239+
}
240+
241+
/**
242+
* Calculate the top of the next hour from the given instant.
243+
*/
244+
private Instant calculateTopOfNextHour(Instant instant) {
245+
ZonedDateTime zonedDateTime = instant.atZone(UTC);
246+
ZonedDateTime topOfNextHour = zonedDateTime
247+
.plusHours(1)
248+
.truncatedTo(ChronoUnit.HOURS);
249+
return topOfNextHour.toInstant();
250+
}
251+
209252
/**
210253
* Actually construct requests, based on the current offset
211254
* @param user Fitbit user

0 commit comments

Comments
 (0)