Skip to content

Commit a6905ff

Browse files
authored
feat: configurable http codes to new access token (#290)
1 parent 712c44c commit a6905ff

File tree

4 files changed

+88
-12
lines changed

4 files changed

+88
-12
lines changed

src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public final class HttpSinkConfig extends AbstractConfig {
6565
private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode";
6666
private static final String OAUTH2_CLIENT_SCOPE_CONFIG = "oauth2.client.scope";
6767
private static final String OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG = "oauth2.response.token.property";
68+
private static final String OAUTH2_TOKEN_RENEW_ON_STATUS_CODES_CONFIG ="oauth2.token.renew.on.status.codes";
6869

6970
private static final String BATCHING_GROUP = "Batching";
7071
private static final String BATCHING_ENABLED_CONFIG = "batching.enabled";
@@ -448,6 +449,48 @@ public String toString() {
448449
List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG,
449450
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG)
450451
);
452+
configDef.define(
453+
OAUTH2_TOKEN_RENEW_ON_STATUS_CODES_CONFIG,
454+
ConfigDef.Type.LIST,
455+
"401",
456+
new ConfigDef.Validator() {
457+
458+
@Override
459+
public void ensureValid(String name, Object value) {
460+
if (value == null) {
461+
throw new ConfigException(name, null, "can't be null");
462+
}
463+
if (!(value instanceof List)) {
464+
throw new ConfigException(name, value, "must be a list");
465+
}
466+
for (Object entry : (List) value) {
467+
if (!(entry instanceof String) || !isNumber((String) entry)) {
468+
throw new ConfigException(name, value, "must be a list of numbers");
469+
}
470+
}
471+
}
472+
473+
private boolean isNumber(String value) {
474+
try {
475+
Integer.parseInt(value);
476+
return true;
477+
} catch (NumberFormatException e) {
478+
return false;
479+
}
480+
}
481+
482+
@Override
483+
public String toString() {
484+
return "List of HTTP status codes";
485+
}
486+
},
487+
ConfigDef.Importance.LOW,
488+
"Comma separated list of HTTP response status codes which should trigger an access token renewal",
489+
CONNECTION_GROUP,
490+
groupCounter++,
491+
ConfigDef.Width.LONG,
492+
OAUTH2_TOKEN_RENEW_ON_STATUS_CODES_CONFIG
493+
);
451494
}
452495

453496
private static void addBatchingConfigGroup(final ConfigDef configDef) {
@@ -819,6 +862,10 @@ public final String oauth2ResponseTokenProperty() {
819862
return getString(OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG);
820863
}
821864

865+
public final List<Integer> oauth2RenewTokenOnStatusCodes() {
866+
return getList(OAUTH2_TOKEN_RENEW_ON_STATUS_CODES_CONFIG).stream().map(Integer::valueOf).collect(Collectors.toList());
867+
}
868+
822869
public final boolean hasProxy() {
823870
return getString(HTTP_PROXY_HOST) != null;
824871
}

src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ class OAuth2HttpSender extends AbstractHttpSender implements HttpSender {
4848
protected HttpResponse<String> sendWithRetries(
4949
final Builder requestBuilder, final HttpResponseHandler originHttpResponseHandler, final int retries
5050
) {
51-
// This handler allows to request a new access token if a 401 occurs, meaning the session might be expired
51+
// This handler allows to request a new access token if a an expected error occurs, meaning the session might be expired
5252
final HttpResponseHandler handler = (response, remainingRetries) -> {
53-
// If the response has a 401 error and we have retries left, we attempt to renew the session
54-
if (response.statusCode() == 401 && remainingRetries > 0) {
53+
// If the response has one of the configured error codes and we have retries left, we attempt to renew the session
54+
if (config.oauth2RenewTokenOnStatusCodes().contains(response.statusCode()) && remainingRetries > 0) {
5555
// Update the request builder with the new access token
5656
((OAuth2AuthHttpRequestBuilder) this.httpRequestBuilder).renewAccessToken(requestBuilder);
5757
// Retry the call and decrease the retries counter to avoid looping on token renewal

src/test/java/io/aiven/kafka/connect/http/config/HttpSinkConfigTest.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.net.URISyntaxException;
2222
import java.util.Collections;
2323
import java.util.HashMap;
24+
import java.util.List;
2425
import java.util.Map;
2526
import java.util.concurrent.TimeUnit;
2627
import java.util.stream.Stream;
@@ -77,7 +78,9 @@ void correctMinimalConfig() throws URISyntaxException {
7778
.returns("access_token", from(HttpSinkConfig::oauth2ResponseTokenProperty))
7879
.returns(null, from(HttpSinkConfig::kafkaRetryBackoffMs))
7980
.returns(false, from(HttpSinkConfig::hasProxy))
80-
.returns(false, from(HttpSinkConfig::sslTrustAllCertificates));
81+
.returns(false, from(HttpSinkConfig::sslTrustAllCertificates))
82+
.returns(List.of(401), from(HttpSinkConfig::oauth2RenewTokenOnStatusCodes));
83+
8184
}
8285

8386
@Test
@@ -195,7 +198,8 @@ void validOAuth2FullConfiguration() throws URISyntaxException {
195198
"oauth2.client.secret", "client_secret",
196199
"oauth2.client.authorization.mode", "url",
197200
"oauth2.client.scope", "scope1,scope2",
198-
"oauth2.response.token.property", "moooooo"
201+
"oauth2.response.token.property", "moooooo",
202+
"oauth2.token.renew.on.status.codes", "401, 403"
199203
);
200204

201205
final var config = new HttpSinkConfig(oauth2Config);
@@ -206,7 +210,22 @@ void validOAuth2FullConfiguration() throws URISyntaxException {
206210
.returns("client_secret", from(httpSinkConfig -> httpSinkConfig.oauth2ClientSecret().value()))
207211
.returns("scope1,scope2", from(HttpSinkConfig::oauth2ClientScope))
208212
.returns(OAuth2AuthorizationMode.URL, from(HttpSinkConfig::oauth2AuthorizationMode))
209-
.returns("moooooo", from(HttpSinkConfig::oauth2ResponseTokenProperty));
213+
.returns("moooooo", from(HttpSinkConfig::oauth2ResponseTokenProperty))
214+
.returns(List.of(401, 403), from(HttpSinkConfig::oauth2RenewTokenOnStatusCodes));
215+
}
216+
217+
@Test
218+
void invalidHttpErrorCode() {
219+
final Map<String, String> properties = Map.of(
220+
"http.url", "http://localhost:8090",
221+
"http.authorization.type", "none",
222+
"oauth2.token.renew.on.status.codes", "wrong"
223+
);
224+
225+
assertThatExceptionOfType(ConfigException.class)
226+
.describedAs("Expected config exception due to malformed status code list")
227+
.isThrownBy(() -> new HttpSinkConfig(properties))
228+
.withMessage("Invalid value [wrong] for configuration oauth2.token.renew.on.status.codes: must be a list of numbers");
210229
}
211230

212231
@Test

src/test/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSenderTest.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import org.assertj.core.api.InstanceOfAssertFactories;
3535
import org.junit.jupiter.api.Test;
3636
import org.junit.jupiter.api.extension.ExtendWith;
37+
import org.junit.jupiter.params.ParameterizedTest;
38+
import org.junit.jupiter.params.provider.CsvSource;
39+
import org.junit.jupiter.params.provider.ValueSource;
3740
import org.mockito.ArgumentCaptor;
3841
import org.mockito.Mock;
3942
import org.mockito.Mockito;
@@ -228,8 +231,13 @@ void reuseAccessToken() throws Exception {
228231

229232
}
230233

231-
@Test
232-
void refreshAccessToken() throws Exception {
234+
@ParameterizedTest
235+
@CsvSource( {
236+
"'401' , '401'",
237+
"'401,403', '401'",
238+
"'401,403', '403'"
239+
})
240+
void refreshAccessToken(final String configuredErrorCodes, final String sendErrorCode) throws Exception {
233241

234242
// first call to retrieve an access token
235243
final HttpResponse<String> mockedAccessTokenResponse = mock(HttpResponse.class);
@@ -241,15 +249,17 @@ void refreshAccessToken() throws Exception {
241249
when(oauth2AccessTokenHttpSender.call()).thenReturn(
242250
mockedAccessTokenResponse, mockedAccessTokenResponseRefreshed);
243251

244-
// Mock a 2nd response with 401.
252+
// Mock a 2nd response with error code.
245253
final HttpResponse<String> errorResponse = mock(HttpResponse.class);
246-
when(errorResponse.statusCode()).thenReturn(401);
247-
// Mock a 2nd response with 401.
254+
when(errorResponse.statusCode()).thenReturn(Integer.parseInt(sendErrorCode));
255+
// Mock a 2nd response with 200.
248256
final HttpResponse<String> normalResponse = mock(HttpResponse.class);
249257
when(normalResponse.statusCode()).thenReturn(200);
250258

251259
// Build the configuration
252-
final HttpSinkConfig config = new HttpSinkConfig(defaultConfig());
260+
Map<String,String> configMap =new HashMap<>(defaultConfig());
261+
configMap.put("oauth2.token.renew.on.status.codes", configuredErrorCodes);
262+
final HttpSinkConfig config = new HttpSinkConfig(configMap);
253263

254264
// Mock the Client and Response
255265
when(mockedClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(mockedResponse,

0 commit comments

Comments
 (0)