@@ -2,6 +2,7 @@ package cli
22
33import (
44 "crypto/tls"
5+ "crypto/x509"
56 "encoding/json"
67 "flag"
78 "fmt"
@@ -62,11 +63,15 @@ func getKey() (apiKey, error) {
6263}
6364
6465type config struct {
65- BootstrapServers string `json:"bootstrap_servers"`
66- SecurityProtocol string `json:"security_protocol"`
67- SaslMechanisms string `json:"sasl_mechanisms"`
68- SaslUsername string `json:"sasl_username"`
69- SaslPassword string `json:"sasl_password"`
66+ BootstrapServers string `json:"bootstrap_servers"`
67+ SecurityProtocol string `json:"security_protocol"`
68+ SaslMechanisms string `json:"sasl_mechanisms"`
69+ SaslUsername string `json:"sasl_username"`
70+ SaslPassword string `json:"sasl_password"`
71+ TLSClientCertFile string `json:"tls_client_cert_file"`
72+ TLSClientKeyFile string `json:"tls_client_key_file"`
73+ TLSServerCACertFile string `json:"tls_server_ca_cert_file"`
74+ TLSServerInsecureSkipVerify bool `json:"tls_server_insecure_skip_verify"`
7075}
7176
7277func LoadKafkaConfig () ([]kgo.Opt , error ) {
@@ -90,10 +95,31 @@ func LoadKafkaConfig() ([]kgo.Opt, error) {
9095 switch c .SecurityProtocol {
9196 case "" , "PLAINTEXT" , "SASL_PLAINTEXT" :
9297 case "SSL" , "SASL_SSL" :
98+ var tlsConfig tls.Config
99+ if c .TLSClientCertFile != "" && c .TLSClientKeyFile != "" {
100+ cert , err := tls .LoadX509KeyPair (c .TLSClientCertFile , c .TLSClientKeyFile )
101+ if err != nil {
102+ return nil , fmt .Errorf ("failed to load key pair from tls_client_cert_file and tls_client_key_file: %w" , err )
103+ }
104+ tlsConfig .Certificates = []tls.Certificate {cert }
105+ }
106+ if c .TLSServerCACertFile != "" {
107+ caCert , err := os .ReadFile (c .TLSServerCACertFile )
108+ if err != nil {
109+ return nil , fmt .Errorf ("failed to read tls_server_ca_cert_file: %w" , err )
110+ }
111+ p := x509 .NewCertPool ()
112+ if ! p .AppendCertsFromPEM (caCert ) {
113+ return nil , fmt .Errorf ("failed to append certificates from tls_server_ca_cert_file" )
114+ }
115+ tlsConfig .RootCAs = p
116+ }
117+ tlsConfig .InsecureSkipVerify = c .TLSServerInsecureSkipVerify
93118 d := & tls.Dialer {
94119 NetDialer : & net.Dialer {
95120 Timeout : 10 * time .Second ,
96121 },
122+ Config : & tlsConfig ,
97123 }
98124 opts = append (opts , kgo .Dialer (d .DialContext ))
99125 default :
0 commit comments