Skip to content

Commit 66d2d0c

Browse files
committed
cleaning up commit for kafka
1 parent 33fbf47 commit 66d2d0c

File tree

3 files changed

+12
-17
lines changed

3 files changed

+12
-17
lines changed

cmd/ktranslate/main.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -604,8 +604,6 @@ func applyFlags(cfg *ktranslate.Config) error {
604604
// pkg/sinks/kafka
605605
case "kafka_topic":
606606
cfg.KafkaSink.Topic = val
607-
case "kafka_brokers":
608-
cfg.KafkaSink.BootstrapServers = val
609607
case "bootstrap.servers":
610608
cfg.KafkaSink.BootstrapServers = val
611609
case "kafka_security_protocol":
@@ -639,8 +637,6 @@ func applyFlags(cfg *ktranslate.Config) error {
639637
cfg.KafkaSink.SSLCertFile = val
640638
case "kafka_ssl_key_file":
641639
cfg.KafkaSink.SSLKeyFile = val
642-
case "kafka_ssl_key_password":
643-
cfg.KafkaSink.SSLKeyPassword = val
644640
case "kafka_ssl_insecure":
645641
v, err := strconv.ParseBool(val)
646642
if err != nil {

config.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,10 @@ type KafkaSinkConfig struct {
149149
KerberosPrincipal string // Kerberos principal
150150
KerberosDisablePAFXFAST bool // Disable PA-FX-FAST
151151
// SSL/TLS settings
152-
SSLCAFile string // CA certificate file
153-
SSLCertFile string // Client certificate file
154-
SSLKeyFile string // Client private key file
155-
SSLKeyPassword string // Private key password
156-
SSLInsecure bool // Skip certificate verification
152+
SSLCAFile string // CA certificate file
153+
SSLCertFile string // Client certificate file
154+
SSLKeyFile string // Client private key file
155+
SSLInsecure bool // Skip certificate verification
157156
// Producer settings
158157
RequiredAcks int // 0=NoResponse, 1=WaitForLocal, -1=WaitForAll
159158
Compression string // none, gzip, snappy, lz4, zstd
@@ -513,7 +512,6 @@ func DefaultConfig() *Config {
513512
SSLCAFile: "",
514513
SSLCertFile: "",
515514
SSLKeyFile: "",
516-
SSLKeyPassword: "",
517515
SSLInsecure: false,
518516
RequiredAcks: 1,
519517
Compression: "none",

pkg/sinks/kafka/kafa.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"crypto/x509"
77
"flag"
88
"fmt"
9-
"io/ioutil"
109
"os"
1110
"strings"
1211
"time"
@@ -35,7 +34,6 @@ var (
3534
kafkaSSLCAFile string
3635
kafkaSSLCertFile string
3736
kafkaSSLKeyFile string
38-
kafkaSSLKeyPassword string
3937
kafkaSSLInsecure bool
4038
kafkaRequiredAcks int
4139
kafkaCompression string
@@ -62,7 +60,6 @@ func init() {
6260
flag.StringVar(&kafkaSSLCAFile, "kafka_ssl_ca_file", "", "SSL CA certificate file")
6361
flag.StringVar(&kafkaSSLCertFile, "kafka_ssl_cert_file", "", "SSL client certificate file")
6462
flag.StringVar(&kafkaSSLKeyFile, "kafka_ssl_key_file", "", "SSL client private key file")
65-
flag.StringVar(&kafkaSSLKeyPassword, "kafka_ssl_key_password", "", "SSL private key password")
6663
flag.BoolVar(&kafkaSSLInsecure, "kafka_ssl_insecure", false, "Skip SSL certificate verification")
6764
flag.IntVar(&kafkaRequiredAcks, "kafka_required_acks", 1, "Required acks (0=NoResponse, 1=WaitForLocal, -1=WaitForAll)")
6865
flag.StringVar(&kafkaCompression, "kafka_compression", "none", "Compression codec (none, gzip, snappy, lz4, zstd)")
@@ -172,10 +169,14 @@ func (s *KafkaSink) Init(ctx context.Context, format formats.Format, compression
172169
return fmt.Errorf("unsupported security protocol: %s", s.config.SecurityProtocol)
173170
}
174171

175-
// Version configuration - use a recent version that supports all features
176-
version, err := sarama.ParseKafkaVersion("2.8.0")
172+
// Version configuration: allow override via KAFKA_VERSION env var, default to 2.8.0
173+
versionStr := os.Getenv("KT_KAFKA_VERSION")
174+
if versionStr == "" {
175+
versionStr = "2.8.0"
176+
}
177+
version, err := sarama.ParseKafkaVersion(versionStr)
177178
if err != nil {
178-
return fmt.Errorf("failed to parse kafka version: %v", err)
179+
return fmt.Errorf("failed to parse kafka version %q: %v", versionStr, err)
179180
}
180181
config.Version = version
181182

@@ -213,7 +214,7 @@ func (s *KafkaSink) configureTLS(config *sarama.Config) error {
213214

214215
// Load CA certificate
215216
if s.config.SSLCAFile != "" {
216-
caCert, err := ioutil.ReadFile(s.config.SSLCAFile)
217+
caCert, err := os.ReadFile(s.config.SSLCAFile)
217218
if err != nil {
218219
return fmt.Errorf("failed to read CA certificate: %v", err)
219220
}

0 commit comments

Comments
 (0)