3030import org .slf4j .Logger ;
3131import org .slf4j .LoggerFactory ;
3232
33+ import org .apache .kafka .common .KafkaException ;
34+ import org .apache .kafka .common .Reconfigurable ;
35+ import org .apache .kafka .common .config .ConfigException ;
36+ import org .apache .kafka .common .config .SslConfigs ;
37+ import org .apache .kafka .common .config .internals .BrokerSecurityConfigs ;
38+ import org .apache .kafka .common .config .types .Password ;
39+ import org .apache .kafka .common .network .Mode ;
40+ import org .apache .kafka .common .utils .Base64 ;
41+ import org .apache .kafka .common .utils .Utils ;
42+ import org .slf4j .Logger ;
43+ import org .slf4j .LoggerFactory ;
44+
45+ import javax .net .ssl .KeyManager ;
46+ import javax .net .ssl .KeyManagerFactory ;
47+ import javax .net .ssl .SSLContext ;
48+ import javax .net .ssl .SSLEngine ;
49+ import javax .net .ssl .SSLEngineResult ;
50+ import javax .net .ssl .SSLException ;
51+ import javax .net .ssl .SSLParameters ;
52+ import javax .net .ssl .TrustManagerFactory ;
53+ import javax .net .ssl .X509ExtendedKeyManager ;
54+ import javax .net .ssl .X509KeyManager ;
55+ import java .io .ByteArrayInputStream ;
56+ import java .io .InputStream ;
57+
3358import java .io .ByteArrayInputStream ;
3459import java .io .IOException ;
3560import java .io .InputStream ;
@@ -164,12 +189,14 @@ public void configure(Map<String, ?> configs) {
164189 (Password ) configs .get (SslConfigs .SSL_KEYSTORE_PASSWORD_CONFIG ),
165190 (Password ) configs .get (SslConfigs .SSL_KEY_PASSWORD_CONFIG ),
166191 (Password ) configs .get (SslConfigs .SSL_KEYSTORE_KEY_CONFIG ),
167- (Password ) configs .get (SslConfigs .SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG ));
192+ (Password ) configs .get (SslConfigs .SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG )),
193+ Boolean .parseBoolean ((String ) configs .get (SslConfigs .SSL_KEYSTORE_AS_STRING )));
168194
169195 this .truststore = createTruststore ((String ) configs .get (SslConfigs .SSL_TRUSTSTORE_TYPE_CONFIG ),
170196 (String ) configs .get (SslConfigs .SSL_TRUSTSTORE_LOCATION_CONFIG ),
171197 (Password ) configs .get (SslConfigs .SSL_TRUSTSTORE_PASSWORD_CONFIG ),
172- (Password ) configs .get (SslConfigs .SSL_TRUSTSTORE_CERTIFICATES_CONFIG ));
198+ (Password ) configs .get (SslConfigs .SSL_TRUSTSTORE_CERTIFICATES_CONFIG )),
199+ Boolean .parseBoolean ((String ) configs .get (SslConfigs .SSL_TRUSTSTORE_AS_STRING )));
173200
174201 this .sslContext = createSSLContext (keystore , truststore );
175202 }
@@ -275,7 +302,7 @@ protected TrustManager[] getTrustManagers(SecurityStore truststore, String tmfAl
275302 }
276303
277304 // Visibility to override for testing
278- protected SecurityStore createKeystore (String type , String path , Password password , Password keyPassword , Password privateKey , Password certificateChain ) {
305+ protected SecurityStore createKeystore (String type , String path , Password password , Password keyPassword , Password privateKey , Password certificateChain , boolean pathAsBase64EncodedString ) {
279306 if (privateKey != null ) {
280307 if (!PEM_TYPE .equals (type ))
281308 throw new InvalidConfigurationException ("SSL private key can be specified only for PEM, but key store type is " + type + "." );
@@ -299,12 +326,12 @@ else if (password != null)
299326 } else if (path != null && password == null ) {
300327 throw new InvalidConfigurationException ("SSL key store is specified, but key store password is not specified." );
301328 } else if (path != null && password != null ) {
302- return new FileBasedStore (type , path , password , keyPassword , true );
329+ return new FileBasedStore (type , path , password , keyPassword , true , pathAsBase64EncodedString );
303330 } else
304331 return null ; // path == null, clients may use this path with brokers that don't require client auth
305332 }
306333
307- private static SecurityStore createTruststore (String type , String path , Password password , Password trustStoreCerts ) {
334+ private static SecurityStore createTruststore (String type , String path , Password password , Password trustStoreCerts , boolean pathAsBase64EncodedString ) {
308335 if (trustStoreCerts != null ) {
309336 if (!PEM_TYPE .equals (type ))
310337 throw new InvalidConfigurationException ("SSL trust store certs can be specified only for PEM, but trust store type is " + type + "." );
@@ -322,7 +349,7 @@ else if (password != null)
322349 } else if (path == null && password != null ) {
323350 throw new InvalidConfigurationException ("SSL trust store is not specified, but trust store password is specified." );
324351 } else if (path != null ) {
325- return new FileBasedStore (type , path , password , null , false );
352+ return new FileBasedStore (type , path , password , null , false , pathAsBase64EncodedString );
326353 } else
327354 return null ;
328355 }
@@ -341,15 +368,17 @@ static class FileBasedStore implements SecurityStore {
341368 protected final Password keyPassword ;
342369 private final Long fileLastModifiedMs ;
343370 private final KeyStore keyStore ;
371+ private final boolean pathAsBase64EncodedString ;
344372
345- FileBasedStore (String type , String path , Password password , Password keyPassword , boolean isKeyStore ) {
373+ FileBasedStore (String type , String path , Password password , Password keyPassword , boolean isKeyStore , boolean pathAsBase64EncodedString ) {
346374 Objects .requireNonNull (type , "type must not be null" );
347375 this .type = type ;
348376 this .path = path ;
349377 this .password = password ;
350378 this .keyPassword = keyPassword ;
351379 fileLastModifiedMs = lastModifiedMs (path );
352380 this .keyStore = load (isKeyStore );
381+ this .pathAsBase64EncodedString = pathAsBase64EncodedString ;
353382 }
354383
355384 @ Override
@@ -370,11 +399,24 @@ public char[] keyPassword() {
370399 * using the specified configs (e.g. if the password or keystore type is invalid)
371400 */
372401 protected KeyStore load (boolean isKeyStore ) {
373- try (InputStream in = Files .newInputStream (Paths .get (path ))) {
402+ if (path == null ) {
403+ throw new KafkaException ("Failed to load SSL keystore: path was null" );
404+ }
405+ InputStream in ;
406+ try {
407+ if (pathAsBase64EncodedString ) {
408+ String encodedKeyStore = System .getenv (path );
409+ in = new ByteArrayInputStream (Base64 .decoder ().decode (encodedKeyStore ));
410+ } else if (type .equalsIgnoreCase (TruststoreUtility .CRT )) {
411+ return TruststoreUtility .createTrustStore (path , password .value ());
412+ } else {
413+ in = new FileInputStream (path );
414+ }
374415 KeyStore ks = KeyStore .getInstance (type );
375416 // If a password is not set access to the truststore is still available, but integrity checking is disabled.
376417 char [] passwordChars = password != null ? password .value ().toCharArray () : null ;
377418 ks .load (in , passwordChars );
419+ in .close ();
378420 return ks ;
379421 } catch (GeneralSecurityException | IOException e ) {
380422 throw new KafkaException ("Failed to load SSL keystore " + path + " of type " + type , e );
0 commit comments