2626import org .apache .kafka .common .security .auth .SslEngineFactory ;
2727import org .apache .kafka .common .utils .SecurityUtils ;
2828import org .apache .kafka .common .utils .Utils ;
29-
29+ import java . net . Socket ;
3030import org .slf4j .Logger ;
3131import org .slf4j .LoggerFactory ;
3232
4141import org .apache .kafka .common .utils .Utils ;
4242import org .slf4j .Logger ;
4343import org .slf4j .LoggerFactory ;
44-
44+ import java . security . PrivateKey ;
4545import javax .net .ssl .KeyManager ;
4646import javax .net .ssl .KeyManagerFactory ;
4747import javax .net .ssl .SSLContext ;
5454import javax .net .ssl .X509KeyManager ;
5555import java .io .ByteArrayInputStream ;
5656import java .io .InputStream ;
57-
57+ import javax .net .ssl .X509ExtendedKeyManager ;
58+ import javax .net .ssl .X509KeyManager ;
5859import java .io .ByteArrayInputStream ;
5960import java .io .IOException ;
6061import java .io .InputStream ;
@@ -198,7 +199,7 @@ public void configure(Map<String, ?> configs) {
198199 (Password ) configs .get (SslConfigs .SSL_TRUSTSTORE_CERTIFICATES_CONFIG )),
199200 Boolean .parseBoolean ((String ) configs .get (SslConfigs .SSL_TRUSTSTORE_AS_STRING )));
200201
201- this .sslContext = createSSLContext (keystore , truststore );
202+ this .sslContext = createSSLContext (keystore , truststore , configs );
202203 }
203204
204205 @ Override
@@ -261,7 +262,7 @@ private static SecureRandom createSecureRandom(String key) {
261262 }
262263 }
263264
264- private SSLContext createSSLContext (SecurityStore keystore , SecurityStore truststore ) {
265+ private SSLContext createSSLContext (SecurityStore keystore , SecurityStore truststore , Map < String , ?> configs ) {
265266 try {
266267 SSLContext sslContext ;
267268 if (provider != null )
@@ -285,7 +286,7 @@ private SSLContext createSSLContext(SecurityStore keystore, SecurityStore trusts
285286 String tmfAlgorithm = this .tmfAlgorithm != null ? this .tmfAlgorithm : TrustManagerFactory .getDefaultAlgorithm ();
286287 TrustManager [] trustManagers = getTrustManagers (truststore , tmfAlgorithm );
287288
288- sslContext .init (keyManagers , trustManagers , this .secureRandomImplementation );
289+ sslContext .init (applyAliasToKM ( keyManagers , ( String ) configs . get ( "ssl.keystore.alias" )) , trustManagers , this .secureRandomImplementation );
289290 log .debug ("Created SSL context with keystore {}, truststore {}, provider {}." ,
290291 keystore , truststore , sslContext .getProvider ().getName ());
291292 return sslContext ;
@@ -294,6 +295,63 @@ private SSLContext createSSLContext(SecurityStore keystore, SecurityStore trusts
294295 }
295296 }
296297
298+ private KeyManager [] applyAliasToKM (KeyManager [] kms , final String alias ) {
299+ if (alias == null || alias .isEmpty ()){
300+ return kms ;
301+ }
302+
303+ log .debug ("Applying the custom KeyManagers for alias: {}" , alias );
304+
305+ KeyManager [] updatedKMs = new KeyManager [kms .length ];
306+
307+ int i =0 ;
308+ for (KeyManager km : kms ){
309+ final X509KeyManager origKM = (X509KeyManager )km ;
310+ X509ExtendedKeyManager exKM = new X509ExtendedKeyManager () {
311+ /* (non-Javadoc)
312+ * @see javax.net.ssl.X509ExtendedKeyManager#chooseEngineClientAlias(java.lang.String[], java.security.Principal[], javax.net.ssl.SSLEngine)
313+ */
314+ @ Override
315+ public String chooseEngineClientAlias (String [] arg0 ,
316+ Principal [] arg1 , SSLEngine arg2 ) {
317+ return alias ;
318+ }
319+
320+ @ Override
321+ public String [] getServerAliases (String arg0 , Principal [] arg1 ) {
322+ return origKM .getServerAliases (arg0 , arg1 );
323+ }
324+
325+ @ Override
326+ public PrivateKey getPrivateKey (String arg0 ) {
327+ return origKM .getPrivateKey (arg0 );
328+ }
329+
330+ @ Override
331+ public String [] getClientAliases (String arg0 , Principal [] arg1 ) {
332+ return origKM .getClientAliases (arg0 , arg1 );
333+ }
334+
335+ @ Override
336+ public X509Certificate [] getCertificateChain (String arg0 ) {
337+ return origKM .getCertificateChain (arg0 );
338+ }
339+
340+ @ Override
341+ public String chooseServerAlias (String arg0 , Principal [] arg1 , Socket arg2 ) {
342+ return origKM .chooseServerAlias (arg0 , arg1 , arg2 );
343+ }
344+
345+ @ Override
346+ public String chooseClientAlias (String [] arg0 , Principal [] arg1 , Socket arg2 ) {
347+ return alias ;
348+ }
349+ };
350+ updatedKMs [i ++] = exKM ;
351+ }
352+ return updatedKMs ;
353+ }
354+
297355 protected TrustManager [] getTrustManagers (SecurityStore truststore , String tmfAlgorithm ) throws NoSuchAlgorithmException , KeyStoreException {
298356 TrustManagerFactory tmf = TrustManagerFactory .getInstance (tmfAlgorithm );
299357 KeyStore ts = truststore == null ? null : truststore .get ();
0 commit comments