Conversation
835a4b0 to
8994ae1
Compare
There was a problem hiding this comment.
Pull request overview
This PR migrates the Kafka sink implementation from the segmentio/kafka-go library to Shopify's sarama library, adding comprehensive Kerberos (GSSAPI) authentication support and expanding configuration options for security, SSL/TLS, and producer tuning.
Changes:
- Replaced
segmentio/kafka-gowithgithub.com/Shopify/saramalibrary for Kafka connectivity - Added extensive Kerberos/GSSAPI authentication support with keytab and user auth modes
- Expanded configuration to support multiple security protocols (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL) and SASL mechanisms (PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI)
- Refactored message sending from synchronous to asynchronous producer with separate response handling goroutine
Reviewed changes
Copilot reviewed 4 out of 5 changed files in this pull request and generated 19 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/sinks/kafka/kafa.go | Complete rewrite of Kafka sink using Sarama library with new authentication, TLS configuration, and async producer implementation |
| config.go | Expanded KafkaSinkConfig structure with 20+ new fields for security, Kerberos, SSL/TLS, and producer settings |
| cmd/ktranslate/main.go | Added flag handling for all new Kafka configuration options (80+ lines of new cases) |
| go.mod | Added Shopify/sarama v1.38.1 and related dependencies, removed segmentio/kafka-go |
| go.sum | Updated dependency checksums for new and transitive dependencies |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
pkg/sinks/kafka/kafa.go
Outdated
| "crypto/x509" | ||
| "flag" | ||
| "fmt" | ||
| "io/ioutil" |
There was a problem hiding this comment.
The io/ioutil package is deprecated since Go 1.16. Use os.ReadFile directly instead of ioutil.ReadFile, which is now located in the os package.
| "io/ioutil" |
| func (s *KafkaSink) handleResponses(ctx context.Context) { | ||
| for { | ||
| select { | ||
| case success := <-s.producer.Successes(): | ||
| if success != nil { | ||
| s.metrics.DeliveryWin.Mark(1) | ||
| } | ||
| case err := <-s.producer.Errors(): | ||
| if err != nil { | ||
| s.Errorf("Failed to produce message: %v", err.Err) | ||
| s.metrics.DeliveryErr.Mark(1) | ||
| } | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
The handleResponses goroutine can leak if it doesn't exit properly when the producer is closed. The goroutine waits for ctx.Done(), but if the context is never cancelled, it will continue running even after the producer is closed in the Close() method. Consider storing the context or adding a mechanism to properly shut down this goroutine when closing the producer.
| if s.config.SSLCertFile != "" && s.config.SSLKeyFile != "" { | ||
| cert, err := tls.LoadX509KeyPair(s.config.SSLCertFile, s.config.SSLKeyFile) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to load client certificate: %v", err) | ||
| } | ||
| tlsConfig.Certificates = []tls.Certificate{cert} | ||
| } |
There was a problem hiding this comment.
The SSLKeyPassword configuration field is defined but never used in the TLS configuration. If encrypted private keys need to be supported, the password should be used when loading the key pair. However, tls.LoadX509KeyPair doesn't support encrypted keys directly - you would need to decrypt the key first using the password.
| SSLInsecure: false, | ||
| RequiredAcks: 1, | ||
| Compression: "none", | ||
| MaxMessageBytes: 1000000, |
There was a problem hiding this comment.
The default value FlushBytes: 64 * 1024 (65536 bytes) uses a magic number calculation. For consistency with the comment "Flush after this many bytes" and the flag default of 65536, this is correct, but consider defining a constant like DefaultFlushBytes = 64 * 1024 to make the intent clearer and avoid magic numbers.
| if s.config.SSLCertFile != "" && s.config.SSLKeyFile != "" { | ||
| cert, err := tls.LoadX509KeyPair(s.config.SSLCertFile, s.config.SSLKeyFile) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to load client certificate: %v", err) | ||
| } | ||
| tlsConfig.Certificates = []tls.Certificate{cert} | ||
| } |
There was a problem hiding this comment.
When both SSLCertFile and SSLKeyFile are provided for client certificate authentication, there's no validation that both files actually exist and are readable before attempting to load them. Unlike the Kerberos configuration which checks file existence with os.Stat, the TLS configuration relies on the error from tls.LoadX509KeyPair. Consider adding explicit file existence checks for consistency and better error messages.
pkg/sinks/kafka/kafa.go
Outdated
| // Version configuration - use a recent version that supports all features | ||
| version, err := sarama.ParseKafkaVersion("2.8.0") | ||
| if err != nil { | ||
| return fmt.Errorf("failed to parse kafka version: %v", err) |
There was a problem hiding this comment.
The Kafka version is hardcoded to "2.8.0". This might not be compatible with newer Kafka clusters or might miss features from newer versions. Consider making this configurable or dynamically detecting the broker version, especially since Kafka 3.x and 4.x have been released.
| // Version configuration - use a recent version that supports all features | |
| version, err := sarama.ParseKafkaVersion("2.8.0") | |
| if err != nil { | |
| return fmt.Errorf("failed to parse kafka version: %v", err) | |
| // Version configuration: allow override via KAFKA_VERSION env var, default to 2.8.0 | |
| versionStr := os.Getenv("KAFKA_VERSION") | |
| if versionStr == "" { | |
| versionStr = "2.8.0" | |
| } | |
| version, err := sarama.ParseKafkaVersion(versionStr) | |
| if err != nil { | |
| return fmt.Errorf("failed to parse kafka version %q: %v", versionStr, err) |
| var ( | ||
| topic string | ||
| bootstrapServers string | ||
| tlsConfig string | ||
| saslUser string | ||
| saslPass string | ||
| saslMech string | ||
| skipVerify bool | ||
| topic string | ||
| bootstrapServers string | ||
| kafkaSecurityProtocol string | ||
| kafkaSASLMechanism string | ||
| kafkaSASLUsername string | ||
| kafkaSASLPassword string | ||
| kafkaKerberosServiceName string | ||
| kafkaKerberosRealm string | ||
| kafkaKerberosConfigPath string | ||
| kafkaKerberosKeytabPath string | ||
| kafkaKerberosPrincipal string | ||
| kafkaKerberosDisablePAFXFAST bool | ||
| kafkaSSLCAFile string | ||
| kafkaSSLCertFile string | ||
| kafkaSSLKeyFile string | ||
| kafkaSSLKeyPassword string | ||
| kafkaSSLInsecure bool | ||
| kafkaRequiredAcks int | ||
| kafkaCompression string | ||
| kafkaMaxMessageBytes int | ||
| kafkaRetryMax int | ||
| kafkaFlushFrequency int | ||
| kafkaFlushMessages int | ||
| kafkaFlushBytes int |
There was a problem hiding this comment.
Package-level variables (lines 23-46) are defined but never used in this file. These appear to be flag variables that should be used to populate the config, but the actual flag parsing and config population happens in cmd/ktranslate/main.go. These unused variables add confusion and maintenance burden. Consider removing them if they're truly unused, or document their purpose if they're used elsewhere.
| func (s *KafkaSink) configureSASL(config *sarama.Config) error { | ||
| switch strings.ToUpper(s.config.SASLMechanism) { | ||
| case "PLAIN": | ||
| config.Net.SASL.Mechanism = sarama.SASLTypePlaintext |
There was a problem hiding this comment.
For PLAIN SASL mechanism, the code sets config.Net.SASL.Mechanism = sarama.SASLTypePlaintext, but this should be sarama.SASLTypePlain. The constant SASLTypePlaintext is for plaintext connections without SASL, not for the PLAIN SASL mechanism. This will cause PLAIN authentication to fail.
| config.Net.SASL.Mechanism = sarama.SASLTypePlaintext | |
| config.Net.SASL.Mechanism = sarama.SASLTypePlain |
| func NewSink(log logger.Underlying, registry go_metrics.Registry, cfg *ktranslate.KafkaSinkConfig) (*KafkaSink, error) { | ||
|
|
||
| ks := KafkaSink{ | ||
| return &KafkaSink{ | ||
| registry: registry, | ||
| ContextL: logger.NewContextLFromUnderlying(logger.SContext{S: "kafkaSink"}, log), | ||
| metrics: &KafkaMetric{ | ||
| DeliveryErr: go_metrics.GetOrRegisterMeter("delivery_errors_kafka", registry), | ||
| DeliveryWin: go_metrics.GetOrRegisterMeter("delivery_wins_kafka", registry), | ||
| }, | ||
| config: cfg, | ||
| }, nil |
There was a problem hiding this comment.
The NewSink function does not validate that the cfg parameter is not nil before dereferencing it to store in the struct. If a nil config is passed, this will cause a nil pointer dereference later in the Init method when accessing s.config.Topic and s.config.BootstrapServers. Add a nil check for the config parameter.
| return | ||
| } |
There was a problem hiding this comment.
The flag case "kafka_brokers" is handled in main.go (line 599), but there's no corresponding flag definition in the kafa.go init() function. This means the "kafka_brokers" flag won't be recognized by the flag parser. Either add the flag definition or remove this case if it's not needed.
8994ae1 to
66d2d0c
Compare
This is a merge of code from here:
https://github.com/mkrygeri/ktranslate/tree/chore/add-krb5-conf
Seeing if this backing kafka sink is better.