
Commit 90b6b5c

Refactoring prior to informers-cache rework (#847)
Related: #681

This is preliminary work preparing the ground for NETOBSERV-1248. There is no functional change here; the goal is to minimize rebase hassle in #681.

Details:
- Kafka read/write logic extracted from the ingest/encode pipelines into a dedicated kafka package
- New k8s "datasource" struct for k8s enrichment; currently it only holds the usual informers datasource, but it will later include a kafka-based datasource as well
- Config related to the k8s datasource moved into its own package, including the SecondaryNetwork config
- Some minor variable/function renaming
1 parent 5f6d6dc commit 90b6b5c
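The new k8s "datasource" struct described above is not included in this excerpt. As a rough, hypothetical sketch of the shape the description implies (package, import path, and type names here are assumptions, not the actual code):

package datasource

import "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/kubernetes/informers" // assumed import path

// Hypothetical sketch only. Datasource wraps whatever feeds k8s enrichment
// data: currently the informers-based source; per the commit description, a
// kafka-based source is planned to join it later (NETOBSERV-1248).
type Datasource struct {
	Informers informers.InformersInterface // assumed informers interface name
}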


14 files changed (+567 / -474 lines)


pkg/kafka/reader.go

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
+package kafka
+
+import (
+	"errors"
+	"os"
+	"time"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
+	kafkago "github.com/segmentio/kafka-go"
+	"github.com/sirupsen/logrus"
+)
+
+var klog = logrus.WithField("component", "kafka-reader")
+
+const defaultBatchReadTimeout = int64(1000)
+const defaultKafkaBatchMaxLength = 500
+const defaultKafkaCommitInterval = 500
+
+func NewReader(config *api.IngestKafka) (*kafkago.Reader, int, error) {
+	startOffsetString := config.StartOffset
+	var startOffset int64
+	switch startOffsetString {
+	case "FirstOffset", "":
+		startOffset = kafkago.FirstOffset
+	case "LastOffset":
+		startOffset = kafkago.LastOffset
+	default:
+		startOffset = kafkago.FirstOffset
+		klog.Errorf("illegal value for StartOffset: %s; using default FirstOffset", startOffsetString)
+	}
+	klog.Debugf("startOffset = %v", startOffset)
+	groupBalancers := make([]kafkago.GroupBalancer, 0)
+	for _, gb := range config.GroupBalancers {
+		switch gb {
+		case "range":
+			groupBalancers = append(groupBalancers, &kafkago.RangeGroupBalancer{})
+		case "roundRobin":
+			groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{})
+		case "rackAffinity":
+			groupBalancers = append(groupBalancers, &kafkago.RackAffinityGroupBalancer{})
+		default:
+			klog.Warningf("unknown group balancer %q; using roundRobin", gb)
+			groupBalancers = append(groupBalancers, &kafkago.RoundRobinGroupBalancer{})
+		}
+	}
+
+	batchReadTimeout := defaultBatchReadTimeout
+	if config.BatchReadTimeout != 0 {
+		batchReadTimeout = config.BatchReadTimeout
+	}
+	klog.Debugf("batchReadTimeout = %d", batchReadTimeout)
+
+	commitInterval := int64(defaultKafkaCommitInterval)
+	if config.CommitInterval != 0 {
+		commitInterval = config.CommitInterval
+	}
+	klog.Debugf("commitInterval = %d", commitInterval)
+
+	dialer := &kafkago.Dialer{
+		Timeout:   kafkago.DefaultDialer.Timeout,
+		DualStack: kafkago.DefaultDialer.DualStack,
+	}
+	if config.TLS != nil {
+		klog.Infof("Using TLS configuration: %v", config.TLS)
+		tlsConfig, err := config.TLS.Build()
+		if err != nil {
+			return nil, 0, err
+		}
+		dialer.TLS = tlsConfig
+	}
+
+	if config.SASL != nil {
+		m, err := utils.SetupSASLMechanism(config.SASL)
+		if err != nil {
+			return nil, 0, err
+		}
+		dialer.SASLMechanism = m
+	}
+
+	readerConfig := kafkago.ReaderConfig{
+		Brokers:        config.Brokers,
+		Topic:          config.Topic,
+		GroupID:        config.GroupID,
+		GroupBalancers: groupBalancers,
+		StartOffset:    startOffset,
+		CommitInterval: time.Duration(commitInterval) * time.Millisecond,
+		Dialer:         dialer,
+	}
+
+	if readerConfig.GroupID == "" {
+		// Fall back to the pod hostname as the consumer group ID
+		readerConfig.GroupID = os.Getenv("HOSTNAME")
+	}
+
+	if config.PullQueueCapacity > 0 {
+		readerConfig.QueueCapacity = config.PullQueueCapacity
+	}
+
+	if config.PullMaxBytes > 0 {
+		readerConfig.MaxBytes = config.PullMaxBytes
+	}
+
+	bml := defaultKafkaBatchMaxLength
+	if config.BatchMaxLen != 0 {
+		bml = config.BatchMaxLen
+	}
+
+	klog.Debugf("reader config: %#v", readerConfig)
+
+	kafkaReader := kafkago.NewReader(readerConfig)
+	if kafkaReader == nil {
+		return nil, 0, errors.New("NewReader: failed to create kafka-go reader")
+	}
+
+	return kafkaReader, bml, nil
+}
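
A minimal usage sketch (not part of this commit) showing how a caller might consume what NewReader returns: the kafka-go reader plus the resolved batch max length. Broker, topic, and group values are illustrative assumptions.

package main

import (
	"context"
	"log"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/kafka"
)

func main() {
	cfg := &api.IngestKafka{
		Brokers: []string{"kafka:9092"}, // assumed broker address
		Topic:   "network-flows",        // assumed topic name
		GroupID: "flp-ingest",           // assumed consumer group
	}
	reader, batchMaxLen, err := kafka.NewReader(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()
	// Drain at most one batch worth of messages, then stop.
	for i := 0; i < batchMaxLen; i++ {
		msg, err := reader.ReadMessage(context.Background())
		if err != nil {
			break
		}
		log.Printf("flow record: %s", msg.Value)
	}
}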

pkg/kafka/writer.go

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+package kafka
+
+import (
+	"time"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
+	kafkago "github.com/segmentio/kafka-go"
+	log "github.com/sirupsen/logrus"
+)
+
+const (
+	defaultReadTimeoutSeconds  = int64(10)
+	defaultWriteTimeoutSeconds = int64(10)
+)
+
+func NewWriter(config *api.EncodeKafka) (*kafkago.Writer, error) {
+	var balancer kafkago.Balancer
+	switch config.Balancer {
+	case api.KafkaRoundRobin:
+		balancer = &kafkago.RoundRobin{}
+	case api.KafkaLeastBytes:
+		balancer = &kafkago.LeastBytes{}
+	case api.KafkaHash:
+		balancer = &kafkago.Hash{}
+	case api.KafkaCrc32:
+		balancer = &kafkago.CRC32Balancer{}
+	case api.KafkaMurmur2:
+		balancer = &kafkago.Murmur2Balancer{}
+	default:
+		balancer = nil
+	}
+
+	readTimeoutSecs := defaultReadTimeoutSeconds
+	if config.ReadTimeout != 0 {
+		readTimeoutSecs = config.ReadTimeout
+	}
+
+	writeTimeoutSecs := defaultWriteTimeoutSeconds
+	if config.WriteTimeout != 0 {
+		writeTimeoutSecs = config.WriteTimeout
+	}
+
+	transport := kafkago.Transport{}
+	if config.TLS != nil {
+		log.Infof("Using TLS configuration: %v", config.TLS)
+		tlsConfig, err := config.TLS.Build()
+		if err != nil {
+			return nil, err
+		}
+		transport.TLS = tlsConfig
+	}
+
+	if config.SASL != nil {
+		m, err := utils.SetupSASLMechanism(config.SASL)
+		if err != nil {
+			return nil, err
+		}
+		transport.SASL = m
+	}
+
+	kafkaWriter := kafkago.Writer{
+		Addr:         kafkago.TCP(config.Address),
+		Topic:        config.Topic,
+		Balancer:     balancer,
+		ReadTimeout:  time.Duration(readTimeoutSecs) * time.Second,
+		WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second,
+		BatchSize:    config.BatchSize,
+		BatchBytes:   config.BatchBytes,
+		// Temporary fix; maybe we should implement a batching system instead:
+		// https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403
+		BatchTimeout: time.Nanosecond,
+		Transport:    &transport,
+	}
+
+	return &kafkaWriter, nil
+}
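
A minimal usage sketch (not part of this commit), with illustrative broker and topic values. Note the design choice preserved from the encode stage: BatchTimeout is one nanosecond to effectively disable kafka-go's internal batching (see the linked issue), so callers are expected to batch upstream.

package main

import (
	"context"
	"log"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/kafka"
	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	cfg := &api.EncodeKafka{
		Address: "kafka:9092",     // assumed broker address
		Topic:   "enriched-flows", // assumed topic name
	}
	writer, err := kafka.NewWriter(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer writer.Close()
	// Publish a single illustrative record.
	err = writer.WriteMessages(context.Background(),
		kafkago.Message{Value: []byte(`{"SrcAddr":"10.0.0.1"}`)},
	)
	if err != nil {
		log.Fatal(err)
	}
}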

pkg/pipeline/encode/encode_kafka.go

Lines changed: 5 additions & 65 deletions
@@ -19,23 +19,17 @@ package encode
 
 import (
 	"encoding/json"
-	"time"
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/kafka"
 	"github.com/netobserv/flowlogs-pipeline/pkg/operational"
-	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
 	"github.com/prometheus/client_golang/prometheus"
 	kafkago "github.com/segmentio/kafka-go"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
 )
 
-const (
-	defaultReadTimeoutSeconds  = int64(10)
-	defaultWriteTimeoutSeconds = int64(10)
-)
-
 type kafkaWriteMessage interface {
 	WriteMessages(ctx context.Context, msgs ...kafkago.Message) error
 }
@@ -78,68 +72,14 @@ func NewEncodeKafka(opMetrics *operational.Metrics, params config.StageParam) (E
 		config = *params.Encode.Kafka
 	}
 
-	var balancer kafkago.Balancer
-	switch config.Balancer {
-	case api.KafkaRoundRobin:
-		balancer = &kafkago.RoundRobin{}
-	case api.KafkaLeastBytes:
-		balancer = &kafkago.LeastBytes{}
-	case api.KafkaHash:
-		balancer = &kafkago.Hash{}
-	case api.KafkaCrc32:
-		balancer = &kafkago.CRC32Balancer{}
-	case api.KafkaMurmur2:
-		balancer = &kafkago.Murmur2Balancer{}
-	default:
-		balancer = nil
-	}
-
-	readTimeoutSecs := defaultReadTimeoutSeconds
-	if config.ReadTimeout != 0 {
-		readTimeoutSecs = config.ReadTimeout
-	}
-
-	writeTimeoutSecs := defaultWriteTimeoutSeconds
-	if config.WriteTimeout != 0 {
-		writeTimeoutSecs = config.WriteTimeout
-	}
-
-	transport := kafkago.Transport{}
-	if config.TLS != nil {
-		log.Infof("Using TLS configuration: %v", config.TLS)
-		tlsConfig, err := config.TLS.Build()
-		if err != nil {
-			return nil, err
-		}
-		transport.TLS = tlsConfig
-	}
-
-	if config.SASL != nil {
-		m, err := utils.SetupSASLMechanism(config.SASL)
-		if err != nil {
-			return nil, err
-		}
-		transport.SASL = m
-	}
-
-	// connect to the kafka server
-	kafkaWriter := kafkago.Writer{
-		Addr:         kafkago.TCP(config.Address),
-		Topic:        config.Topic,
-		Balancer:     balancer,
-		ReadTimeout:  time.Duration(readTimeoutSecs) * time.Second,
-		WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second,
-		BatchSize:    config.BatchSize,
-		BatchBytes:   config.BatchBytes,
-		// Temporary fix may be we should implement a batching systems
-		// https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403
-		BatchTimeout: time.Nanosecond,
-		Transport:    &transport,
+	kafkaWriter, err := kafka.NewWriter(&config)
+	if err != nil {
+		return nil, err
 	}
 
 	return &encodeKafka{
 		kafkaParams:    config,
-		kafkaWriter:    &kafkaWriter,
+		kafkaWriter:    kafkaWriter,
 		recordsWritten: opMetrics.CreateRecordsWrittenCounter(params.Name),
 	}, nil
 }
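
The ingest-side change is not shown in this excerpt, but per the commit description it mirrors this pattern, delegating to the new package. A hedged sketch of what that delegation might look like inside the ingest constructor (the surrounding code and exact field path are assumptions):

	// Hypothetical: replaces the inlined reader setup in ingest_kafka.go.
	kafkaReader, batchMaxLen, err := kafka.NewReader(params.Ingest.Kafka)
	if err != nil {
		return nil, err
	}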
