Skip to content

Commit 3c3c1f3

Browse files
authored
NETOBSERV-974 Implement SASL for Kafka (producer+consumer) (#424)
* Implement SASL for Kafka (producer+consumer) * fix test
1 parent a429d4f commit 3c3c1f3

File tree

38 files changed

+5052
-24
lines changed

38 files changed

+5052
-24
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ require (
9393
github.com/spf13/jwalterweatherman v1.1.0 // indirect
9494
github.com/stretchr/objx v0.5.0 // indirect
9595
github.com/subosito/gotenv v1.2.0 // indirect
96+
github.com/xdg/scram v1.0.5 // indirect
97+
github.com/xdg/stringprep v1.0.3 // indirect
9698
go.uber.org/atomic v1.9.0 // indirect
9799
golang.org/x/crypto v0.5.0 // indirect
98100
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 // indirect

pkg/api/encode_kafka.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
package api
1919

2020
type EncodeKafka struct {
21-
Address string `yaml:"address" json:"address" doc:"address of kafka server"`
22-
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
23-
Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
24-
WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
25-
ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
26-
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
27-
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
28-
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
21+
Address string `yaml:"address" json:"address" doc:"address of kafka server"`
22+
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
23+
Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
24+
WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
25+
ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
26+
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
27+
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
28+
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
29+
SASL *SASLConfig `yaml:"sasl" json:"sasl" doc:"SASL configuration (optional)"`
2930
}
3031

3132
type KafkaEncodeBalancerEnum struct {

pkg/api/enum.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type enums struct {
2828
TransformFilterOperationEnum TransformFilterOperationEnum
2929
TransformGenericOperationEnum TransformGenericOperationEnum
3030
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
31+
SASLTypeEnum SASLTypeEnum
3132
ConnTrackOperationEnum ConnTrackOperationEnum
3233
ConnTrackOutputRecordTypeEnum ConnTrackOutputRecordTypeEnum
3334
DecoderEnum DecoderEnum

pkg/api/ingest_kafka.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,17 @@
1818
package api
1919

2020
type IngestKafka struct {
21-
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
22-
Topic string `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
23-
GroupId string `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
24-
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
25-
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
26-
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
27-
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
28-
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
29-
PullQueueCapacity int `yaml:"pullQueueCapacity,omitempty" json:"pullQueueCapacity,omitempty" doc:"the capacity of the queue use to store pulled flows"`
30-
PullMaxBytes int `yaml:"pullMaxBytes,omitempty" json:"pullMaxBytes,omitempty" doc:"the maximum number of bytes being pulled from kafka"`
31-
CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
32-
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
21+
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
22+
Topic string `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
23+
GroupId string `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
24+
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
25+
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
26+
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
27+
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
28+
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
29+
PullQueueCapacity int `yaml:"pullQueueCapacity,omitempty" json:"pullQueueCapacity,omitempty" doc:"the capacity of the queue use to store pulled flows"`
30+
PullMaxBytes int `yaml:"pullMaxBytes,omitempty" json:"pullMaxBytes,omitempty" doc:"the maximum number of bytes being pulled from kafka"`
31+
CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
32+
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
33+
SASL *SASLConfig `yaml:"sasl" json:"sasl" doc:"SASL configuration (optional)"`
3334
}

pkg/api/sasl.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package api
2+
3+
type SASLConfig struct {
4+
Type string
5+
ClientIDPath string `yaml:"clientIDPath,omitempty" json:"clientIDPath,omitempty" doc:"path to the client ID / SASL username"`
6+
ClientSecretPath string `yaml:"clientSecretPath,omitempty" json:"clientSecretPath,omitempty" doc:"path to the client secret / SASL password"`
7+
}
8+
9+
type SASLTypeEnum struct {
10+
Plain string `yaml:"plain" json:"plain" doc:"Plain SASL"`
11+
ScramSHA512 string `yaml:"scramSHA512" json:"scramSHA512" doc:"SCRAM/SHA512 SASL"`
12+
}
13+
14+
func SASLTypeName(operation string) string {
15+
return GetEnumName(SASLTypeEnum{}, operation)
16+
}

pkg/api/tls.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,5 @@ func (c *ClientTLS) Build() (*tls.Config, error) {
6262
}
6363
return tlsConfig, nil
6464
}
65-
return nil, nil
65+
return tlsConfig, nil
6666
}

pkg/config/pipeline_builder_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func TestKafkaPromPipeline(t *testing.T) {
150150

151151
b, err = json.Marshal(params[0])
152152
require.NoError(t, err)
153-
require.JSONEq(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"},"tls":{"insecureSkipVerify":true,"caCertPath":"/ca.crt"}}}}`, string(b))
153+
require.JSONEq(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"},"sasl":null,"tls":{"insecureSkipVerify":true,"caCertPath":"/ca.crt"}}}}`, string(b))
154154

155155
b, err = json.Marshal(params[1])
156156
require.NoError(t, err)

pkg/pipeline/encode/encode_kafka.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ import (
2424
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2525
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2626
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
27+
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
2728
"github.com/prometheus/client_golang/prometheus"
28-
"github.com/segmentio/kafka-go"
2929
kafkago "github.com/segmentio/kafka-go"
3030
log "github.com/sirupsen/logrus"
3131
"golang.org/x/net/context"
@@ -95,7 +95,7 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E
9595
writeTimeoutSecs = config.WriteTimeout
9696
}
9797

98-
transport := kafka.Transport{}
98+
transport := kafkago.Transport{}
9999
if config.TLS != nil {
100100
log.Infof("Using TLS configuration: %v", config.TLS)
101101
tlsConfig, err := config.TLS.Build()
@@ -105,6 +105,14 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E
105105
transport.TLS = tlsConfig
106106
}
107107

108+
if config.SASL != nil {
109+
m, err := utils.SetupSASLMechanism(config.SASL)
110+
if err != nil {
111+
return nil, err
112+
}
113+
transport.SASL = m
114+
}
115+
108116
// connect to the kafka server
109117
kafkaWriter := kafkago.Writer{
110118
Addr: kafkago.TCP(config.Address),

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,14 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I
232232
dialer.TLS = tlsConfig
233233
}
234234

235+
if jsonIngestKafka.SASL != nil {
236+
m, err := utils.SetupSASLMechanism(jsonIngestKafka.SASL)
237+
if err != nil {
238+
return nil, err
239+
}
240+
dialer.SASLMechanism = m
241+
}
242+
235243
readerConfig := kafkago.ReaderConfig{
236244
Brokers: jsonIngestKafka.Brokers,
237245
Topic: jsonIngestKafka.Topic,

pkg/pipeline/utils/sasl.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"strings"
7+
8+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
9+
"github.com/segmentio/kafka-go/sasl"
10+
"github.com/segmentio/kafka-go/sasl/plain"
11+
"github.com/segmentio/kafka-go/sasl/scram"
12+
)
13+
14+
func SetupSASLMechanism(cfg *api.SASLConfig) (sasl.Mechanism, error) {
15+
// Read client ID
16+
id, err := os.ReadFile(cfg.ClientIDPath)
17+
if err != nil {
18+
return nil, err
19+
}
20+
strId := strings.TrimSpace(string(id))
21+
// Read password
22+
pwd, err := os.ReadFile(cfg.ClientSecretPath)
23+
if err != nil {
24+
return nil, err
25+
}
26+
strPwd := strings.TrimSpace(string(pwd))
27+
var mechanism sasl.Mechanism
28+
switch cfg.Type {
29+
case api.SASLTypeName("Plain"):
30+
mechanism = plain.Mechanism{Username: strId, Password: strPwd}
31+
case api.SASLTypeName("ScramSHA512"):
32+
mechanism, err = scram.Mechanism(scram.SHA512, strId, strPwd)
33+
default:
34+
return nil, fmt.Errorf("Unknown SASL type: %s", cfg.Type)
35+
}
36+
if err != nil {
37+
return nil, err
38+
}
39+
return mechanism, nil
40+
}

0 commit comments

Comments
 (0)