Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ dependencies {
testImplementation("org.mockito:mockito-core:$mockitoVersion")
testImplementation("org.mockito:mockito-junit-jupiter:$mockitoVersion")
testImplementation("org.assertj:assertj-core:$assertjVersion")
testImplementation("org.bouncycastle:bcprov-jdk15on:1.70")
testImplementation("org.bouncycastle:bcpkix-jdk15on:1.70")

integrationTestRuntimeOnly("io.confluent:kafka-avro-serializer:$confluentPlatformVersion")
integrationTestRuntimeOnly("io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion")
Expand Down
22 changes: 22 additions & 0 deletions docs/sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,28 @@ Connection
* Valid Values: Key value pair string list with format header:value
* Importance: low

``http.ssl.trust.all.certs``
Disable hostname verification. Not recommended for production environments.

* Type: boolean
* Default: false
* Importance: low

``http.ssl.truststore.location``
Path to the SSL truststore file. Only JKS (Java KeyStore) format is supported.

* Type: string
* Default: null
* Valid Values: Path to JKS truststore file
* Importance: low

``http.ssl.truststore.password``
Password for the SSL truststore. Required when truststore location is specified.

* Type: password
* Default: null
* Importance: low

``oauth2.access.token.url``
The URL to be used for fetching an access token. Client Credentials is the only supported grant type.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public final class HttpSinkConfig extends AbstractConfig {
private static final String HTTP_PROXY_HOST = "http.proxy.host";
private static final String HTTP_PROXY_PORT = "http.proxy.port";
private static final String HTTP_SSL_TRUST_ALL_CERTIFICATES = "http.ssl.trust.all.certs";
private static final String HTTP_SSL_TRUSTSTORE_LOCATION = "http.ssl.truststore.location";
private static final String HTTP_SSL_TRUSTSTORE_PASSWORD = "http.ssl.truststore.password";

private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type";
private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization";
Expand Down Expand Up @@ -149,6 +151,28 @@ private static void addConnectionConfigGroup(final ConfigDef configDef) {
ConfigDef.Width.SHORT,
HTTP_SSL_TRUST_ALL_CERTIFICATES
);
configDef.define(
HTTP_SSL_TRUSTSTORE_LOCATION,
Type.STRING,
null,
ConfigDef.Importance.LOW,
"Path to the SSL truststore file.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
HTTP_SSL_TRUSTSTORE_LOCATION
);
configDef.define(
HTTP_SSL_TRUSTSTORE_PASSWORD,
Type.PASSWORD,
null,
ConfigDef.Importance.LOW,
"Password for the SSL truststore.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.MEDIUM,
HTTP_SSL_TRUSTSTORE_PASSWORD
);

configDef.define(
HTTP_AUTHORIZATION_TYPE_CONFIG,
Expand Down Expand Up @@ -811,6 +835,15 @@ public final boolean sslTrustAllCertificates() {
return getBoolean(HTTP_SSL_TRUST_ALL_CERTIFICATES);
}

public final String sslTrustStoreLocation() {
return getString(HTTP_SSL_TRUSTSTORE_LOCATION);
}

public final String sslTrustStorePassword() {
final Password password = getPassword(HTTP_SSL_TRUSTSTORE_PASSWORD);
return password != null ? password.value() : null;
}

public static void main(final String... args) {
System.out.println("=========================================");
System.out.println("HTTP Sink connector Configuration Options");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,10 @@

package io.aiven.kafka.connect.http.sender;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509ExtendedTrustManager;

import java.net.ProxySelector;
import java.net.Socket;
import java.net.http.HttpClient;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

import org.apache.kafka.connect.errors.ConnectException;

Expand All @@ -52,62 +43,26 @@ public static HttpSender createHttpSender(final HttpSinkConfig config) {
}
}

private static final TrustManager DUMMY_TRUST_MANAGER = new X509ExtendedTrustManager() {
@Override
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
throws CertificateException {

}

@Override
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
throws CertificateException {

}

@Override
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
throws CertificateException {

}

@Override
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
throws CertificateException {

}

@Override
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return new java.security.cert.X509Certificate[0];
}

@Override
public void checkClientTrusted(final X509Certificate[] chain, final String authType)
throws CertificateException {

}

@Override
public void checkServerTrusted(final java.security.cert.X509Certificate[] chain, final String authType)
throws CertificateException {
}
};

static HttpClient buildHttpClient(final HttpSinkConfig config) {
final var clientBuilder = HttpClient.newBuilder();
configureProxy(config, clientBuilder);
configureSsl(config, clientBuilder);
return clientBuilder.build();
}

private static void configureProxy(final HttpSinkConfig config, final HttpClient.Builder clientBuilder) {
if (config.hasProxy()) {
clientBuilder.proxy(ProxySelector.of(config.proxy()));
}
if (config.sslTrustAllCertificates()) {
}

private static void configureSsl(final HttpSinkConfig config, final HttpClient.Builder clientBuilder) {
if (config.sslTrustAllCertificates() || config.sslTrustStoreLocation() != null) {
try {
final SSLContext sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, new TrustManager[] {DUMMY_TRUST_MANAGER}, new SecureRandom());
clientBuilder.sslContext(sslContext);
clientBuilder.sslContext(SslContextBuilder.createSslContext(config));
} catch (NoSuchAlgorithmException | KeyManagementException e) {
throw new RuntimeException(e);
}
}
return clientBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2023 Aiven Oy and http-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.sender;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedTrustManager;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;

final class SslContextBuilder {

private static final TrustManager DUMMY_TRUST_MANAGER = new X509ExtendedTrustManager() {
@Override
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
throws CertificateException {
}

@Override
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
throws CertificateException {
}

@Override
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
throws CertificateException {
}

@Override
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
throws CertificateException {
}

@Override
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return new java.security.cert.X509Certificate[0];
}

@Override
public void checkClientTrusted(final X509Certificate[] chain, final String authType)
throws CertificateException {
}

@Override
public void checkServerTrusted(final java.security.cert.X509Certificate[] chain, final String authType)
throws CertificateException {
}
};

static SSLContext createSslContext(final HttpSinkConfig config)
throws NoSuchAlgorithmException, KeyManagementException {
final SSLContext sslContext = SSLContext.getInstance("TLS");
if (config.sslTrustAllCertificates()) {
sslContext.init(null, new TrustManager[] {DUMMY_TRUST_MANAGER}, new SecureRandom());
} else {
final TrustManagerFactory tmf = loadTrustStore(config);
sslContext.init(null, tmf != null ? tmf.getTrustManagers() : null, new SecureRandom());
}
return sslContext;
}

private static TrustManagerFactory loadTrustStore(final HttpSinkConfig config) {
if (config.sslTrustStoreLocation() == null) {
return null;
}
try {
final KeyStore trustStore = KeyStore.getInstance("JKS");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be a good idea to also add a config for the keystore type?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but jks meets my need. I can think of adding other types later if I have the time, but it would be good to not hold this off until other support is available. Let me know what you think. if you agree, I can update teh documentation to make it clear that only JKS is supported for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's probably fine to not add the keystore type immediately -- IIUC, adding the truststore here is just a mechanism to create an allow list of acceptable HTTPS servers (not to verify that the client is acceptable). This is by far the most frequent usage (e.g., when my browser contacts https://github.com, the server never checks who I am).

Copy link
Copy Markdown
Contributor Author

@madhavvishnubhatta madhavvishnubhatta Oct 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. This is only for the client to validate the server. This is not a mechanism for the server to check who is connecting. But makes me think if mTLS might be a future feature request. Anyway, that is out of scope for this PR.

final String path = config.sslTrustStoreLocation();

try (InputStream is = TrustStoreLoader.findTrustStoreInputStream(path)) {
if (is == null) {
throw new RuntimeException("TrustStore file not found: " + path
+ ". Tried classpath and file system locations.");
}
trustStore.load(is, config.sslTrustStorePassword() != null
? config.sslTrustStorePassword().toCharArray() : null);
}
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);
return tmf;
} catch (KeyStoreException
| IOException
| NoSuchAlgorithmException
| CertificateException e) {
throw new RuntimeException("Failed to load truststore: " + config.sslTrustStoreLocation(), e);
}
}
}
Loading
Loading