Skip to content

Commit d7ca83d

Browse files
authored
Merge pull request #53 from KalmanMeth/decode-tests
port decode module to new code structure and add tests
2 parents caf33c2 + 07e48bc commit d7ca83d

File tree

14 files changed

+339
-25
lines changed

14 files changed

+339
-25
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@ Flags:
3636
-h, --help help for flowlogs2metrics
3737
--log-level string Log level: debug, info, warning, error (default "error")
3838
--pipeLine.ingest.collector string Ingest collector API
39+
--pipeline.decode.aws string aws fields
3940
--pipeline.decode.type string Decode type: aws, json, none (default "none")
4041
--pipeline.encode.prom string Prometheus encode API
4142
--pipeline.encode.type string Encode type: prom, none (default "none")
4243
--pipeline.extract.aggregates string Aggregates (see docs)
4344
--pipeline.extract.type string Extract type: aggregates, none (default "none")
44-
--pipeline.ingest.aws.fields strings aws fields
4545
--pipeline.ingest.file.filename string Ingest filename (file)
4646
--pipeline.ingest.type string Ingest type: file, collector,file_loop (required)
4747
--pipeline.transform string Transforms (list) API (default "[{"type": "none"}]")

cmd/flowlogs2metrics/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func initFlags() {
128128
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Type, "pipeline.ingest.type", "", "Ingest type: file, collector,file_loop (required)")
129129
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.File.Filename, "pipeline.ingest.file.filename", "", "Ingest filename (file)")
130130
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeLine.ingest.collector", "", "Ingest collector API")
131-
rootCmd.PersistentFlags().StringSliceVar(&config.Opt.PipeLine.Ingest.Aws.Fields, "pipeline.ingest.aws.fields", nil, "aws fields")
131+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Aws, "pipeline.decode.aws", "", "aws fields")
132132
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Type, "pipeline.decode.type", "none", "Decode type: aws, json, none")
133133
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Transform, "pipeline.transform", "[{\"type\": \"none\"}]", "Transforms (list) API")
134134
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Type, "pipeline.extract.type", "none", "Extract type: aggregates, none")

docs/api.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,13 @@ Following is the supported API format for the netflow collector:
2525
hostName: the hostname to listen on
2626
port: the port number to listen on
2727
</pre>
28+
## Aws ingest API
29+
Following is the supported API format for Aws flow entries:
30+
31+
<pre>
32+
aws:
33+
fields: list of aws flow log fields
34+
</pre>
2835
## Transform Generic API
2936
Following is the supported API format for generic transformations:
3037

pkg/api/api.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const TagEnum = "enum"
2626
type API struct {
2727
PromEncode PromEncode `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
2828
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"`
29+
EncodeAws EncodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
2930
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
3031
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
3132
}

pkg/api/decode_aws.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package api
19+
20+
type EncodeAws struct {
21+
Fields []string `yaml:"fields" doc:"list of aws flow log fields"`
22+
}

pkg/config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ type Ingest struct {
4040
Type string
4141
File File
4242
Collector string
43-
Aws Aws
4443
}
4544

4645
type File struct {
@@ -53,6 +52,7 @@ type Aws struct {
5352

5453
type Decode struct {
5554
Type string
55+
Aws string
5656
}
5757

5858
type Extract struct {

pkg/pipeline/decode/decode_aws.go

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy ofthe License at
6+
* You may obtain a copy of the License at
77
*
88
* http://www.apache.org/licenses/LICENSE-2.0
99
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specificlanguage governing permissions and
13+
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*
1616
*/
1717

1818
package decode
1919

2020
import (
21+
"encoding/json"
22+
"github.com/netobserv/flowlogs2metrics/pkg/api"
2123
"github.com/netobserv/flowlogs2metrics/pkg/config"
2224
log "github.com/sirupsen/logrus"
2325
"strings"
@@ -51,15 +53,16 @@ func (c *decodeAws) Decode(in []interface{}) []config.GenericMap {
5153
out := make([]config.GenericMap, 0)
5254
nItems := len(in)
5355
log.Debugf("nItems = %d", nItems)
54-
for _, line := range in {
56+
for lineNum, line := range in {
5557
lineSlice := strings.Fields(line.(string))
5658
nFields := len(lineSlice)
57-
if nFields > len(c.keyTags) {
58-
nFields = len(c.keyTags)
59+
if nFields != len(c.keyTags) {
60+
log.Errorf("decodeAws Decode: wrong number of fields in line %d", lineNum+1)
61+
continue
5962
}
6063
record := make(config.GenericMap)
61-
for i := 0; i < nFields; i++ {
62-
record[c.keyTags[i]] = lineSlice[i]
64+
for fieldNum := 0; fieldNum < nFields; fieldNum++ {
65+
record[c.keyTags[fieldNum]] = lineSlice[fieldNum]
6366
}
6467
log.Debugf("record = %v", record)
6568
out = append(out, record)
@@ -71,12 +74,23 @@ func (c *decodeAws) Decode(in []interface{}) []config.GenericMap {
7174
// NewDecodeAws create a new decode
7275
func NewDecodeAws() (Decoder, error) {
7376
log.Debugf("entering NewDecodeAws")
74-
RecordKeys := config.Opt.PipeLine.Ingest.Aws.Fields
75-
if RecordKeys == nil {
76-
RecordKeys = defaultKeys
77+
var recordKeys []string
78+
fieldsString := config.Opt.PipeLine.Decode.Aws
79+
log.Debugf("fieldsString = %v", fieldsString)
80+
if fieldsString != "" {
81+
var awsFields api.EncodeAws
82+
err := json.Unmarshal([]byte(fieldsString), &awsFields)
83+
if err != nil {
84+
log.Errorf("NewDecodeAws: error in unmarshalling fields: %v", err)
85+
return nil, err
86+
}
87+
recordKeys = awsFields.Fields
88+
} else {
89+
recordKeys = defaultKeys
7790
}
78-
log.Debugf("RecordKeys = %v", RecordKeys)
91+
92+
log.Debugf("recordKeys = %v", recordKeys)
7993
return &decodeAws{
80-
keyTags: RecordKeys,
94+
keyTags: recordKeys,
8195
}, nil
8296
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package decode
19+
20+
import (
21+
"bufio"
22+
jsoniter "github.com/json-iterator/go"
23+
"github.com/netobserv/flowlogs2metrics/pkg/config"
24+
"github.com/netobserv/flowlogs2metrics/pkg/test"
25+
"github.com/stretchr/testify/require"
26+
"strings"
27+
"testing"
28+
)
29+
30+
const testConfig1 = `
31+
pipeline:
32+
decode:
33+
type: aws
34+
`
35+
const testConfig2 = `
36+
pipeline:
37+
decode:
38+
type: aws
39+
aws:
40+
fields:
41+
- version
42+
- vpc-id
43+
- subnet-id
44+
- instance-id
45+
- interface-id
46+
- account-id
47+
- type
48+
- srcaddr
49+
- dstaddr
50+
- srcport
51+
- dstport
52+
- pkt-srcaddr
53+
- pkt-dstaddr
54+
- protocol
55+
- bytes
56+
- packets
57+
- start
58+
- end
59+
- action
60+
- tcp-flags
61+
- log-status
62+
`
63+
64+
const testConfigErr = `
65+
pipeline:
66+
decode:
67+
type: aws
68+
aws:
69+
fields:
70+
version
71+
vpc-id
72+
`
73+
74+
// aws version 2 standard format
75+
const input1 = `2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
76+
2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK
77+
2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK
78+
`
79+
80+
const inputErr = `2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
81+
2 12345 6789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK
82+
2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK
83+
`
84+
85+
// aws version 3 custom format
86+
const input2 = `3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK
87+
3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK
88+
`
89+
90+
func initNewDecodeAws(t *testing.T, testConfig string) Decoder {
91+
v := test.InitConfig(t, testConfig)
92+
val := v.Get("pipeline.decode.type")
93+
require.Equal(t, "aws", val)
94+
95+
tmp := v.Get("pipeline.decode.aws")
96+
if tmp != nil {
97+
var json = jsoniter.ConfigCompatibleWithStandardLibrary
98+
b, err := json.Marshal(&tmp)
99+
require.Equal(t, err, nil)
100+
// perform initializations usually done in main.go
101+
config.Opt.PipeLine.Decode.Aws = string(b)
102+
} else {
103+
config.Opt.PipeLine.Decode.Aws = ""
104+
}
105+
106+
newDecode, err := NewDecodeAws()
107+
require.Equal(t, nil, err)
108+
return newDecode
109+
}
110+
111+
func breakIntoLines(input string) []interface{} {
112+
lines := make([]interface{}, 0)
113+
r := strings.NewReader(input)
114+
scanner := bufio.NewScanner(r)
115+
for scanner.Scan() {
116+
text := scanner.Text()
117+
lines = append(lines, text)
118+
}
119+
return lines
120+
}
121+
122+
func TestDecodeAwsDefault(t *testing.T) {
123+
newDecode := initNewDecodeAws(t, testConfig1)
124+
decodeAws := newDecode.(*decodeAws)
125+
require.Equal(t, defaultKeys, decodeAws.keyTags)
126+
127+
lines := breakIntoLines(input1)
128+
nLines := len(lines)
129+
output := decodeAws.Decode(lines)
130+
require.Equal(t, nLines, len(output))
131+
132+
expectedResult := config.GenericMap{
133+
"version": "2",
134+
"account-id": "123456789010",
135+
"interface-id": "eni-1235b8ca123456789",
136+
"srcaddr": "172.31.16.139",
137+
"dstaddr": "172.31.16.21",
138+
"srcport": "20641",
139+
"dstport": "22",
140+
"protocol": "6",
141+
"packets": "20",
142+
"bytes": "4249",
143+
"start": "1418530010",
144+
"end": "1418530070",
145+
"action": "ACCEPT",
146+
"log-status": "OK",
147+
}
148+
require.Equal(t, expectedResult, output[0])
149+
}
150+
151+
func TestDecodeAwsDefaultErr(t *testing.T) {
152+
newDecode := initNewDecodeAws(t, testConfig1)
153+
decodeAws := newDecode.(*decodeAws)
154+
require.Equal(t, defaultKeys, decodeAws.keyTags)
155+
156+
lines := breakIntoLines(inputErr)
157+
nLines := len(lines) - 1
158+
output := decodeAws.Decode(lines)
159+
require.Equal(t, nLines, len(output))
160+
}
161+
162+
func TestDecodeAwsCustom(t *testing.T) {
163+
newDecode := initNewDecodeAws(t, testConfig2)
164+
decodeAws := newDecode.(*decodeAws)
165+
166+
lines := breakIntoLines(input2)
167+
nLines := len(lines)
168+
output := decodeAws.Decode(lines)
169+
require.Equal(t, nLines, len(output))
170+
171+
expectedResult := config.GenericMap{
172+
"version": "3",
173+
"vpc-id": "vpc-abcdefab012345678",
174+
"subnet-id": "subnet-aaaaaaaa012345678",
175+
"instance-id": "i-01234567890123456",
176+
"interface-id": "eni-1235b8ca123456789",
177+
"account-id": "123456789010",
178+
"type": "IPv4",
179+
"srcaddr": "52.213.180.42",
180+
"dstaddr": "10.0.0.62",
181+
"srcport": "43416",
182+
"dstport": "5001",
183+
"pkt-srcaddr": "52.213.180.42",
184+
"pkt-dstaddr": "10.0.0.62",
185+
"protocol": "6",
186+
"bytes": "568",
187+
"packets": "8",
188+
"start": "1566848875",
189+
"end": "1566848933",
190+
"action": "ACCEPT",
191+
"tcp-flags": "2",
192+
"log-status": "OK",
193+
}
194+
require.Equal(t, expectedResult, output[0])
195+
}
196+
197+
func TestConfigErr(t *testing.T) {
198+
v := test.InitConfig(t, testConfigErr)
199+
val := v.Get("pipeline.decode.type")
200+
require.Equal(t, "aws", val)
201+
202+
tmp := v.Get("pipeline.decode.aws")
203+
var json = jsoniter.ConfigCompatibleWithStandardLibrary
204+
b, err := json.Marshal(&tmp)
205+
require.Equal(t, err, nil)
206+
// perform initializations usually done in main.go
207+
config.Opt.PipeLine.Decode.Aws = string(b)
208+
209+
newDecode, _ := NewDecodeAws()
210+
require.Equal(t, nil, newDecode)
211+
}

pkg/pipeline/decode/decode_json.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (c *decodeJson) Decode(in []interface{}) []config.GenericMap {
3737
var decodedLine map[string]interface{}
3838
err := json.Unmarshal(line2, &decodedLine)
3939
if err != nil {
40-
log.Errorf("error unmarshalling a line: %v", err)
40+
log.Errorf("decodeJson Decode: error unmarshalling a line: %v", err)
4141
continue
4242
}
4343
decodedLine2 := make(config.GenericMap, len(decodedLine))

0 commit comments

Comments
 (0)