Skip to content

Commit 036919c

Browse files
authored
Merge pull request #44 from jotak/kafka-tls
NETOBSERV-397 Implement TLS for Kafka connection in the agent
2 parents 83db69e + 6b2189b commit 036919c

File tree

5 files changed

+69
-0
lines changed

5 files changed

+69
-0
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ egress flows on a Linux host (required a Kernel 4.18+ with eBPF enabled).
1717
make build
1818
```
1919

20+
To build the agent image and push it to your Docker / Quay repository, run:
21+
22+
```bash
23+
IMG=quay.io/myaccount/netobserv-ebpf-agent:dev make image-build image-push
24+
```
25+
2026
## How to configure
2127

2228
The eBPF Agent is configured by means of environment variables. Check the

docs/config.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,10 @@ The following environment variables are available to configure the NetObserv eBF
4646
means that errors are ignored since the caller will not receive the returned value.
4747
* `KAFKA_COMPRESSION` (default: `none`). Compression codec to be used to compress messages. Accepted
4848
values: `none`, `gzip`, `snappy`, `lz4`, `zstd`.
49+
* `KAFKA_ENABLE_TLS` (default: false). If `true`, enable TLS encryption for Kafka messages. The following settings are used only when TLS is enabled:
50+
* `KAFKA_TLS_INSECURE_SKIP_VERIFY` (default: false). Skips server certificate verification in TLS connections.
51+
* `KAFKA_TLS_CA_CERT_PATH` (default: unset). Path to the Kafka server certificate for TLS connections.
52+
* `KAFKA_TLS_USER_CERT_PATH` (default: unset). Path to the user (client) certificate for mutual TLS connections.
53+
* `KAFKA_TLS_USER_KEY_PATH` (default: unset). Path to the user (client) private key for mutual TLS connections.
4954
* `PROFILE_PORT` (default: unset). Sets the listening port for [Go's Pprof tool](https://pkg.go.dev/net/http/pprof).
5055
If it is not set, profile is disabled.

pkg/agent/agent.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
100100
return nil, fmt.Errorf("wrong Kafka compression value %s. Admitted values are "+
101101
"none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err)
102102
}
103+
transport := kafkago.Transport{}
104+
if cfg.KafkaEnableTLS {
105+
tlsConfig, err := buildTLSConfig(cfg)
106+
if err != nil {
107+
return nil, err
108+
}
109+
transport.TLS = tlsConfig
110+
}
103111
exportFunc = (&exporter.KafkaJSON{
104112
Writer: &kafkago.Writer{
105113
Addr: kafkago.TCP(cfg.KafkaBrokers...),
@@ -114,6 +122,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
114122
BatchBytes: cfg.KafkaBatchBytes,
115123
Async: cfg.KafkaAsync,
116124
Compression: compression,
125+
Transport: &transport,
117126
},
118127
}).ExportFlows
119128
default:

pkg/agent/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ type Config struct {
6767
// KafkaCompression sets the compression codec to be used to compress messages. The accepted
6868
// values are: none (default), gzip, snappy, lz4, zstd.
6969
KafkaCompression string `env:"KAFKA_COMPRESSION" envDefault:"none"`
70+
// KafkaEnableTLS set true to enable TLS
71+
KafkaEnableTLS bool `env:"KAFKA_ENABLE_TLS" envDefault:"false"`
72+
// KafkaTLSInsecureSkipVerify skips server certificate verification in TLS connections
73+
KafkaTLSInsecureSkipVerify bool `env:"KAFKA_TLS_INSECURE_SKIP_VERIFY" envDefault:"false"`
74+
// KafkaTLSCACertPath is the path to the Kafka server certificate for TLS connections
75+
KafkaTLSCACertPath string `env:"KAFKA_TLS_CA_CERT_PATH"`
76+
// KafkaTLSUserCertPath is the path to the user (client) certificate for mTLS connections
77+
KafkaTLSUserCertPath string `env:"KAFKA_TLS_USER_CERT_PATH"`
78+
// KafkaTLSUserKeyPath is the path to the user (client) private key for mTLS connections
79+
KafkaTLSUserKeyPath string `env:"KAFKA_TLS_USER_KEY_PATH"`
7080
// ProfilePort sets the listening port for Go's Pprof tool. If it is not set, profile is disabled
7181
ProfilePort int `env:"PROFILE_PORT"`
7282
}

pkg/agent/tls.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package agent
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
"io/ioutil"
7+
)
8+
9+
func buildTLSConfig(cfg *Config) (*tls.Config, error) {
10+
tlsConfig := &tls.Config{
11+
InsecureSkipVerify: cfg.KafkaTLSInsecureSkipVerify,
12+
}
13+
if cfg.KafkaTLSCACertPath != "" {
14+
caCert, err := ioutil.ReadFile(cfg.KafkaTLSCACertPath)
15+
if err != nil {
16+
return nil, err
17+
}
18+
tlsConfig.RootCAs = x509.NewCertPool()
19+
tlsConfig.RootCAs.AppendCertsFromPEM(caCert)
20+
21+
if cfg.KafkaTLSUserCertPath != "" && cfg.KafkaTLSUserKeyPath != "" {
22+
userCert, err := ioutil.ReadFile(cfg.KafkaTLSUserCertPath)
23+
if err != nil {
24+
return nil, err
25+
}
26+
userKey, err := ioutil.ReadFile(cfg.KafkaTLSUserKeyPath)
27+
if err != nil {
28+
return nil, err
29+
}
30+
pair, err := tls.X509KeyPair([]byte(userCert), []byte(userKey))
31+
if err != nil {
32+
return nil, err
33+
}
34+
tlsConfig.Certificates = []tls.Certificate{pair}
35+
}
36+
return tlsConfig, nil
37+
}
38+
return nil, nil
39+
}

0 commit comments

Comments
 (0)