Skip to content

Commit 51636c8

Browse files
authored
feat: Make Intermediate CA optional in TLS configuration (#88)
1 parent c2bfe8a commit 51636c8

File tree

4 files changed

+16
-12
lines changed

4 files changed

+16
-12
lines changed

internal/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func newConsumer(kafkaConfig *kafka.Config) *kafkaConsumer {
4040
}
4141

4242
if kafkaConfig.SASL.Enabled {
43-
readerConfig.Dialer.TLS = NewTLSConfig(kafkaConfig.SASL)
43+
readerConfig.Dialer.TLS = NewTLSConfig(kafkaConfig)
4444
readerConfig.Dialer.SASLMechanism = Mechanism(kafkaConfig.SASL)
4545

4646
if kafkaConfig.SASL.Rack != "" {

internal/producer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func newProducer(kafkaConfig *kafka.Config) Producer {
3737
}
3838

3939
if kafkaConfig.SASL.Enabled {
40-
transport.TLS = NewTLSConfig(kafkaConfig.SASL)
40+
transport.TLS = NewTLSConfig(kafkaConfig)
4141
transport.SASL = Mechanism(kafkaConfig.SASL)
4242
}
4343

internal/secure.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,20 +11,24 @@ import (
1111
"github.com/segmentio/kafka-go/sasl/scram"
1212
)
1313

14-
func NewTLSConfig(sasl kafka.SASLConfig) *tls.Config {
15-
rootCA, err := os.ReadFile(sasl.RootCAPath)
14+
func NewTLSConfig(cfg *kafka.Config) *tls.Config {
15+
rootCA, err := os.ReadFile(cfg.SASL.RootCAPath)
1616
if err != nil {
17-
panic("Error while reading Root CA file: " + sasl.RootCAPath + " error: " + err.Error())
17+
panic("Error while reading Root CA file: " + cfg.SASL.RootCAPath + " error: " + err.Error())
1818
}
1919

20-
interCA, err := os.ReadFile(sasl.IntermediateCAPath)
21-
if err != nil {
22-
panic("Error while reading Intermediate CA file: " + sasl.IntermediateCAPath + " error: " + err.Error())
20+
caCertPool := x509.NewCertPool()
21+
if ok := caCertPool.AppendCertsFromPEM(rootCA); !ok {
22+
panic("failed to append Root CA certificates from file: " + cfg.SASL.RootCAPath)
2323
}
2424

25-
caCertPool := x509.NewCertPool()
26-
caCertPool.AppendCertsFromPEM(rootCA)
27-
caCertPool.AppendCertsFromPEM(interCA)
25+
interCA, err := os.ReadFile(cfg.SASL.IntermediateCAPath)
26+
if err != nil {
27+
cfg.Logger.Warnf("Unable to read Intermediate CA file: %s, error: %v", cfg.SASL.IntermediateCAPath, err)
28+
cfg.Logger.Info("Intermediate CA will be skipped.")
29+
} else if ok := caCertPool.AppendCertsFromPEM(interCA); !ok {
30+
cfg.Logger.Warnf("Failed to append Intermediate CA certificates from file: %s", cfg.SASL.IntermediateCAPath)
31+
}
2832

2933
return &tls.Config{
3034
RootCAs: caCertPool,

internal/verify_topic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func NewKafkaClient(cfg *kafka.Config) (kafkaClient, error) {
2929
}
3030

3131
if cfg.SASL.Enabled {
32-
transport.TLS = NewTLSConfig(cfg.SASL)
32+
transport.TLS = NewTLSConfig(cfg)
3333
transport.SASL = Mechanism(cfg.SASL)
3434
}
3535

0 commit comments

Comments
 (0)