Skip to content

Commit 8994ae1

Browse files
committed
Switching to the samara kafka sink
1 parent c7b9c7f commit 8994ae1

File tree

5 files changed

+452
-151
lines changed

5 files changed

+452
-151
lines changed

cmd/ktranslate/main.go

Lines changed: 81 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -604,23 +604,94 @@ func applyFlags(cfg *ktranslate.Config) error {
604604
// pkg/sinks/kafka
605605
case "kafka_topic":
606606
cfg.KafkaSink.Topic = val
607+
case "kafka_brokers":
608+
cfg.KafkaSink.BootstrapServers = val
607609
case "bootstrap.servers":
608610
cfg.KafkaSink.BootstrapServers = val
609-
case "kafka.tls.config":
610-
cfg.KafkaSink.TlsConfig = val
611-
case "kafka.sasl.user":
612-
cfg.KafkaSink.SaslUser = val
613-
case "kafka.sasl.password":
614-
cfg.KafkaSink.SaslPass = val
615-
case "kafka.sasl.mechanism":
616-
cfg.KafkaSink.SaslMech = val
617-
case "kafka.tls.skip.verify":
611+
case "kafka_security_protocol":
612+
cfg.KafkaSink.SecurityProtocol = val
613+
case "kafka_sasl_mechanism":
614+
cfg.KafkaSink.SASLMechanism = val
615+
case "kafka_sasl_username":
616+
cfg.KafkaSink.SASLUsername = val
617+
case "kafka_sasl_password":
618+
cfg.KafkaSink.SASLPassword = val
619+
case "kafka_kerberos_service_name":
620+
cfg.KafkaSink.KerberosServiceName = val
621+
case "kafka_kerberos_realm":
622+
cfg.KafkaSink.KerberosRealm = val
623+
case "kafka_kerberos_config_path":
624+
cfg.KafkaSink.KerberosConfigPath = val
625+
case "kafka_kerberos_keytab_path":
626+
cfg.KafkaSink.KerberosKeytabPath = val
627+
case "kafka_kerberos_principal":
628+
cfg.KafkaSink.KerberosPrincipal = val
629+
case "kafka_kerberos_disable_pafx_fast":
618630
v, err := strconv.ParseBool(val)
619631
if err != nil {
620632
errCh <- err
621633
return
622634
}
623-
cfg.KafkaSink.SkipVerify = v
635+
cfg.KafkaSink.KerberosDisablePAFXFAST = v
636+
case "kafka_ssl_ca_file":
637+
cfg.KafkaSink.SSLCAFile = val
638+
case "kafka_ssl_cert_file":
639+
cfg.KafkaSink.SSLCertFile = val
640+
case "kafka_ssl_key_file":
641+
cfg.KafkaSink.SSLKeyFile = val
642+
case "kafka_ssl_key_password":
643+
cfg.KafkaSink.SSLKeyPassword = val
644+
case "kafka_ssl_insecure":
645+
v, err := strconv.ParseBool(val)
646+
if err != nil {
647+
errCh <- err
648+
return
649+
}
650+
cfg.KafkaSink.SSLInsecure = v
651+
case "kafka_required_acks":
652+
v, err := strconv.Atoi(val)
653+
if err != nil {
654+
errCh <- err
655+
return
656+
}
657+
cfg.KafkaSink.RequiredAcks = v
658+
case "kafka_compression":
659+
cfg.KafkaSink.Compression = val
660+
case "kafka_max_message_bytes":
661+
v, err := strconv.Atoi(val)
662+
if err != nil {
663+
errCh <- err
664+
return
665+
}
666+
cfg.KafkaSink.MaxMessageBytes = v
667+
case "kafka_retry_max":
668+
v, err := strconv.Atoi(val)
669+
if err != nil {
670+
errCh <- err
671+
return
672+
}
673+
cfg.KafkaSink.RetryMax = v
674+
case "kafka_flush_frequency":
675+
v, err := strconv.Atoi(val)
676+
if err != nil {
677+
errCh <- err
678+
return
679+
}
680+
cfg.KafkaSink.FlushFrequency = v
681+
case "kafka_flush_messages":
682+
v, err := strconv.Atoi(val)
683+
if err != nil {
684+
errCh <- err
685+
return
686+
}
687+
cfg.KafkaSink.FlushMessages = v
688+
case "kafka_flush_bytes":
689+
v, err := strconv.Atoi(val)
690+
if err != nil {
691+
errCh <- err
692+
return
693+
}
694+
cfg.KafkaSink.FlushBytes = v
624695
// pkg/sinks/kentik
625696
case "kentik_relay_url":
626697
cfg.KentikSink.RelayURL = val

config.go

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,11 +136,32 @@ type HTTPSinkConfig struct {
136136
type KafkaSinkConfig struct {
137137
Topic string
138138
BootstrapServers string
139-
TlsConfig string
140-
SaslUser string
141-
SaslPass string
142-
SaslMech string
143-
SkipVerify bool
139+
// Security settings
140+
SecurityProtocol string // PLAINTEXT, SASL_PLAINTEXT, SASL_SSL, SSL
141+
SASLMechanism string // PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI (Kerberos), OAUTHBEARER
142+
SASLUsername string
143+
SASLPassword string
144+
// Kerberos (GSSAPI) settings
145+
KerberosServiceName string // Usually "kafka"
146+
KerberosRealm string
147+
KerberosConfigPath string // Path to krb5.conf
148+
KerberosKeytabPath string // Path to keytab file
149+
KerberosPrincipal string // Kerberos principal
150+
KerberosDisablePAFXFAST bool // Disable PA-FX-FAST
151+
// SSL/TLS settings
152+
SSLCAFile string // CA certificate file
153+
SSLCertFile string // Client certificate file
154+
SSLKeyFile string // Client private key file
155+
SSLKeyPassword string // Private key password
156+
SSLInsecure bool // Skip certificate verification
157+
// Producer settings
158+
RequiredAcks int // 0=NoResponse, 1=WaitForLocal, -1=WaitForAll
159+
Compression string // none, gzip, snappy, lz4, zstd
160+
MaxMessageBytes int // Maximum message size
161+
RetryMax int // Maximum retries
162+
FlushFrequency int // Flush frequency in milliseconds
163+
FlushMessages int // Flush after this many messages
164+
FlushBytes int // Flush after this many bytes
144165
}
145166

146167
// KentikSinkConfig is the config for the Kentik sink
@@ -477,13 +498,30 @@ func DefaultConfig() *Config {
477498
TimeoutInSeconds: 30,
478499
},
479500
KafkaSink: &KafkaSinkConfig{
480-
Topic: "",
481-
BootstrapServers: "",
482-
TlsConfig: "",
483-
SaslUser: "",
484-
SaslPass: "",
485-
SaslMech: "",
486-
SkipVerify: false,
501+
Topic: "",
502+
BootstrapServers: "",
503+
SecurityProtocol: "PLAINTEXT",
504+
SASLMechanism: "",
505+
SASLUsername: "",
506+
SASLPassword: "",
507+
KerberosServiceName: "kafka",
508+
KerberosRealm: "",
509+
KerberosConfigPath: "/etc/krb5.conf",
510+
KerberosKeytabPath: "",
511+
KerberosPrincipal: "",
512+
KerberosDisablePAFXFAST: false,
513+
SSLCAFile: "",
514+
SSLCertFile: "",
515+
SSLKeyFile: "",
516+
SSLKeyPassword: "",
517+
SSLInsecure: false,
518+
RequiredAcks: 1,
519+
Compression: "none",
520+
MaxMessageBytes: 1000000,
521+
RetryMax: 3,
522+
FlushFrequency: 100,
523+
FlushMessages: 100,
524+
FlushBytes: 64 * 1024,
487525
},
488526
KentikSink: &KentikSinkConfig{
489527
RelayURL: "",

go.mod

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.6.0
1010
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1
1111
github.com/DataDog/datadog-api-client-go/v2 v2.5.0
12+
github.com/Shopify/sarama v1.38.1
1213
github.com/agoda-com/opentelemetry-go/otelslog v0.1.1
1314
github.com/agoda-com/opentelemetry-logs-go v0.5.0
1415
github.com/aristanetworks/goeapi v0.6.0
@@ -45,7 +46,6 @@ require (
4546
github.com/prometheus/client_golang v1.19.0
4647
github.com/prometheus/prometheus v0.46.0
4748
github.com/sasha-s/go-hll v0.0.0-20180522065212-c6eb27aee351
48-
github.com/segmentio/kafka-go v0.4.23
4949
github.com/stretchr/testify v1.11.1
5050
github.com/syndtr/goleveldb v1.0.0
5151
github.com/xitongsys/parquet-go v1.6.2
@@ -89,6 +89,9 @@ require (
8989
github.com/cyphar/filepath-securejoin v0.6.1 // indirect
9090
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
9191
github.com/dgryski/go-bits v0.0.0-20180113010104-bd8a69a71dc2 // indirect
92+
github.com/eapache/go-resiliency v1.3.0 // indirect
93+
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
94+
github.com/eapache/queue v1.1.0 // indirect
9295
github.com/emirpasic/gods v1.18.1 // indirect
9396
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
9497
github.com/facebookgo/limitgroup v0.0.0-20150612190941-6abd8d71ec01 // indirect
@@ -114,6 +117,14 @@ require (
114117
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
115118
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
116119
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 // indirect
120+
github.com/hashicorp/errwrap v1.1.0 // indirect
121+
github.com/hashicorp/go-multierror v1.1.1 // indirect
122+
github.com/hashicorp/go-uuid v1.0.3 // indirect
123+
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
124+
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
125+
github.com/jcmturner/gofork v1.7.6 // indirect
126+
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
127+
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
117128
github.com/jmespath/go-jmespath v0.4.0 // indirect
118129
github.com/josharian/intern v1.0.0 // indirect
119130
github.com/kevinburke/ssh_config v1.5.0 // indirect
@@ -132,22 +143,20 @@ require (
132143
github.com/opentracing/opentracing-go v1.2.0 // indirect
133144
github.com/oschwald/maxminddb-golang v1.11.0 // indirect
134145
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 // indirect
135-
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
136146
github.com/pierrec/lz4/v4 v4.1.17 // indirect
137147
github.com/pjbgf/sha1cd v0.5.0 // indirect
138148
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
139149
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
140150
github.com/prometheus/client_model v0.5.0 // indirect
141151
github.com/prometheus/common v0.48.0 // indirect
142152
github.com/prometheus/procfs v0.12.0 // indirect
153+
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
143154
github.com/sergi/go-diff v1.4.0 // indirect
144155
github.com/sirupsen/logrus v1.9.3 // indirect
145156
github.com/tinylib/msgp v1.1.6 // indirect
146157
github.com/vaughan0/go-ini v0.0.0-20130923145212-a98ad7ee00ec // indirect
147158
github.com/vmihailenco/msgpack/v5 v5.3.4 // indirect
148159
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
149-
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
150-
github.com/xdg/stringprep v1.0.0 // indirect
151160
go.mongodb.org/mongo-driver v1.12.1 // indirect
152161
go.opencensus.io v0.24.0 // indirect
153162
go.opentelemetry.io/auto/sdk v1.1.0 // indirect

0 commit comments

Comments
 (0)