Skip to content

Commit 7247fdf

Browse files
Support custom trust stores for https (#295)
* With a Keystore, the connect workers get deployed on MSK * Added code to try out different ways to access the trust store file * Modified to use the jks as trust store. Tested and it works to reach https endpoints * Refactored code to meet all Style Checks * Added Test cases * Modified the parameters to add the 'http' prefix * Updated documentation for the two new config options for SSL custom trust store * Used Logger, instead of print statements * Used 'try-with-resources' for TrustStore input stream * Modified test case to generate jks file on the fly and remove it when done. Also changed 'Truststore' to 'TrustStore' (note the 'S' in caps) * Fixed style checks * Added missing dependencies for jks generation for testing * File rename from Truststore to TrustStore
1 parent fd722fb commit 7247fdf

File tree

10 files changed

+521
-56
lines changed

10 files changed

+521
-56
lines changed

build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ dependencies {
135135
testImplementation("org.mockito:mockito-core:$mockitoVersion")
136136
testImplementation("org.mockito:mockito-junit-jupiter:$mockitoVersion")
137137
testImplementation("org.assertj:assertj-core:$assertjVersion")
138+
testImplementation("org.bouncycastle:bcprov-jdk15on:1.70")
139+
testImplementation("org.bouncycastle:bcpkix-jdk15on:1.70")
138140

139141
integrationTestRuntimeOnly("io.confluent:kafka-avro-serializer:$confluentPlatformVersion")
140142
integrationTestRuntimeOnly("io.confluent:kafka-connect-avro-converter:$confluentPlatformVersion")

docs/sink-connector-config-options.rst

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,28 @@ Connection
4343
* Valid Values: Key value pair string list with format header:value
4444
* Importance: low
4545

46+
``http.ssl.trust.all.certs``
47+
Disable hostname verification. Not recommended for production environments.
48+
49+
* Type: boolean
50+
* Default: false
51+
* Importance: low
52+
53+
``http.ssl.truststore.location``
54+
Path to the SSL truststore file. Only JKS (Java KeyStore) format is supported.
55+
56+
* Type: string
57+
* Default: null
58+
* Valid Values: Path to JKS truststore file
59+
* Importance: low
60+
61+
``http.ssl.truststore.password``
62+
Password for the SSL truststore. Required when truststore location is specified.
63+
64+
* Type: password
65+
* Default: null
66+
* Importance: low
67+
4668
``oauth2.access.token.url``
4769
The URL to be used for fetching an access token. Client Credentials is the only supported grant type.
4870

src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public final class HttpSinkConfig extends AbstractConfig {
4444
private static final String HTTP_PROXY_HOST = "http.proxy.host";
4545
private static final String HTTP_PROXY_PORT = "http.proxy.port";
4646
private static final String HTTP_SSL_TRUST_ALL_CERTIFICATES = "http.ssl.trust.all.certs";
47+
private static final String HTTP_SSL_TRUSTSTORE_LOCATION = "http.ssl.truststore.location";
48+
private static final String HTTP_SSL_TRUSTSTORE_PASSWORD = "http.ssl.truststore.password";
4749

4850
private static final String HTTP_AUTHORIZATION_TYPE_CONFIG = "http.authorization.type";
4951
private static final String HTTP_HEADERS_AUTHORIZATION_CONFIG = "http.headers.authorization";
@@ -149,6 +151,28 @@ private static void addConnectionConfigGroup(final ConfigDef configDef) {
149151
ConfigDef.Width.SHORT,
150152
HTTP_SSL_TRUST_ALL_CERTIFICATES
151153
);
154+
configDef.define(
155+
HTTP_SSL_TRUSTSTORE_LOCATION,
156+
Type.STRING,
157+
null,
158+
ConfigDef.Importance.LOW,
159+
"Path to the SSL truststore file.",
160+
CONNECTION_GROUP,
161+
groupCounter++,
162+
ConfigDef.Width.LONG,
163+
HTTP_SSL_TRUSTSTORE_LOCATION
164+
);
165+
configDef.define(
166+
HTTP_SSL_TRUSTSTORE_PASSWORD,
167+
Type.PASSWORD,
168+
null,
169+
ConfigDef.Importance.LOW,
170+
"Password for the SSL truststore.",
171+
CONNECTION_GROUP,
172+
groupCounter++,
173+
ConfigDef.Width.MEDIUM,
174+
HTTP_SSL_TRUSTSTORE_PASSWORD
175+
);
152176

153177
configDef.define(
154178
HTTP_AUTHORIZATION_TYPE_CONFIG,
@@ -811,6 +835,15 @@ public final boolean sslTrustAllCertificates() {
811835
return getBoolean(HTTP_SSL_TRUST_ALL_CERTIFICATES);
812836
}
813837

838+
public final String sslTrustStoreLocation() {
839+
return getString(HTTP_SSL_TRUSTSTORE_LOCATION);
840+
}
841+
842+
public final String sslTrustStorePassword() {
843+
final Password password = getPassword(HTTP_SSL_TRUSTSTORE_PASSWORD);
844+
return password != null ? password.value() : null;
845+
}
846+
814847
public static void main(final String... args) {
815848
System.out.println("=========================================");
816849
System.out.println("HTTP Sink connector Configuration Options");

src/main/java/io/aiven/kafka/connect/http/sender/HttpSenderFactory.java

Lines changed: 11 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,10 @@
1616

1717
package io.aiven.kafka.connect.http.sender;
1818

19-
import javax.net.ssl.SSLContext;
20-
import javax.net.ssl.SSLEngine;
21-
import javax.net.ssl.TrustManager;
22-
import javax.net.ssl.X509ExtendedTrustManager;
23-
2419
import java.net.ProxySelector;
25-
import java.net.Socket;
2620
import java.net.http.HttpClient;
2721
import java.security.KeyManagementException;
2822
import java.security.NoSuchAlgorithmException;
29-
import java.security.SecureRandom;
30-
import java.security.cert.CertificateException;
31-
import java.security.cert.X509Certificate;
3223

3324
import org.apache.kafka.connect.errors.ConnectException;
3425

@@ -52,62 +43,26 @@ public static HttpSender createHttpSender(final HttpSinkConfig config) {
5243
}
5344
}
5445

55-
private static final TrustManager DUMMY_TRUST_MANAGER = new X509ExtendedTrustManager() {
56-
@Override
57-
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
58-
throws CertificateException {
59-
60-
}
61-
62-
@Override
63-
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
64-
throws CertificateException {
65-
66-
}
67-
68-
@Override
69-
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
70-
throws CertificateException {
71-
72-
}
73-
74-
@Override
75-
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
76-
throws CertificateException {
77-
78-
}
79-
80-
@Override
81-
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
82-
return new java.security.cert.X509Certificate[0];
83-
}
84-
85-
@Override
86-
public void checkClientTrusted(final X509Certificate[] chain, final String authType)
87-
throws CertificateException {
88-
89-
}
90-
91-
@Override
92-
public void checkServerTrusted(final java.security.cert.X509Certificate[] chain, final String authType)
93-
throws CertificateException {
94-
}
95-
};
96-
9746
static HttpClient buildHttpClient(final HttpSinkConfig config) {
9847
final var clientBuilder = HttpClient.newBuilder();
48+
configureProxy(config, clientBuilder);
49+
configureSsl(config, clientBuilder);
50+
return clientBuilder.build();
51+
}
52+
53+
private static void configureProxy(final HttpSinkConfig config, final HttpClient.Builder clientBuilder) {
9954
if (config.hasProxy()) {
10055
clientBuilder.proxy(ProxySelector.of(config.proxy()));
10156
}
102-
if (config.sslTrustAllCertificates()) {
57+
}
58+
59+
private static void configureSsl(final HttpSinkConfig config, final HttpClient.Builder clientBuilder) {
60+
if (config.sslTrustAllCertificates() || config.sslTrustStoreLocation() != null) {
10361
try {
104-
final SSLContext sslContext = SSLContext.getInstance("TLS");
105-
sslContext.init(null, new TrustManager[] {DUMMY_TRUST_MANAGER}, new SecureRandom());
106-
clientBuilder.sslContext(sslContext);
62+
clientBuilder.sslContext(SslContextBuilder.createSslContext(config));
10763
} catch (NoSuchAlgorithmException | KeyManagementException e) {
10864
throw new RuntimeException(e);
10965
}
11066
}
111-
return clientBuilder.build();
11267
}
11368
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2023 Aiven Oy and http-connector-for-apache-kafka project contributors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.http.sender;
18+
19+
import javax.net.ssl.SSLContext;
20+
import javax.net.ssl.SSLEngine;
21+
import javax.net.ssl.TrustManager;
22+
import javax.net.ssl.TrustManagerFactory;
23+
import javax.net.ssl.X509ExtendedTrustManager;
24+
25+
import java.io.IOException;
26+
import java.io.InputStream;
27+
import java.net.Socket;
28+
import java.security.KeyManagementException;
29+
import java.security.KeyStore;
30+
import java.security.KeyStoreException;
31+
import java.security.NoSuchAlgorithmException;
32+
import java.security.SecureRandom;
33+
import java.security.cert.CertificateException;
34+
import java.security.cert.X509Certificate;
35+
36+
import io.aiven.kafka.connect.http.config.HttpSinkConfig;
37+
38+
final class SslContextBuilder {
39+
40+
private static final TrustManager DUMMY_TRUST_MANAGER = new X509ExtendedTrustManager() {
41+
@Override
42+
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
43+
throws CertificateException {
44+
}
45+
46+
@Override
47+
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final Socket socket)
48+
throws CertificateException {
49+
}
50+
51+
@Override
52+
public void checkClientTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
53+
throws CertificateException {
54+
}
55+
56+
@Override
57+
public void checkServerTrusted(final X509Certificate[] chain, final String authType, final SSLEngine engine)
58+
throws CertificateException {
59+
}
60+
61+
@Override
62+
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
63+
return new java.security.cert.X509Certificate[0];
64+
}
65+
66+
@Override
67+
public void checkClientTrusted(final X509Certificate[] chain, final String authType)
68+
throws CertificateException {
69+
}
70+
71+
@Override
72+
public void checkServerTrusted(final java.security.cert.X509Certificate[] chain, final String authType)
73+
throws CertificateException {
74+
}
75+
};
76+
77+
static SSLContext createSslContext(final HttpSinkConfig config)
78+
throws NoSuchAlgorithmException, KeyManagementException {
79+
final SSLContext sslContext = SSLContext.getInstance("TLS");
80+
if (config.sslTrustAllCertificates()) {
81+
sslContext.init(null, new TrustManager[] {DUMMY_TRUST_MANAGER}, new SecureRandom());
82+
} else {
83+
final TrustManagerFactory tmf = loadTrustStore(config);
84+
sslContext.init(null, tmf != null ? tmf.getTrustManagers() : null, new SecureRandom());
85+
}
86+
return sslContext;
87+
}
88+
89+
private static TrustManagerFactory loadTrustStore(final HttpSinkConfig config) {
90+
if (config.sslTrustStoreLocation() == null) {
91+
return null;
92+
}
93+
try {
94+
final KeyStore trustStore = KeyStore.getInstance("JKS");
95+
final String path = config.sslTrustStoreLocation();
96+
97+
try (InputStream is = TrustStoreLoader.findTrustStoreInputStream(path)) {
98+
if (is == null) {
99+
throw new RuntimeException("TrustStore file not found: " + path
100+
+ ". Tried classpath and file system locations.");
101+
}
102+
trustStore.load(is, config.sslTrustStorePassword() != null
103+
? config.sslTrustStorePassword().toCharArray() : null);
104+
}
105+
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
106+
tmf.init(trustStore);
107+
return tmf;
108+
} catch (KeyStoreException
109+
| IOException
110+
| NoSuchAlgorithmException
111+
| CertificateException e) {
112+
throw new RuntimeException("Failed to load truststore: " + config.sslTrustStoreLocation(), e);
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)