diff --git a/README.md b/README.md index 8d6bcad3..06b99f09 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ We consider the connector **stable** despite the major version is currently 0. ## How it works -The connector uses the POST HTTP method to deliver records. +As default the connector uses the POST HTTP method to deliver records. The connector supports: - authorization (static, OAuth2); diff --git a/docs/sink-connector-config-options.rst b/docs/sink-connector-config-options.rst index 03cffc66..1c4deb78 100644 --- a/docs/sink-connector-config-options.rst +++ b/docs/sink-connector-config-options.rst @@ -12,6 +12,14 @@ Connection * Valid Values: HTTP(S) URL * Importance: high +``http.method`` + The HTTP Method to use when send the data. + + * Type: string + * Default: "POST" + * Valid Values: [POST, PUT] + * Importance: low + ``http.authorization.type`` The HTTP authorization type. diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java new file mode 100644 index 00000000..f0a5ad64 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpMethodsType.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.config; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Objects; +import java.util.stream.Collectors; + +public enum HttpMethodsType { + POST("POST"), + PUT("PUT"); + + public final String name; + + HttpMethodsType(final String name) { + this.name = name; + } + + public static HttpMethodsType forName(final String name) { + Objects.requireNonNull(name); + return Arrays.stream(values()) + .filter(v -> v.name.equalsIgnoreCase(name)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException("HTTP Method type: " + name)); + } + + public static final Collection NAMES = + Arrays.stream(values()).map(v -> v.name).collect(Collectors.toList()); +} diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index ed486c38..489564f4 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -41,6 +41,7 @@ public final class HttpSinkConfig extends AbstractConfig { private static final String CONNECTION_GROUP = "Connection"; private static final String HTTP_URL_CONFIG = "http.url"; + private static final String HTTP_METHOD = "http.method"; private static final String HTTP_PROXY_HOST = "http.proxy.host"; private static final String HTTP_PROXY_PORT = "http.proxy.port"; private static final String HTTP_SSL_TRUST_ALL_CERTIFICATES = "http.ssl.trust.all.certs"; @@ -58,6 +59,7 @@ public final class HttpSinkConfig extends AbstractConfig { private static final String OAUTH2_GRANT_TYPE_CONFIG = "oauth2.grant.type"; private static final String OAUTH2_CLIENT_ID_PROP_CONFIG = "oauth2.request.client.id.property"; private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id"; + private static final String OAUTH2_BODY_PARAMS = "oauth2.body.params"; private static final String OAUTH2_CLIENT_SECRET_PROP_CONFIG = "oauth2.request.client.secret.property"; private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret"; private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode"; @@ -253,6 +255,23 @@ public boolean visible(final String name, final Map parsedConfig OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG) ); + configDef.define( + OAUTH2_BODY_PARAMS, + ConfigDef.Type.STRING, + null, + new ConfigDef.NonEmptyStringWithoutControlChars() { + @Override + public String toString() { + return "OAuth2 additional params"; + } + }, + ConfigDef.Importance.HIGH, + "Additional params to add to the body.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.LONG, + OAUTH2_BODY_PARAMS + ); configDef.define(OAUTH2_GRANT_TYPE_PROP_CONFIG, ConfigDef.Type.STRING, "grant_type", @@ -424,6 +443,41 @@ public String toString() { List.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG, OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG) ); + + configDef.define( + HTTP_METHOD, + ConfigDef.Type.STRING, + "POST", + new ConfigDef.Validator() { + @Override + @SuppressFBWarnings("NP_LOAD_OF_KNOWN_NULL_VALUE") // Suppress the ConfigException with null value. + public void ensureValid(final String name, final Object value) { + if (value == null) { + throw new ConfigException(HTTP_METHOD, value); + } + assert value instanceof String; + final String valueStr = (String) value; + if (!HttpMethodsType.NAMES.contains(valueStr)) { + throw new ConfigException( + HTTP_METHOD, valueStr, + "supported values are: " + HttpMethodsType.NAMES); + } + } + + @Override + public String toString() { + return HttpMethodsType.NAMES.toString(); + } + }, + ConfigDef.Importance.LOW, + "The HTTP Method to use when send the data.", + CONNECTION_GROUP, + groupCounter++, + ConfigDef.Width.SHORT, + HTTP_METHOD, + FixedSetRecommender.ofSupportedValues(HttpMethodsType.NAMES) + ); + } private static void addBatchingConfigGroup(final ConfigDef configDef) { @@ -676,6 +730,10 @@ public final URI httpUri() { return toURI(HTTP_URL_CONFIG); } + public final HttpMethodsType httpMethod() { + return HttpMethodsType.valueOf(getString(HTTP_METHOD)); + } + public final Long kafkaRetryBackoffMs() { return getLong(KAFKA_RETRY_BACKOFF_MS_CONFIG); } @@ -767,6 +825,10 @@ public final String oauth2GrantType() { return getString(OAUTH2_GRANT_TYPE_CONFIG); } + public final String getOauth2BodyParams() { + return getString(OAUTH2_BODY_PARAMS); + } + public final String oauth2ClientIdProperty() { return getString(OAUTH2_CLIENT_ID_PROP_CONFIG); } diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index 0fa5f520..7fc5065d 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -24,6 +24,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -37,23 +38,46 @@ abstract class AbstractHttpSender { protected final HttpClient httpClient; protected final HttpSinkConfig config; + protected final HttpMethodsType method; protected final HttpRequestBuilder httpRequestBuilder; protected AbstractHttpSender( - final HttpSinkConfig config, final HttpRequestBuilder httpRequestBuilder, final HttpClient httpClient + final HttpSinkConfig config, + final HttpRequestBuilder httpRequestBuilder, + final HttpClient httpClient, + final HttpMethodsType method ) { this.config = Objects.requireNonNull(config); this.httpRequestBuilder = Objects.requireNonNull(httpRequestBuilder); this.httpClient = Objects.requireNonNull(httpClient); + this.method = method; } public final HttpResponse send(final String body) { - final var requestBuilder = - httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + final var requestBuilder = prepareRequest(body); return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER, config.maxRetries()); } + // seth http bethod based on config + private Builder prepareRequest(final String body) { + if(method == null) { + switch (config.httpMethod()) { + case POST: + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + case PUT: + return httpRequestBuilder + .build(config).PUT(HttpRequest.BodyPublishers.ofString(body)); + default: + throw new ConnectException("Unsupported HTTP method: " + config.httpMethod()); + } + } else + return httpRequestBuilder + .build(config).POST(HttpRequest.BodyPublishers.ofString(body)); + + } + /** * Sends an HTTP body using {@code httpSender}, respecting the configured retry policy. * diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java index f461d85d..85f3b593 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/DefaultHttpSender.java @@ -21,12 +21,13 @@ import java.net.http.HttpRequest.Builder; import java.time.Duration; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import io.aiven.kafka.connect.http.config.HttpSinkConfig; class DefaultHttpSender extends AbstractHttpSender implements HttpSender { DefaultHttpSender(final HttpSinkConfig config, final HttpClient client) { - super(config, new DefaultHttpRequestBuilder(), client); + super(config, new DefaultHttpRequestBuilder(), client, null); } static class DefaultHttpRequestBuilder implements HttpRequestBuilder { diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java index 0341f3eb..6d1a5813 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2AccessTokenHttpSender.java @@ -24,6 +24,7 @@ import java.util.Base64; import java.util.Objects; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import io.aiven.kafka.connect.http.config.HttpSinkConfig; import io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode; import io.aiven.kafka.connect.http.sender.request.OAuth2AccessTokenRequestForm; @@ -33,7 +34,7 @@ class OAuth2AccessTokenHttpSender extends AbstractHttpSender implements HttpSender { OAuth2AccessTokenHttpSender(final HttpSinkConfig config, final HttpClient httpClient) { - super(config, new AccessTokenHttpRequestBuilder(), httpClient); + super(config, new AccessTokenHttpRequestBuilder(), httpClient, HttpMethodsType.POST); } HttpResponse call() { @@ -41,6 +42,7 @@ HttpResponse call() { .newBuilder() .withGrantTypeProperty(config.oauth2GrantTypeProperty()) .withGrantType(config.oauth2GrantType()) + .withBodyParams(config.getOauth2BodyParams()) .withScope(config.oauth2ClientScope()); if (config.oauth2AuthorizationMode() == OAuth2AuthorizationMode.URL) { diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java index e8dffdd1..90fd8b69 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/OAuth2HttpSender.java @@ -23,6 +23,7 @@ import java.net.http.HttpResponse; import java.util.Map; +import io.aiven.kafka.connect.http.config.HttpMethodsType; import org.apache.kafka.connect.errors.ConnectException; import io.aiven.kafka.connect.http.config.HttpSinkConfig; @@ -41,7 +42,7 @@ class OAuth2HttpSender extends AbstractHttpSender implements HttpSender { final HttpClient httpClient, final OAuth2AccessTokenHttpSender oauth2AccessTokenHttpSender ) { - super(config, new OAuth2AuthHttpRequestBuilder(config, oauth2AccessTokenHttpSender), httpClient); + super(config, new OAuth2AuthHttpRequestBuilder(config, oauth2AccessTokenHttpSender), httpClient, null); } @Override diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java index 5bc0b202..a5f49c50 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/StaticAuthHttpSender.java @@ -25,7 +25,7 @@ class StaticAuthHttpSender extends AbstractHttpSender implements HttpSender { StaticAuthHttpSender(final HttpSinkConfig config, final HttpClient client) { - super(config, new StaticAuthHttpRequestBuilder(), client); + super(config, new StaticAuthHttpRequestBuilder(), client, null); } private static class StaticAuthHttpRequestBuilder extends DefaultHttpRequestBuilder { diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java b/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java index f0202b5a..12eca1fc 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/request/OAuth2AccessTokenRequestForm.java @@ -34,6 +34,7 @@ public class OAuth2AccessTokenRequestForm { private final String clientSecretProperty; private final String clientSecret; + private final String bodyParams; private OAuth2AccessTokenRequestForm( final String grantTypeProperty, @@ -42,7 +43,8 @@ private OAuth2AccessTokenRequestForm( final String clientIdProperty, final String clientId, final String clientSecretProperty, - final String clientSecret + final String clientSecret, + final String bodyParams ) { this.grantTypeProperty = grantTypeProperty; this.grantType = grantType; @@ -51,6 +53,7 @@ private OAuth2AccessTokenRequestForm( this.clientId = clientId; this.clientSecretProperty = clientSecretProperty; this.clientSecret = clientSecret; + this.bodyParams = bodyParams; } public String toBodyString() { @@ -58,6 +61,9 @@ public String toBodyString() { if (scope != null) { stringJoiner.add(encodeNameAndValue(SCOPE, scope)); } + if (bodyParams != null) { + stringJoiner.add(bodyParams); + } if (clientId != null && clientSecret != null) { stringJoiner .add(encodeNameAndValue(clientIdProperty, clientId)) @@ -89,6 +95,7 @@ public static class Builder { private String clientSecretProperty; private String clientSecret; + private String bodyParams; private Builder() { } @@ -103,6 +110,11 @@ public Builder withGrantType(final String grantType) { return this; } + public Builder withBodyParams(final String bodyParams) { + this.bodyParams = bodyParams; + return this; + } + public Builder withScope(final String scope) { this.scope = scope; return this; @@ -144,7 +156,8 @@ public OAuth2AccessTokenRequestForm build() { } return new OAuth2AccessTokenRequestForm( - grantTypeProperty, grantType, scope, clientIdProperty, clientId, clientSecretProperty, clientSecret); + grantTypeProperty, grantType, scope, clientIdProperty, clientId, + clientSecretProperty, clientSecret, bodyParams); } } diff --git a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java index f39fd7d4..16056ca4 100644 --- a/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java +++ b/src/test/java/io/aiven/kafka/connect/http/sender/DefaultHttpSenderTest.java @@ -24,10 +24,11 @@ import java.util.List; import java.util.Map; -import org.apache.kafka.connect.errors.ConnectException; - +import io.aiven.kafka.connect.http.config.HttpMethodsType; import io.aiven.kafka.connect.http.config.HttpSinkConfig; +import org.apache.kafka.connect.errors.ConnectException; + import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -46,6 +47,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; @ExtendWith(MockitoExtension.class) public class DefaultHttpSenderTest extends HttpSenderTestBase { @@ -87,7 +89,7 @@ void shouldBuildDefaultHttpRequest() throws Exception { .isPresent() .get(as(InstanceOfAssertFactories.DURATION)) .hasSeconds(config.httpTimeout()); - assertThat(httpRequest.method()).isEqualTo("POST"); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.POST.name()); assertThat(httpRequest .headers() @@ -100,6 +102,54 @@ void shouldBuildDefaultHttpRequest() throws Exception { } + @Test + void shouldBuildDefaultHttpPutRequest() throws Exception { + final var configBase = new HashMap<>(defaultConfig()); + configBase.put("http.method", "PUT"); + + // Build the configuration + final HttpSinkConfig config = new HttpSinkConfig(configBase); + + // Mock the Http Client and Http Response + when(mockedClient.send(any(HttpRequest.class), any(BodyHandler.class))).thenReturn(mockedResponse); + + // Create a spy on the HttpSender implementation to capture methods parameters + final var httpSender = Mockito.spy(new DefaultHttpSender(config, mockedClient)); + + // Trigger the client + final List messages = List.of("some message"); + messages.forEach(httpSender::send); + + // Capture the RequestBuilder + final ArgumentCaptor defaultHttpRequestBuilder = ArgumentCaptor.forClass(HttpRequest.Builder.class); + verify(httpSender, atLeast(messages.size())).sendWithRetries(defaultHttpRequestBuilder.capture(), + any(HttpResponseHandler.class), anyInt()); + + // Retrieve the builders and rebuild the HttpRequests to check the HttpRequest proper configuration + defaultHttpRequestBuilder + .getAllValues() + .stream() + .map(Builder::build) + .forEach(httpRequest -> { + assertThat(httpRequest.uri()).isEqualTo(config.httpUri()); + assertThat(httpRequest.timeout()) + .isPresent() + .get(as(InstanceOfAssertFactories.DURATION)) + .hasSeconds(config.httpTimeout()); + assertThat(httpRequest.method()).isEqualTo(HttpMethodsType.PUT.name()); + + assertThat(httpRequest + .headers() + .firstValue(HttpRequestBuilder.HEADER_CONTENT_TYPE)).isEmpty(); + }); + + // Check the messages have been sent once + messages.forEach( + message -> bodyPublishers.verify(() -> HttpRequest.BodyPublishers.ofString(eq(message)), times(1))); + + } + + @Test void shouldBuildCustomHttpRequest() throws Exception { final var configBase = new HashMap<>(defaultConfig());