Skip to content

Commit 1a937f1

Browse files
authored
kafka egress (#74)
* kafka egress * added documentation of kafka encode * fixed merge conflicts * added additional kafka writer parameters and addressed other comments
1 parent ca04db6 commit 1a937f1

File tree

13 files changed

+303
-5
lines changed

13 files changed

+303
-5
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ Flags:
3737
--log-level string Log level: debug, info, warning, error (default "error")
3838
--pipeline.decode.aws string aws fields
3939
--pipeline.decode.type string Decode type: aws, json, none
40+
--pipeline.encode.kafka string Kafka encode API
4041
--pipeline.encode.prom string Prometheus encode API
41-
--pipeline.encode.type string Encode type: prom, json, none
42+
--pipeline.encode.type string Encode type: prom, json, kafka, none
4243
--pipeline.extract.aggregates string Aggregates (see docs)
4344
--pipeline.extract.type string Extract type: aggregates, none
4445
--pipeline.ingest.collector string Ingest collector API

cmd/flowlogs2metrics/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,11 @@ func initFlags() {
136136
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Transform, "pipeline.transform", "[{\"type\": \"none\"}]", "Transforms (list) API")
137137
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Type, "pipeline.extract.type", "", "Extract type: aggregates, none")
138138
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Aggregates, "pipeline.extract.aggregates", "", "Aggregates (see docs)")
139-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Type, "pipeline.encode.type", "", "Encode type: prom, json, none")
140139
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Type, "pipeline.write.type", "", "Write type: stdout, none")
141140
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Loki, "pipeline.write.loki", "", "Loki write API")
141+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Type, "pipeline.encode.type", "", "Encode type: prom, json, kafka, none")
142142
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Prom, "pipeline.encode.prom", "", "Prometheus encode API")
143+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Kafka, "pipeline.encode.kafka", "", "Kafka encode API")
143144

144145
_ = rootCmd.MarkPersistentFlagRequired("pipeline.ingest.type")
145146
}

docs/api.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,24 @@ Following is the supported API format for prometheus encode:
1717
prefix: prefix added to each metric name
1818
expirytime: seconds of no-flow to wait before deleting prometheus data item
1919
</pre>
20+
## Kafka encode API
21+
Following is the supported API format for kafka encode:
22+
23+
<pre>
24+
kafka:
25+
addr: address of kafka server
26+
topic: kafka topic to write to
27+
balancer: (enum) one of the following:
28+
roundRobin: RoundRobin balancer
29+
leastBytes: LeastBytes balancer
30+
hash: Hash balancer
31+
crc32: Crc32 balancer
32+
murmur2: Murmur2 balancer
33+
writeTimeout: timeout (in seconds) for write operation performed by the Writer
34+
readTimeout: timeout (in seconds) for read operation performed by the Writer
35+
batchBytes: limit the maximum size of a request in bytes before being sent to a partition
36+
batchSize: limit on how many messages will be buffered before being sent to a partition
37+
</pre>
2038
## Ingest collector API
2139
Following is the supported API format for the netflow collector:
2240

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ require (
1212
github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c
1313
github.com/prometheus/client_golang v1.12.0
1414
github.com/prometheus/common v0.32.1
15-
github.com/segmentio/kafka-go v0.4.27
15+
github.com/segmentio/kafka-go v0.4.28
1616
github.com/sirupsen/logrus v1.8.1
1717
github.com/spf13/cobra v1.3.0
1818
github.com/spf13/pflag v1.0.5

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,8 @@ github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
776776
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
777777
github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4=
778778
github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
779+
github.com/segmentio/kafka-go v0.4.28 h1:ATYbyenAlsoFxnV+VpIJMF87bvRuRsX7fezHNfpwkdM=
780+
github.com/segmentio/kafka-go v0.4.28/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
779781
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
780782
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
781783
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=

pkg/api/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const TagEnum = "enum"
2525

2626
type API struct {
2727
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
28+
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
2829
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"`
2930
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
3031
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`

pkg/api/encode_kafka.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
// EncodeKafka holds the user-supplied configuration for the kafka encode
// stage: where to connect, which topic to write to, how messages are
// balanced across partitions, and writer tuning knobs.
type EncodeKafka struct {
	Address      string `yaml:"addr" doc:"address of kafka server"`
	Topic        string `yaml:"topic" doc:"kafka topic to write to"`
	Balancer     string `yaml:"balancer" enum:"KafkaEncodeBalancerEnum" doc:"one of the following:"`
	WriteTimeout int64  `yaml:"writeTimeout" doc:"timeout (in seconds) for write operation performed by the Writer"`
	ReadTimeout  int64  `yaml:"readTimeout" doc:"timeout (in seconds) for read operation performed by the Writer"`
	BatchBytes   int64  `yaml:"batchBytes" doc:"limit the maximum size of a request in bytes before being sent to a partition"`
	BatchSize    int    `yaml:"batchSize" doc:"limit on how many messages will be buffered before being sent to a partition"`
}
29+
30+
// KafkaEncodeBalancerEnum enumerates the balancer names accepted by the
// kafka encode configuration; each field's yaml tag is the value users write.
type KafkaEncodeBalancerEnum struct {
	RoundRobin string `yaml:"roundRobin" doc:"RoundRobin balancer"`
	LeastBytes string `yaml:"leastBytes" doc:"LeastBytes balancer"`
	Hash       string `yaml:"hash" doc:"Hash balancer"`
	Crc32      string `yaml:"crc32" doc:"Crc32 balancer"`
	Murmur2    string `yaml:"murmur2" doc:"Murmur2 balancer"`
}
37+
38+
func KafkaEncodeBalancerName(operation string) string {
39+
return GetEnumName(KafkaEncodeBalancerEnum{}, operation)
40+
}

pkg/api/enum.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
type enums struct {
2626
PromEncodeOperationEnum PromEncodeOperationEnum
2727
TransformNetworkOperationEnum TransformNetworkOperationEnum
28+
KafkaEncodeBalancerEnum KafkaEncodeBalancerEnum
2829
}
2930

3031
type enumNameCacheKey struct {

pkg/config/config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,9 @@ type Extract struct {
6262
}
6363

6464
type Encode struct {
65-
Type string
66-
Prom string
65+
Type string
66+
Prom string
67+
Kafka string
6768
}
6869

6970
type Write struct {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package encode
19+
20+
import (
21+
"encoding/json"
22+
"github.com/netobserv/flowlogs2metrics/pkg/api"
23+
"github.com/netobserv/flowlogs2metrics/pkg/config"
24+
kafkago "github.com/segmentio/kafka-go"
25+
log "github.com/sirupsen/logrus"
26+
"golang.org/x/net/context"
27+
"time"
28+
)
29+
30+
// Timeout defaults (seconds) applied when the configuration leaves the
// corresponding field unset (zero).
const (
	defaultReadTimeoutSeconds  = int64(10)
	defaultWriteTimeoutSeconds = int64(10)
)
34+
35+
type kafkaWriteMessage interface {
36+
WriteMessages(ctx context.Context, msgs ...kafkago.Message) error
37+
}
38+
39+
type encodeKafka struct {
40+
kafkaParams api.EncodeKafka
41+
kafkaWriter kafkaWriteMessage
42+
}
43+
44+
// Encode writes entries to kafka topic
45+
func (r *encodeKafka) Encode(in []config.GenericMap) []interface{} {
46+
log.Debugf("entering encodeKafka Encode, #items = %d", len(in))
47+
var msgs []kafkago.Message
48+
msgs = make([]kafkago.Message, 0)
49+
out := make([]interface{}, 0)
50+
for _, entry := range in {
51+
var entryByteArray []byte
52+
entryByteArray, _ = json.Marshal(entry)
53+
msg := kafkago.Message{
54+
Value: entryByteArray,
55+
}
56+
msgs = append(msgs, msg)
57+
out = append(out, entry)
58+
}
59+
err := r.kafkaWriter.WriteMessages(context.Background(), msgs...)
60+
if err != nil {
61+
log.Errorf("encodeKafka error: %v", err)
62+
}
63+
return out
64+
}
65+
66+
// NewEncodeKafka create a new writer to kafka
67+
func NewEncodeKafka() (Encoder, error) {
68+
log.Debugf("entering NewIngestKafka")
69+
encodeKafkaString := config.Opt.PipeLine.Encode.Kafka
70+
log.Debugf("encodeKafkaString = %s", encodeKafkaString)
71+
var jsonEncodeKafka api.EncodeKafka
72+
err := json.Unmarshal([]byte(encodeKafkaString), &jsonEncodeKafka)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
var balancer kafkago.Balancer
78+
switch jsonEncodeKafka.Balancer {
79+
case api.KafkaEncodeBalancerName("RoundRobin"):
80+
balancer = &kafkago.RoundRobin{}
81+
case api.KafkaEncodeBalancerName("LeastBytes"):
82+
balancer = &kafkago.LeastBytes{}
83+
case api.KafkaEncodeBalancerName("Hash"):
84+
balancer = &kafkago.Hash{}
85+
case api.KafkaEncodeBalancerName("Crc32"):
86+
balancer = &kafkago.CRC32Balancer{}
87+
case api.KafkaEncodeBalancerName("Murmur2"):
88+
balancer = &kafkago.Murmur2Balancer{}
89+
default:
90+
balancer = nil
91+
}
92+
93+
readTimeoutSecs := defaultReadTimeoutSeconds
94+
if jsonEncodeKafka.ReadTimeout != 0 {
95+
readTimeoutSecs = jsonEncodeKafka.ReadTimeout
96+
}
97+
98+
writeTimeoutSecs := defaultWriteTimeoutSeconds
99+
if jsonEncodeKafka.WriteTimeout != 0 {
100+
writeTimeoutSecs = jsonEncodeKafka.WriteTimeout
101+
}
102+
103+
// connect to the kafka server
104+
kafkaWriter := kafkago.Writer{
105+
Addr: kafkago.TCP(jsonEncodeKafka.Address),
106+
Topic: jsonEncodeKafka.Topic,
107+
Balancer: balancer,
108+
ReadTimeout: time.Duration(readTimeoutSecs) * time.Second,
109+
WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second,
110+
BatchSize: jsonEncodeKafka.BatchSize,
111+
BatchBytes: jsonEncodeKafka.BatchBytes,
112+
}
113+
114+
return &encodeKafka{
115+
kafkaParams: jsonEncodeKafka,
116+
kafkaWriter: &kafkaWriter,
117+
}, nil
118+
}

0 commit comments

Comments
 (0)