Skip to content

Commit 8cb43e4

Browse files
authored
added encode_json support plus test (#60)
* added encode_json support plus test * added encode json to README removed default none from options
1 parent fc3bafe commit 8cb43e4

File tree

7 files changed

+205
-28
lines changed

7 files changed

+205
-28
lines changed

README.md

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,18 @@ Flags:
3636
-h, --help help for flowlogs2metrics
3737
--log-level string Log level: debug, info, warning, error (default "error")
3838
--pipeline.decode.aws string aws fields
39-
--pipeline.decode.type string Decode type: aws, json, none (default "none")
39+
--pipeline.decode.type string Decode type: aws, json, none
4040
--pipeline.encode.prom string Prometheus encode API
41-
--pipeline.encode.type string Encode type: prom, none (default "none")
41+
--pipeline.encode.type string Encode type: prom, json, none
4242
--pipeline.extract.aggregates string Aggregates (see docs)
43-
--pipeline.extract.type string Extract type: aggregates, none (default "none")
43+
--pipeline.extract.type string Extract type: aggregates, none
4444
--pipeline.ingest.collector string Ingest collector API
4545
--pipeline.ingest.file.filename string Ingest filename (file)
4646
--pipeline.ingest.kafka string Ingest Kafka API
4747
--pipeline.ingest.type string Ingest type: file, collector,file_loop (required)
4848
--pipeline.transform string Transforms (list) API (default "[{"type": "none"}]")
4949
--pipeline.write.loki string Loki write API
50-
--pipeline.write.type string Write type: stdout, none (default "none")
50+
--pipeline.write.type string Write type: stdout, none
5151
```
5252
<!---END-AUTO-flowlogs2metrics_help--->
5353

@@ -361,6 +361,12 @@ pipeline:
361361
RecordKey: "value"
362362
```
363363

364+
### Json Encoder
365+
366+
The json encoder takes each entry in the internal representation of the data and converts it to a json byte array.
367+
These byte arrays may then be output by a `write` stage.
368+
369+
364370
### Prometheus encoder
365371

366372
The prometheus encoder specifies which metrics to export to prometheus and which labels should be associated with those metrics.

cmd/flowlogs2metrics/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,12 @@ func initFlags() {
132132
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Collector, "pipeline.ingest.collector", "", "Ingest collector API")
133133
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Ingest.Kafka, "pipeline.ingest.kafka", "", "Ingest Kafka API")
134134
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Aws, "pipeline.decode.aws", "", "aws fields")
135-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Type, "pipeline.decode.type", "none", "Decode type: aws, json, none")
135+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Decode.Type, "pipeline.decode.type", "", "Decode type: aws, json, none")
136136
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Transform, "pipeline.transform", "[{\"type\": \"none\"}]", "Transforms (list) API")
137-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Type, "pipeline.extract.type", "none", "Extract type: aggregates, none")
137+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Type, "pipeline.extract.type", "", "Extract type: aggregates, none")
138138
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Extract.Aggregates, "pipeline.extract.aggregates", "", "Aggregates (see docs)")
139-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Type, "pipeline.encode.type", "none", "Encode type: prom, none")
140-
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Type, "pipeline.write.type", "none", "Write type: stdout, none")
139+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Type, "pipeline.encode.type", "", "Encode type: prom, json, none")
140+
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Type, "pipeline.write.type", "", "Write type: stdout, none")
141141
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Write.Loki, "pipeline.write.loki", "", "Loki write API")
142142
rootCmd.PersistentFlags().StringVar(&config.Opt.PipeLine.Encode.Prom, "pipeline.encode.prom", "", "Prometheus encode API")
143143

pkg/pipeline/encode/encode_json.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright (C) 2021 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package encode
19+
20+
import (
21+
"encoding/json"
22+
"github.com/netobserv/flowlogs2metrics/pkg/config"
23+
log "github.com/sirupsen/logrus"
24+
)
25+
26+
type encodeJson struct {
27+
}
28+
29+
// Encode encodes json to byte array
30+
// All entries should be saved as strings
31+
func (e *encodeJson) Encode(inputMetrics []config.GenericMap) []interface{} {
32+
out := make([]interface{}, 0)
33+
for _, metric := range inputMetrics {
34+
log.Debugf("encodeJson, metric = %v", metric)
35+
var line []byte
36+
var err error
37+
line, err = json.Marshal(metric)
38+
if err != nil {
39+
log.Errorf("encodeJson Decode: error marshalling a line: %v", err)
40+
continue
41+
}
42+
out = append(out, line)
43+
}
44+
return out
45+
}
46+
47+
// NewEndcodeJson create a new encode
48+
func NewEncodeJson() (Encoder, error) {
49+
log.Debugf("entering NewEncodeJson")
50+
return &encodeJson{}, nil
51+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package encode
19+
20+
import (
21+
"github.com/netobserv/flowlogs2metrics/pkg/config"
22+
"github.com/stretchr/testify/require"
23+
"testing"
24+
)
25+
26+
func initNewEncodeJson(t *testing.T) Encoder {
27+
newEncode, err := NewEncodeJson()
28+
require.NoError(t, err)
29+
return newEncode
30+
}
31+
32+
func TestEncodeJson(t *testing.T) {
33+
newEncode := initNewEncodeJson(t)
34+
encodeByteArray := newEncode.(*encodeJson)
35+
map1 := config.GenericMap{
36+
"varInt": 12,
37+
"varString": "testString1",
38+
"varBool": true,
39+
}
40+
map2 := config.GenericMap{
41+
"varString": "testString2",
42+
"varInt": 14,
43+
"varBool": false,
44+
}
45+
map3 := config.GenericMap{}
46+
var out []interface{}
47+
var in []config.GenericMap
48+
out = encodeByteArray.Encode(in)
49+
require.Equal(t, 0, len(out))
50+
in = append(in, map1)
51+
in = append(in, map2)
52+
in = append(in, map3)
53+
out = encodeByteArray.Encode(in)
54+
require.Equal(t, len(in), len(out))
55+
expected1 := []byte(`{"varInt":12,"varBool":true,"varString":"testString1"}`)
56+
expected2 := []byte(`{"varInt":14,"varBool":false,"varString":"testString2"}`)
57+
expected3 := []byte(`{}`)
58+
require.JSONEq(t, string(expected1), string(out[0].([]byte)))
59+
require.JSONEq(t, string(expected2), string(out[1].([]byte)))
60+
require.JSONEq(t, string(expected3), string(out[2].([]byte)))
61+
}

pkg/pipeline/encode/encode_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright (C) 2022 IBM, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package encode
19+
20+
import (
21+
"github.com/netobserv/flowlogs2metrics/pkg/config"
22+
"github.com/stretchr/testify/require"
23+
"testing"
24+
)
25+
26+
func initNewEncodeNone(t *testing.T) Encoder {
27+
newEncode, err := NewEncodeNone()
28+
require.Equal(t, err, nil)
29+
return newEncode
30+
}
31+
32+
func TestEncodeNone(t *testing.T) {
33+
newEncode := initNewEncodeNone(t)
34+
encodeNone := newEncode.(*encodeNone)
35+
map1 := config.GenericMap{
36+
"varInt": 12,
37+
"varString": "string1",
38+
"varbool": true,
39+
}
40+
map2 := config.GenericMap{
41+
"varInt": 14,
42+
"varString": "string2",
43+
"varbool": false,
44+
}
45+
map3 := config.GenericMap{}
46+
var out []interface{}
47+
var in []config.GenericMap
48+
out = encodeNone.Encode(in)
49+
require.Equal(t, 0, len(out))
50+
in = append(in, map1)
51+
in = append(in, map2)
52+
in = append(in, map3)
53+
out = encodeNone.Encode(in)
54+
require.Equal(t, len(in), len(out))
55+
}

pkg/pipeline/pipeline.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ func getEncoder() (encode.Encoder, error) {
115115
switch config.Opt.PipeLine.Encode.Type {
116116
case "prom":
117117
encoder, _ = encode.NewEncodeProm()
118+
case "json":
119+
encoder, _ = encode.NewEncodeJson()
118120
case "none":
119121
encoder, _ = encode.NewEncodeNone()
120122
default:

playground/goflow4.yml

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,40 @@
11
log-level: debug
22
pipeline:
33
ingest:
4-
type: file
4+
type: file_loop
55
file:
66
filename: playground/goflow2_input.txt
77
decode:
88
type: json
99
transform:
1010
- type: generic
1111
generic:
12-
- input: Bytes
13-
output: v1_bytes
14-
- input: DstAddr
15-
output: v1_dstAddr
16-
- input: Packets
17-
output: v1_packets
18-
- input: SrcPort
19-
output: v1_srcPort
12+
rules:
13+
- input: Bytes
14+
output: v1_bytes
15+
- input: DstAddr
16+
output: v1_dstAddr
17+
- input: Packets
18+
output: v1_packets
19+
- input: SrcPort
20+
output: v1_srcPort
2021
- type: generic
2122
generic:
22-
- input: v1_srcPort
23-
output: v1_srcPort
24-
- input: v1_packets
25-
output: v1_packets
26-
- input: v1_packets
27-
output: v2_packets
28-
- input: v1_bytes
29-
output: v2_bytes
30-
- input: v1_dstAddr
31-
output: v2_dstAddr
23+
rules:
24+
- input: v1_srcPort
25+
output: v1_srcPort
26+
- input: v1_packets
27+
output: v1_packets
28+
- input: v1_packets
29+
output: v2_packets
30+
- input: v1_bytes
31+
output: v2_bytes
32+
- input: v1_dstAddr
33+
output: v2_dstAddr
3234
extract:
3335
type: none
3436
encode:
35-
type: none
37+
type: json
3638
write:
3739
type: stdout
3840

0 commit comments

Comments
 (0)