11package com .marklogic .kafka .connect ;
22
3+ import javax .net .ssl .SSLContext ;
4+ import java .security .NoSuchAlgorithmException ;
5+ import java .util .Map ;
6+
7+ import java .io .FileInputStream ;
8+ import java .io .FileNotFoundException ;
9+ import java .io .IOException ;
10+ import java .io .InputStream ;
11+ import java .security .KeyManagementException ;
12+ import java .security .KeyStore ;
13+ import java .security .KeyStoreException ;
14+ import java .security .NoSuchAlgorithmException ;
15+ import java .security .UnrecoverableKeyException ;
16+ import java .security .cert .CertificateException ;
17+
18+ import javax .net .ssl .KeyManager ;
19+ import javax .net .ssl .KeyManagerFactory ;
20+ import javax .net .ssl .SSLContext ;
21+ import javax .net .ssl .TrustManager ;
22+ import javax .net .ssl .TrustManagerFactory ;
23+
324import com .marklogic .client .DatabaseClient ;
425import com .marklogic .client .DatabaseClientFactory ;
526import com .marklogic .client .ext .DatabaseClientConfig ;
627import com .marklogic .client .ext .SecurityContextType ;
728import com .marklogic .kafka .connect .sink .MarkLogicSinkConfig ;
829
9- import javax .net .ssl .SSLContext ;
10- import java .security .NoSuchAlgorithmException ;
11- import java .util .Map ;
30+ import com .marklogic .client .ext .modulesloader .ssl .SimpleX509TrustManager ;
1231
1332public class DefaultDatabaseClientConfigBuilder implements DatabaseClientConfigBuilder {
1433
@@ -17,32 +36,26 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaC
1736 DatabaseClientConfig clientConfig = new DatabaseClientConfig ();
1837 clientConfig .setCertFile (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_CERT_FILE ));
1938 clientConfig .setCertPassword (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_CERT_PASSWORD ));
20-
21- String type = kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_TYPE );
22- if (type != null && type .trim ().length () > 0 ) {
23- clientConfig .setConnectionType (DatabaseClient .ConnectionType .valueOf (type .toUpperCase ()));
24- }
25-
39+ clientConfig .setTrustManager (new SimpleX509TrustManager ());
40+ clientConfig = configureHostNameVerifier (clientConfig ,kafkaConfig );
2641 String database = kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_DATABASE );
2742 if (database != null && database .trim ().length () > 0 ) {
2843 clientConfig .setDatabase (database );
2944 }
30-
45+ String connType = kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_TYPE );
46+ if (connType != null && connType .trim ().length () > 0 ) {
47+ clientConfig .setConnectionType (DatabaseClient .ConnectionType .valueOf (connType .toUpperCase ()));
48+ }
3149 clientConfig .setExternalName (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_EXTERNAL_NAME ));
3250 clientConfig .setHost (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_HOST ));
3351 clientConfig .setPassword (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_PASSWORD ));
3452 clientConfig .setPort (Integer .parseInt (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_PORT )));
35-
36- String securityContextType = kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_SECURITY_CONTEXT_TYPE ).toUpperCase ();
37- clientConfig .setSecurityContextType (SecurityContextType .valueOf (securityContextType ));
38-
53+ clientConfig = configureCustomSslConnection (clientConfig , kafkaConfig );
3954 String simpleSsl = kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_SIMPLE_SSL );
4055 if (simpleSsl != null && Boolean .parseBoolean (simpleSsl )) {
41- configureSimpleSsl (clientConfig );
56+ clientConfig = configureSimpleSsl (clientConfig );
4257 }
43-
4458 clientConfig .setUsername (kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_USERNAME ));
45-
4659 return clientConfig ;
4760 }
4861
@@ -53,13 +66,117 @@ public DatabaseClientConfig buildDatabaseClientConfig(Map<String, String> kafkaC
5366 *
5467 * @param clientConfig
5568 */
56- protected void configureSimpleSsl (DatabaseClientConfig clientConfig ) {
69+ protected DatabaseClientConfig configureSimpleSsl (DatabaseClientConfig clientConfig ) {
5770 try {
5871 clientConfig .setSslContext (SSLContext .getDefault ());
72+ clientConfig .setTrustManager (new SimpleX509TrustManager ());
5973 } catch (NoSuchAlgorithmException e ) {
6074 throw new RuntimeException ("Unable to get default SSLContext: " + e .getMessage (), e );
6175 }
62-
6376 clientConfig .setSslHostnameVerifier (DatabaseClientFactory .SSLHostnameVerifier .ANY );
77+ return clientConfig ;
78+ }
79+
80+ /**
81+ * This function configures the Host Name verifier based on the configuration.
82+ * ANY, STRICT and COMMON are the possible values, ANY being default.
83+ *
84+ * @param clientConfig
85+ */
86+ protected DatabaseClientConfig configureHostNameVerifier (DatabaseClientConfig clientConfig , Map <String , String > kafkaConfig ) {
87+ String sslHostNameVerifier = kafkaConfig .get (MarkLogicSinkConfig .SSL_HOST_VERIFIER );
88+ if ("ANY" .equals (sslHostNameVerifier ))
89+ clientConfig .setSslHostnameVerifier (DatabaseClientFactory .SSLHostnameVerifier .ANY );
90+ else if ("COMMON" .equals (sslHostNameVerifier ))
91+ clientConfig .setSslHostnameVerifier (DatabaseClientFactory .SSLHostnameVerifier .COMMON );
92+ else if ("STRICT" .equals (sslHostNameVerifier ))
93+ clientConfig .setSslHostnameVerifier (DatabaseClientFactory .SSLHostnameVerifier .STRICT );
94+ else
95+ clientConfig .setSslHostnameVerifier (DatabaseClientFactory .SSLHostnameVerifier .ANY );
96+ return clientConfig ;
97+ }
98+
99+ protected DatabaseClientConfig configureCustomSslConnection (DatabaseClientConfig clientConfig , Map <String , String > kafkaConfig ) {
100+ String ssl = kafkaConfig .get (MarkLogicSinkConfig .SSL );
101+ String tlsVersion = kafkaConfig .get (MarkLogicSinkConfig .TLS_VERSION );
102+ String sslMutualAuth = kafkaConfig .get (MarkLogicSinkConfig .SSL_MUTUAL_AUTH );
103+ SSLContext sslContext = null ;
104+ String securityContextType = kafkaConfig .get (MarkLogicSinkConfig .CONNECTION_SECURITY_CONTEXT_TYPE ).toUpperCase ();
105+ clientConfig .setSecurityContextType (SecurityContextType .valueOf (securityContextType ));
106+
107+ if ("BASIC" .equals (securityContextType ) ||
108+ "DIGEST" .equals (securityContextType )
109+ ) {
110+ if (ssl != null && Boolean .parseBoolean (ssl )) {
111+ if (sslMutualAuth != null && Boolean .parseBoolean (sslMutualAuth )) {
112+ /*2 way ssl changes*/
113+ KeyStore clientKeyStore = null ;
114+ try {
115+ clientKeyStore = KeyStore .getInstance ("PKCS12" );
116+ } catch (KeyStoreException e ) {
117+
118+ throw new RuntimeException ("Unable to get default SSLContext: " + e .getMessage (), e );
119+ }
120+ TrustManager [] trust = new TrustManager [] { new SimpleX509TrustManager ()};
121+
122+ try (InputStream keystoreInputStream = new FileInputStream (clientConfig .getCertFile ())) {
123+ clientKeyStore .load (keystoreInputStream , clientConfig .getCertPassword ().toCharArray ());
124+ } catch (Exception e ) {
125+ throw new RuntimeException ("Unable to configure custom SSL connection: " + e .getMessage (), e );
126+ }
127+ KeyManagerFactory keyManagerFactory = null ;
128+ try {
129+ keyManagerFactory = KeyManagerFactory .getInstance (TrustManagerFactory .getDefaultAlgorithm ());
130+ } catch (Exception e ) {
131+
132+ throw new RuntimeException ("Unable to configure custom SSL connection: " + e .getMessage (), e );
133+ }
134+ try {
135+ keyManagerFactory .init (clientKeyStore , clientConfig .getCertPassword ().toCharArray ());
136+ } catch (Exception e ) {
137+
138+ throw new RuntimeException ("Unable to configure custom SSL connection: " + e .getMessage (), e );
139+ }
140+ KeyManager [] key = keyManagerFactory .getKeyManagers ();
141+ try {
142+ if (tlsVersion != null && tlsVersion .trim ().length () > 0 ) {
143+ sslContext = SSLContext .getInstance (tlsVersion );
144+ }
145+ else {
146+ sslContext = SSLContext .getInstance ("TLSv1.2" );
147+ }
148+ } catch (NoSuchAlgorithmException e ) {
149+
150+ throw new RuntimeException ("Unable to configure custom SSL connection:" + e .getMessage (), e );
151+ }
152+ try {
153+ sslContext .init (key , trust , null );
154+ } catch (KeyManagementException e ) {
155+ throw new RuntimeException ("Unable to configure custom SSL connection:" + e .getMessage (), e );
156+ }
157+ clientConfig .setSslContext (sslContext );
158+ }
159+ else {/*1wayssl*/
160+ TrustManager [] trust = new TrustManager [] { new SimpleX509TrustManager ()};
161+ try {
162+ if (tlsVersion != null && tlsVersion .trim ().length () > 0 ) {
163+ sslContext = SSLContext .getInstance (tlsVersion );
164+ }
165+ else {
166+ sslContext = SSLContext .getInstance ("TLSv1.2" );
167+ }
168+ } catch (NoSuchAlgorithmException e ) {
169+ throw new RuntimeException ("Unable to configure custom SSL connection: " + e .getMessage (), e );
170+ }
171+ try {
172+ sslContext .init (null , trust , null );
173+ }catch (KeyManagementException e ) {
174+ throw new RuntimeException ("Unable to configure custom SSL connection:" + e .getMessage (), e );
175+ }
176+ clientConfig .setSslContext (sslContext );
177+ }
178+ } /* End of if ssl */
179+ }
180+ return clientConfig ;
64181 }
65182}
0 commit comments