Skip to content

Commit e50c3cc

Browse files
committed
Impl. configuration of application loop and user service cache refresh intervals
Needed for efficient e2e testing.
1 parent 39ca6dc commit e50c3cc

File tree

4 files changed

+53
-6
lines changed

4 files changed

+53
-6
lines changed

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/FitbitSourceConnector.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.radarbase.connect.rest.fitbit.FitbitRestSourceConnectorConfig.FITBIT_USERS_CONFIG;
2121

2222
import java.io.IOException;
23+
import java.time.Duration;
2324
import java.util.HashMap;
2425
import java.util.List;
2526
import java.util.Map;
@@ -45,9 +46,12 @@ public class FitbitSourceConnector extends AbstractRestSourceConnector {
4546

4647
@Override
4748
public void start(Map<String, String> props) {
49+
logger.info("Starting Fitbit source connector");
4850
super.start(props);
4951
executor = Executors.newSingleThreadScheduledExecutor();
5052

53+
Duration applicationLoopInterval = config.getApplicationLoopInterval();
54+
5155
executor.scheduleAtFixedRate(() -> {
5256
if (repository.hasPendingUpdates()) {
5357
try {
@@ -66,7 +70,7 @@ public void start(Map<String, String> props) {
6670
} else {
6771
logger.info("No pending updates found. Not attempting to refresh users.");
6872
}
69-
}, 0, 5, TimeUnit.MINUTES);
73+
}, 0, applicationLoopInterval.toSeconds(), TimeUnit.SECONDS);
7074
}
7175

7276
@Override

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepository.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ import java.io.IOException
6161
import java.util.concurrent.ConcurrentHashMap
6262
import java.util.stream.Stream
6363
import kotlin.time.Duration.Companion.days
64-
import kotlin.time.Duration.Companion.hours
6564
import kotlin.time.Duration.Companion.minutes
6665
import kotlin.time.Duration.Companion.seconds
66+
import kotlin.time.toKotlinDuration
6767

6868
@Suppress("unused")
6969
class ServiceUserRepository : UserRepository {
@@ -92,8 +92,12 @@ class ServiceUserRepository : UserRepository {
9292
clientSecret = config.fitbitUserRepositoryClientSecret,
9393
)
9494

95+
val refreshDuration = config.userCacheRefreshInterval.toKotlinDuration()
9596
userCache = CachedSet(
96-
CacheConfig(refreshDuration = 1.hours, retryDuration = 1.minutes),
97+
CacheConfig(
98+
refreshDuration = refreshDuration,
99+
retryDuration = if (refreshDuration > 1.minutes) 1.minutes else refreshDuration,
100+
),
97101
) {
98102
makeRequest<Users> { url("users?source-type=FitBit") }
99103
.users

kafka-connect-fitbit-source/src/main/java/org/radarbase/connect/rest/fitbit/user/ServiceUserRepositoryLegacy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,4 +246,4 @@ private <T> T makeRequest(Request request, ObjectReader reader) throws IOExcepti
246246
throw new IOException("Failed to make request to user repository", ex);
247247
}
248248
}
249-
}
249+
}

kafka-connect-rest-source/src/main/java/org/radarbase/connect/rest/RestSourceConnectorConfig.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,15 @@
4545
public class RestSourceConnectorConfig extends AbstractConfig {
4646
public static final Pattern COLON_PATTERN = Pattern.compile(":");
4747

48+
public static final String APPLICATION_LOOP_INTERVAL_CONFIG = "application.loop.interval.ms";
49+
private static final String APPLICATION_LOOP_INTERVAL_DOC = "How often to perform the main application loop.";
50+
private static final String APPLICATION_LOOP_INTERVAL_DISPLAY = "Application loop interval";
51+
private static final Long APPLICATION_LOOP_INTERVAL_DEFAULT = 300000L; // 5 minutes
52+
4853
private static final String SOURCE_POLL_INTERVAL_CONFIG = "rest.source.poll.interval.ms";
4954
private static final String SOURCE_POLL_INTERVAL_DOC = "How often to poll the source URL.";
5055
private static final String SOURCE_POLL_INTERVAL_DISPLAY = "Polling interval";
51-
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L;
56+
private static final Long SOURCE_POLL_INTERVAL_DEFAULT = 60000L; // 1 minute
5257

5358
static final String SOURCE_URL_CONFIG = "rest.source.base.url";
5459
private static final String SOURCE_URL_DOC = "Base URL for REST source connector.";
@@ -81,6 +86,11 @@ public class RestSourceConnectorConfig extends AbstractConfig {
8186
"Class to be used to generate REST requests";
8287
private static final String REQUEST_GENERATOR_DISPLAY = "Request generator class";
8388

89+
public static final String USER_CACHE_REFRESH_INTERVAL_CONFIG = "user.cache.refresh.interval.ms";
90+
private static final String USER_CACHE_REFRESH_INTERVAL_DOC = "How often to poll for new user registrations.";
91+
private static final String USER_CACHE_REFRESH_INTERVAL_DISPLAY = "Refresh interval";
92+
private static final Long USER_CACHE_REFRESH_INTERVAL_DEFAULT = 3600000L; // 1 hour
93+
8494
private final TopicSelector topicSelector;
8595
private final PayloadToSourceRecordConverter payloadToSourceRecordConverter;
8696
private final RequestGenerator requestGenerator;
@@ -171,7 +181,36 @@ public static ConfigDef conf() {
171181
++orderInGroup,
172182
Width.SHORT,
173183
REQUEST_GENERATOR_DISPLAY)
174-
;
184+
185+
.define(APPLICATION_LOOP_INTERVAL_CONFIG,
186+
Type.LONG,
187+
APPLICATION_LOOP_INTERVAL_DEFAULT,
188+
Importance.LOW,
189+
APPLICATION_LOOP_INTERVAL_DOC,
190+
group,
191+
++orderInGroup,
192+
Width.SHORT,
193+
APPLICATION_LOOP_INTERVAL_DISPLAY)
194+
195+
.define(USER_CACHE_REFRESH_INTERVAL_CONFIG,
196+
Type.LONG,
197+
USER_CACHE_REFRESH_INTERVAL_DEFAULT,
198+
Importance.LOW,
199+
USER_CACHE_REFRESH_INTERVAL_DOC,
200+
group,
201+
++orderInGroup,
202+
Width.SHORT,
203+
USER_CACHE_REFRESH_INTERVAL_DISPLAY
204+
205+
);
206+
}
207+
208+
public Duration getApplicationLoopInterval() {
209+
return Duration.ofMillis(this.getLong(APPLICATION_LOOP_INTERVAL_CONFIG));
210+
}
211+
212+
public Duration getUserCacheRefreshInterval() {
213+
return Duration.ofMillis(this.getLong(USER_CACHE_REFRESH_INTERVAL_CONFIG));
175214
}
176215

177216
public Duration getPollInterval() {

0 commit comments

Comments
 (0)