Skip to content

Commit fc3bafe

Browse files
authored
Kafka ingress (#55)
* kafka ingest * added clean exit support to kafka ingest plus unit test * added test of kafka listener * updated go.mod from rebase of code * fixed typos and addressed other review comments
1 parent 63f8926 commit fc3bafe

File tree

17 files changed

+473
-9
lines changed

17 files changed

+473
-9
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,15 @@ Flags:
3535
--config string config file (default is $HOME/.flowlogs2metrics)
3636
-h, --help help for flowlogs2metrics
3737
--log-level string Log level: debug, info, warning, error (default "error")
38-
--pipeLine.ingest.collector string Ingest collector API
3938
--pipeline.decode.aws string aws fields
4039
--pipeline.decode.type string Decode type: aws, json, none (default "none")
4140
--pipeline.encode.prom string Prometheus encode API
4241
--pipeline.encode.type string Encode type: prom, none (default "none")
4342
--pipeline.extract.aggregates string Aggregates (see docs)
4443
--pipeline.extract.type string Extract type: aggregates, none (default "none")
44+
--pipeline.ingest.collector string Ingest collector API
4545
--pipeline.ingest.file.filename string Ingest filename (file)
46+
--pipeline.ingest.kafka string Ingest Kafka API
4647
--pipeline.ingest.type string Ingest type: file, collector,file_loop (required)
4748
--pipeline.transform string Transforms (list) API (default "[{"type": "none"}]")
4849
--pipeline.write.loki string Loki write API

cmd/flowlogs2metrics/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ func initFlags() {
129129
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "error", "Log level: debug, info, warning, error")
130130
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)")
131131
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)")
132-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeLine.ingest.collector", "", "Ingest collector API")
132+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API")
133+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API")
133134
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Aws, "pipeline.decode.aws", "", "aws fields")
134135
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Type, "pipeline.decode.type", "none", "Decode type: aws, json, none")
135136
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Transform, "pipeline.transform", "[{\"type\": \"none\"}]", "Transforms (list) API")

docs/api.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ Following is the supported API format for the netflow collector:
2525
hostName: the hostname to listen on
2626
port: the port number to listen on
2727
</pre>
28+
## Ingest Kafka API
29+
Following is the supported API format for the kafka ingest:
30+
31+
<pre>
32+
kafka:
33+
brokers: list of kafka broker addresses
34+
topic: kafka topic to listen on
35+
groupid: separate groupid for each consumer on specified topic
36+
groupbalancers: list of balancing strategies (range, roundRobin, rackAffinity)
37+
startoffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
38+
batchreadtimeout: how often (in milliseconds) to process input
39+
</pre>
2840
## Aws ingest API
2941
Following is the supported API format for Aws flow entries:
3042

go.mod

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@ require (
1212
github.com/netsampler/goflow2 v1.0.4
1313
github.com/prometheus/client_golang v1.12.0
1414
github.com/prometheus/common v0.32.1
15+
github.com/segmentio/kafka-go v0.4.27
1516
github.com/sirupsen/logrus v1.8.1
1617
github.com/spf13/cobra v1.3.0
1718
github.com/spf13/pflag v1.0.5
1819
github.com/spf13/viper v1.10.1
1920
github.com/stretchr/testify v1.7.0
21+
golang.org/x/net v0.0.0-20211209124913-491a49abca63
2022
google.golang.org/protobuf v1.27.1
2123
gopkg.in/yaml.v2 v2.4.0
2224
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d
@@ -42,13 +44,15 @@ require (
4244
github.com/imdario/mergo v0.3.5 // indirect
4345
github.com/inconshreveable/mousetrap v1.0.0 // indirect
4446
github.com/jpillora/backoff v1.0.0 // indirect
47+
github.com/klauspost/compress v1.13.6 // indirect
4548
github.com/libp2p/go-reuseport v0.0.2 // indirect
4649
github.com/magiconair/properties v1.8.5 // indirect
4750
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
4851
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
4952
github.com/modern-go/reflect2 v1.0.2 // indirect
5053
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
5154
github.com/pelletier/go-toml v1.9.4 // indirect
55+
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
5256
github.com/pkg/errors v0.9.1 // indirect
5357
github.com/pmezard/go-difflib v1.0.0 // indirect
5458
github.com/prometheus/client_model v0.2.0 // indirect
@@ -60,7 +64,6 @@ require (
6064
github.com/stretchr/objx v0.2.0 // indirect
6165
github.com/subosito/gotenv v1.2.0 // indirect
6266
go.uber.org/atomic v1.9.0 // indirect
63-
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
6467
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
6568
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
6669
golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b // indirect

go.sum

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoD
216216
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
217217
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
218218
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
219+
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
219220
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
220221
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
221222
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
@@ -552,6 +553,10 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
552553
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
553554
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
554555
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
556+
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
557+
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
558+
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
559+
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
555560
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
556561
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
557562
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
@@ -703,6 +708,8 @@ github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHu
703708
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
704709
github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
705710
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
711+
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
712+
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
706713
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
707714
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
708715
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -778,6 +785,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
778785
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
779786
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
780787
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
788+
github.com/segmentio/kafka-go v0.4.27 h1:sIhEozeL/TLN2mZ5dkG462vcGEWYKS+u31sXPjKhAM4=
789+
github.com/segmentio/kafka-go v0.4.27/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg=
781790
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
782791
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
783792
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
@@ -824,6 +833,7 @@ github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3
824833
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
825834
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
826835
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
836+
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
827837
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
828838
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
829839
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
@@ -849,6 +859,7 @@ github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv
849859
github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
850860
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
851861
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
862+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
852863
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
853864
github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg=
854865
github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cimt1uhCg=
@@ -900,6 +911,7 @@ golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnf
900911
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
901912
golang.org/x/crypto v0.0.0-20190320223903-b7391e95e576/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
902913
golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
914+
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
903915
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
904916
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
905917
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=

pkg/api/api.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ const TagEnum = "enum"
2626
type API struct {
2727
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
2828
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"`
29-
EncodeAws EncodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
29+
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
30+
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
3031
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
3132
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
3233
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`

pkg/api/decode_aws.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@
1717

1818
package api
1919

20-
type EncodeAws struct {
20+
type DecodeAws struct {
2121
Fields []string `yaml:"fields" doc:"list of aws flow log fields"`
2222
}

pkg/api/ingest_kafka.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
type IngestKafka struct {
21+
Brokers []string `yaml:"brokers" doc:"list of kafka broker addresses"`
22+
Topic string `yaml:"topic" doc:"kafka topic to listen on"`
23+
GroupId string `yaml:"groupid" doc:"separate groupid for each consumer on specified topic"`
24+
GroupBalancers []string `yaml:"groupbalancers" doc:"list of balancing strategies (range, roundRobin, rackAffinity)"`
25+
StartOffset string `yaml:"startoffset" doc:"FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition"`
26+
BatchReadTimeout int64 `yaml:"batchreadtimeout" doc:"how often (in milliseconds) to process input"`
27+
}

pkg/config/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type Ingest struct {
4040
Type string
4141
File File
4242
Collector string
43+
Kafka string
4344
}
4445

4546
type File struct {

pkg/pipeline/decode/decode_aws.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func NewDecodeAws() (Decoder, error) {
7878
fieldsString := config.Opt.PipeLine.Decode.Aws
7979
log.Debugf("fieldsString = %v", fieldsString)
8080
if fieldsString != "" {
81-
var awsFields api.EncodeAws
81+
var awsFields api.DecodeAws
8282
err := json.Unmarshal([]byte(fieldsString), &awsFields)
8383
if err != nil {
8484
log.Errorf("NewDecodeAws: error in unmarshalling fields: %v", err)

0 commit comments

Comments
 (0)