Skip to content

Commit 3dbc423

Browse files
authored
NETOBSERV-397 Kafka TLS (#256)
* Kafka TLS: add API / config; read certificates and prepare the TLS config for the HTTP transport
* Allow insecure mode and CA certificate
* Add TLS to the Kafka transformer (ingester)
* Remove currently unused server TLS config
* Add TLS config tests
* Use explicit defaults from DefaultDialer
1 parent 8469fe0 commit 3dbc423

File tree

10 files changed

+364
-29
lines changed

10 files changed

+364
-29
lines changed

pkg/api/encode_kafka.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
package api
1919

2020
type EncodeKafka struct {
21-
Address string `yaml:"address" json:"address" doc:"address of kafka server"`
22-
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
23-
Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
24-
WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
25-
ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
26-
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
27-
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
21+
Address string `yaml:"address" json:"address" doc:"address of kafka server"`
22+
Topic string `yaml:"topic" json:"topic" doc:"kafka topic to write to"`
23+
Balancer string `yaml:"balancer,omitempty" json:"balancer,omitempty" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
24+
WriteTimeout int64 `yaml:"writeTimeout,omitempty" json:"writeTimeout,omitempty" doc:"timeout (in seconds) for write operation performed by the Writer"`
25+
ReadTimeout int64 `yaml:"readTimeout,omitempty" json:"readTimeout,omitempty" doc:"timeout (in seconds) for read operation performed by the Writer"`
26+
BatchBytes int64 `yaml:"batchBytes,omitempty" json:"batchBytes,omitempty" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
27+
BatchSize int `yaml:"batchSize,omitempty" json:"batchSize,omitempty" doc:"limit on how many messages will be buffered before being sent to a partition"`
28+
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
2829
}
2930

3031
type KafkaEncodeBalancerEnum struct {

pkg/api/ingest_kafka.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
package api
1919

2020
type IngestKafka struct {
21-
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
22-
Topic string `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
23-
GroupId string `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
24-
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
25-
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
26-
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
27-
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
28-
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
29-
CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
21+
Brokers []string `yaml:"brokers,omitempty" json:"brokers,omitempty" doc:"list of kafka broker addresses"`
22+
Topic string `yaml:"topic,omitempty" json:"topic,omitempty" doc:"kafka topic to listen on"`
23+
GroupId string `yaml:"groupid,omitempty" json:"groupid,omitempty" doc:"separate groupid for each consumer on specified topic"`
24+
GroupBalancers []string `yaml:"groupBalancers,omitempty" json:"groupBalancers,omitempty" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
25+
StartOffset string `yaml:"startOffset,omitempty" json:"startOffset,omitempty" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
26+
BatchReadTimeout int64 `yaml:"batchReadTimeout,omitempty" json:"batchReadTimeout,omitempty" doc:"how often (in milliseconds) to process input"`
27+
Decoder Decoder `yaml:"decoder,omitempty" json:"decoder" doc:"decoder to use (E.g. json or protobuf)"`
28+
BatchMaxLen int `yaml:"batchMaxLen,omitempty" json:"batchMaxLen,omitempty" doc:"the number of accumulated flows before being forwarded for processing"`
29+
CommitInterval int64 `yaml:"commitInterval,omitempty" json:"commitInterval,omitempty" doc:"the interval (in milliseconds) at which offsets are committed to the broker. If 0, commits will be handled synchronously."`
30+
TLS *ClientTLS `yaml:"tls" json:"tls" doc:"TLS client configuration (optional)"`
3031
}

pkg/api/tls.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
import (
21+
"crypto/tls"
22+
"crypto/x509"
23+
"errors"
24+
"io/ioutil"
25+
)
26+
27+
// ClientTLS holds the client-side TLS configuration (e.g. for Kafka
// producers/consumers): server verification settings, an optional CA
// certificate and an optional user certificate/key pair for mutual TLS.
type ClientTLS struct {
	InsecureSkipVerify bool   `yaml:"insecureSkipVerify,omitempty" json:"insecureSkipVerify,omitempty" doc:"skip client verifying the server's certificate chain and host name"`
	CACertPath         string `yaml:"caCertPath,omitempty" json:"caCertPath,omitempty" doc:"path to the CA certificate"`
	UserCertPath       string `yaml:"userCertPath,omitempty" json:"userCertPath,omitempty" doc:"path to the user certificate"`
	UserKeyPath        string `yaml:"userKeyPath,omitempty" json:"userKeyPath,omitempty" doc:"path to the user private key"`
}

// Build converts the file-based configuration into a *tls.Config.
// It returns an error when a referenced file cannot be read or parsed, or
// when only one of UserCertPath / UserKeyPath is provided.
//
// Unlike the previous version, a non-nil config is always returned on
// success: an empty ClientTLS yields a default (but non-nil) tls.Config,
// so InsecureSkipVerify and user certificates are honored even when no
// CA certificate path is set.
func (c *ClientTLS) Build() (*tls.Config, error) {
	tlsConfig := &tls.Config{
		InsecureSkipVerify: c.InsecureSkipVerify,
	}

	if c.CACertPath != "" {
		caCert, err := ioutil.ReadFile(c.CACertPath)
		if err != nil {
			return nil, err
		}
		tlsConfig.RootCAs = x509.NewCertPool()
		tlsConfig.RootCAs.AppendCertsFromPEM(caCert)
	}

	// Mutual TLS: the user certificate and its key must be provided together.
	// This is intentionally independent of CACertPath (a system-trusted CA
	// may be used with a client certificate).
	switch {
	case c.UserCertPath != "" && c.UserKeyPath != "":
		userCert, err := ioutil.ReadFile(c.UserCertPath)
		if err != nil {
			return nil, err
		}
		userKey, err := ioutil.ReadFile(c.UserKeyPath)
		if err != nil {
			return nil, err
		}
		// ReadFile already returns []byte; no conversion needed.
		pair, err := tls.X509KeyPair(userCert, userKey)
		if err != nil {
			return nil, err
		}
		tlsConfig.Certificates = []tls.Certificate{pair}
	case c.UserCertPath != "" || c.UserKeyPath != "":
		return nil, errors.New("userCertPath and userKeyPath must be both present or both absent")
	}

	return tlsConfig, nil
}

pkg/config/pipeline_builder_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ func TestKafkaPromPipeline(t *testing.T) {
9898
Topic: "netflows",
9999
GroupId: "my-group",
100100
Decoder: api.Decoder{Type: "json"},
101+
TLS: &api.ClientTLS{
102+
InsecureSkipVerify: true,
103+
CACertPath: "/ca.crt",
104+
},
101105
})
102106
pl = pl.TransformFilter("filter", api.TransformFilter{
103107
Rules: []api.TransformFilterRule{{
@@ -137,7 +141,7 @@ func TestKafkaPromPipeline(t *testing.T) {
137141

138142
b, err = json.Marshal(params[0])
139143
require.NoError(t, err)
140-
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"}}}}`, string(b))
144+
require.Equal(t, `{"name":"ingest","ingest":{"type":"kafka","kafka":{"brokers":["http://kafka"],"topic":"netflows","groupid":"my-group","decoder":{"type":"json"},"tls":{"insecureSkipVerify":true,"caCertPath":"/ca.crt"}}}}`, string(b))
141145

142146
b, err = json.Marshal(params[1])
143147
require.NoError(t, err)

pkg/pipeline/encode/encode_kafka.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2525
"github.com/netobserv/flowlogs-pipeline/pkg/config"
26+
"github.com/segmentio/kafka-go"
2627
kafkago "github.com/segmentio/kafka-go"
2728
log "github.com/sirupsen/logrus"
2829
"golang.org/x/net/context"
@@ -66,13 +67,13 @@ func (r *encodeKafka) Encode(in []config.GenericMap) {
6667
// NewEncodeKafka create a new writer to kafka
6768
func NewEncodeKafka(params config.StageParam) (Encoder, error) {
6869
log.Debugf("entering NewEncodeKafka")
69-
jsonEncodeKafka := api.EncodeKafka{}
70+
config := api.EncodeKafka{}
7071
if params.Encode != nil && params.Encode.Kafka != nil {
71-
jsonEncodeKafka = *params.Encode.Kafka
72+
config = *params.Encode.Kafka
7273
}
7374

7475
var balancer kafkago.Balancer
75-
switch jsonEncodeKafka.Balancer {
76+
switch config.Balancer {
7677
case api.KafkaEncodeBalancerName("RoundRobin"):
7778
balancer = &kafkago.RoundRobin{}
7879
case api.KafkaEncodeBalancerName("LeastBytes"):
@@ -88,31 +89,42 @@ func NewEncodeKafka(params config.StageParam) (Encoder, error) {
8889
}
8990

9091
readTimeoutSecs := defaultReadTimeoutSeconds
91-
if jsonEncodeKafka.ReadTimeout != 0 {
92-
readTimeoutSecs = jsonEncodeKafka.ReadTimeout
92+
if config.ReadTimeout != 0 {
93+
readTimeoutSecs = config.ReadTimeout
9394
}
9495

9596
writeTimeoutSecs := defaultWriteTimeoutSeconds
96-
if jsonEncodeKafka.WriteTimeout != 0 {
97-
writeTimeoutSecs = jsonEncodeKafka.WriteTimeout
97+
if config.WriteTimeout != 0 {
98+
writeTimeoutSecs = config.WriteTimeout
99+
}
100+
101+
transport := kafka.Transport{}
102+
if config.TLS != nil {
103+
log.Infof("Using TLS configuration: %v", config.TLS)
104+
tlsConfig, err := config.TLS.Build()
105+
if err != nil {
106+
return nil, err
107+
}
108+
transport.TLS = tlsConfig
98109
}
99110

100111
// connect to the kafka server
101112
kafkaWriter := kafkago.Writer{
102-
Addr: kafkago.TCP(jsonEncodeKafka.Address),
103-
Topic: jsonEncodeKafka.Topic,
113+
Addr: kafkago.TCP(config.Address),
114+
Topic: config.Topic,
104115
Balancer: balancer,
105116
ReadTimeout: time.Duration(readTimeoutSecs) * time.Second,
106117
WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second,
107-
BatchSize: jsonEncodeKafka.BatchSize,
108-
BatchBytes: jsonEncodeKafka.BatchBytes,
118+
BatchSize: config.BatchSize,
119+
BatchBytes: config.BatchBytes,
109120
// Temporary fix may be we should implement a batching systems
110121
// https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403
111122
BatchTimeout: time.Nanosecond,
123+
Transport: &transport,
112124
}
113125

114126
return &encodeKafka{
115-
kafkaParams: jsonEncodeKafka,
127+
kafkaParams: config,
116128
kafkaWriter: &kafkaWriter,
117129
}, nil
118130
}

pkg/pipeline/encode/encode_kafka_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"testing"
2323

24+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2425
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2526
"github.com/netobserv/flowlogs-pipeline/pkg/test"
2627
kafkago "github.com/segmentio/kafka-go"
@@ -90,3 +91,60 @@ func Test_EncodeKafka(t *testing.T) {
9091
}
9192
require.Equal(t, expectedOutput, receivedData[0])
9293
}
94+
95+
func Test_TLSConfigEmpty(t *testing.T) {
96+
pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{})
97+
pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{
98+
Address: "any",
99+
Topic: "topic",
100+
})
101+
newEncode, err := NewEncodeKafka(pipeline.GetStageParams()[1])
102+
require.NoError(t, err)
103+
tlsConfig := newEncode.(*encodeKafka).kafkaWriter.(*kafkago.Writer).Transport.(*kafkago.Transport).TLS
104+
require.Nil(t, tlsConfig)
105+
}
106+
107+
func Test_TLSConfigCA(t *testing.T) {
108+
ca, cleanup := test.CreateCACert(t)
109+
defer cleanup()
110+
pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{})
111+
pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{
112+
Address: "any",
113+
Topic: "topic",
114+
TLS: &api.ClientTLS{
115+
CACertPath: ca,
116+
},
117+
})
118+
newEncode, err := NewEncodeKafka(pipeline.GetStageParams()[1])
119+
require.NoError(t, err)
120+
tlsConfig := newEncode.(*encodeKafka).kafkaWriter.(*kafkago.Writer).Transport.(*kafkago.Transport).TLS
121+
122+
require.Empty(t, tlsConfig.Certificates)
123+
require.NotNil(t, tlsConfig.RootCAs)
124+
require.Len(t, tlsConfig.RootCAs.Subjects(), 1)
125+
}
126+
127+
func Test_MutualTLSConfig(t *testing.T) {
128+
ca, user, userKey, cleanup := test.CreateAllCerts(t)
129+
defer cleanup()
130+
pipeline := config.NewCollectorPipeline("ingest", api.IngestCollector{})
131+
pipeline.EncodeKafka("encode-kafka", api.EncodeKafka{
132+
Address: "any",
133+
Topic: "topic",
134+
TLS: &api.ClientTLS{
135+
CACertPath: ca,
136+
UserCertPath: user,
137+
UserKeyPath: userKey,
138+
},
139+
})
140+
newEncode, err := NewEncodeKafka(pipeline.GetStageParams()[1])
141+
require.NoError(t, err)
142+
143+
tlsConfig := newEncode.(*encodeKafka).kafkaWriter.(*kafkago.Writer).Transport.(*kafkago.Transport).TLS
144+
145+
require.Len(t, tlsConfig.Certificates, 1)
146+
require.NotEmpty(t, tlsConfig.Certificates[0].Certificate)
147+
require.NotNil(t, tlsConfig.Certificates[0].PrivateKey)
148+
require.NotNil(t, tlsConfig.RootCAs)
149+
require.Len(t, tlsConfig.RootCAs.Subjects(), 1)
150+
}

pkg/pipeline/ingest/ingest_kafka.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2626
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/decode"
2727
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
28+
"github.com/segmentio/kafka-go"
2829
kafkago "github.com/segmentio/kafka-go"
2930
log "github.com/sirupsen/logrus"
3031
"golang.org/x/net/context"
@@ -172,13 +173,27 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
172173
commitInterval = jsonIngestKafka.CommitInterval
173174
}
174175

176+
dialer := &kafka.Dialer{
177+
Timeout: kafka.DefaultDialer.Timeout,
178+
DualStack: kafka.DefaultDialer.DualStack,
179+
}
180+
if jsonIngestKafka.TLS != nil {
181+
log.Infof("Using TLS configuration: %v", jsonIngestKafka.TLS)
182+
tlsConfig, err := jsonIngestKafka.TLS.Build()
183+
if err != nil {
184+
return nil, err
185+
}
186+
dialer.TLS = tlsConfig
187+
}
188+
175189
kafkaReader := kafkago.NewReader(kafkago.ReaderConfig{
176190
Brokers: jsonIngestKafka.Brokers,
177191
Topic: jsonIngestKafka.Topic,
178192
GroupID: jsonIngestKafka.GroupId,
179193
GroupBalancers: groupBalancers,
180194
StartOffset: startOffset,
181195
CommitInterval: time.Duration(commitInterval) * time.Millisecond,
196+
Dialer: dialer,
182197
})
183198
if kafkaReader == nil {
184199
errMsg := "NewIngestKafka: failed to create kafka-go reader"

pkg/pipeline/ingest/ingest_kafka_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222
"time"
2323

24+
"github.com/netobserv/flowlogs-pipeline/pkg/api"
2425
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2526
"github.com/netobserv/flowlogs-pipeline/pkg/test"
2627
kafkago "github.com/segmentio/kafka-go"
@@ -249,3 +250,61 @@ func Test_BatchTimeout(t *testing.T) {
249250
require.LessOrEqual(t, int64(100), afterIngest.Sub(beforeIngest).Milliseconds())
250251
require.Greater(t, int64(120), afterIngest.Sub(beforeIngest).Milliseconds())
251252
}
253+
254+
func Test_TLSConfigEmpty(t *testing.T) {
255+
stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{
256+
Brokers: []string{"any"},
257+
Topic: "topic",
258+
Decoder: api.Decoder{Type: "json"},
259+
})
260+
newIngest, err := NewIngestKafka(stage.GetStageParams()[0])
261+
require.NoError(t, err)
262+
tlsConfig := newIngest.(*ingestKafka).kafkaReader.Config().Dialer.TLS
263+
require.Nil(t, tlsConfig)
264+
}
265+
266+
func Test_TLSConfigCA(t *testing.T) {
267+
ca, cleanup := test.CreateCACert(t)
268+
defer cleanup()
269+
stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{
270+
Brokers: []string{"any"},
271+
Topic: "topic",
272+
Decoder: api.Decoder{Type: "json"},
273+
TLS: &api.ClientTLS{
274+
CACertPath: ca,
275+
},
276+
})
277+
newIngest, err := NewIngestKafka(stage.GetStageParams()[0])
278+
require.NoError(t, err)
279+
280+
tlsConfig := newIngest.(*ingestKafka).kafkaReader.Config().Dialer.TLS
281+
282+
require.Empty(t, tlsConfig.Certificates)
283+
require.NotNil(t, tlsConfig.RootCAs)
284+
require.Len(t, tlsConfig.RootCAs.Subjects(), 1)
285+
}
286+
287+
func Test_MutualTLSConfig(t *testing.T) {
288+
ca, user, userKey, cleanup := test.CreateAllCerts(t)
289+
defer cleanup()
290+
stage := config.NewKafkaPipeline("ingest-kafka", api.IngestKafka{
291+
Brokers: []string{"any"},
292+
Topic: "topic",
293+
Decoder: api.Decoder{Type: "json"},
294+
TLS: &api.ClientTLS{
295+
CACertPath: ca,
296+
UserCertPath: user,
297+
UserKeyPath: userKey,
298+
},
299+
})
300+
newIngest, err := NewIngestKafka(stage.GetStageParams()[0])
301+
require.NoError(t, err)
302+
303+
tlsConfig := newIngest.(*ingestKafka).kafkaReader.Config().Dialer.TLS
304+
305+
require.Len(t, tlsConfig.Certificates, 1)
306+
require.NotEmpty(t, tlsConfig.Certificates[0].Certificate)
307+
require.NotNil(t, tlsConfig.Certificates[0].PrivateKey)
308+
require.NotNil(t, tlsConfig.RootCAs)
309+
require.Len(t, tlsConfig.RootCAs.Subjects(), 1)
310+
}

0 commit comments

Comments
 (0)