Skip to content

Commit 757c754

Browse files
authored
added a simple end-to-end pipeline test (#92)
* added a simple end-to-end pipeline test * fixed data on copyright * removed extra print statements * combined pipeline tests into single file * used data from file in repository
1 parent 446bbc3 commit 757c754

File tree

6 files changed

+90
-17
lines changed

6 files changed

+90
-17
lines changed

pkg/pipeline/decode/decode_json.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,13 @@ import (
2323
log "github.com/sirupsen/logrus"
2424
)
2525

26-
type decodeJson struct {
26+
type DecodeJson struct {
27+
PrevRecords []interface{}
2728
}
2829

2930
// Decode decodes input strings to a list of flow entries
3031
// All entries should be saved as strings
31-
func (c *decodeJson) Decode(in []interface{}) []config.GenericMap {
32+
func (c *DecodeJson) Decode(in []interface{}) []config.GenericMap {
3233
out := make([]config.GenericMap, 0)
3334
for _, line := range in {
3435
log.Debugf("decodeJson: line = %v", line)
@@ -48,11 +49,12 @@ func (c *decodeJson) Decode(in []interface{}) []config.GenericMap {
4849
}
4950
out = append(out, decodedLine2)
5051
}
52+
c.PrevRecords = in
5153
return out
5254
}
5355

5456
// NewDecodeJson create a new decode
5557
func NewDecodeJson() (Decoder, error) {
5658
log.Debugf("entering NewDecodeJson")
57-
return &decodeJson{}, nil
59+
return &DecodeJson{}, nil
5860
}

pkg/pipeline/decode/decode_json_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func initNewDecodeJson(t *testing.T) Decoder {
3131

3232
func TestDecodeJson(t *testing.T) {
3333
newDecode := initNewDecodeJson(t)
34-
decodeJson := newDecode.(*decodeJson)
34+
decodeJson := newDecode.(*DecodeJson)
3535
inputString1 := "{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"
3636
inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true}"
3737
inputString3 := "{}"
@@ -55,7 +55,7 @@ func TestDecodeJson(t *testing.T) {
5555

5656
func TestDecodeJsonTimestamps(t *testing.T) {
5757
newDecode := initNewDecodeJson(t)
58-
decodeJson := newDecode.(*decodeJson)
58+
decodeJson := newDecode.(*DecodeJson)
5959
inputString1 := "{\"unixTime\": 1645104030 }"
6060
var in []interface{}
6161
var out []config.GenericMap

pkg/pipeline/ingest/ingest_file.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,16 @@ import (
2727
"time"
2828
)
2929

30-
type ingestFile struct {
31-
fileName string
32-
exitChan chan bool
30+
type IngestFile struct {
31+
fileName string
32+
exitChan chan bool
33+
PrevRecords []interface{}
3334
}
3435

3536
const delaySeconds = 10
3637

3738
// Ingest ingests entries from a file and resends the same data every delaySeconds seconds
38-
func (r *ingestFile) Ingest(process ProcessFunction) {
39+
func (r *IngestFile) Ingest(process ProcessFunction) {
3940
lines := make([]interface{}, 0)
4041
file, err := os.Open(r.fileName)
4142
if err != nil {
@@ -54,6 +55,7 @@ func (r *ingestFile) Ingest(process ProcessFunction) {
5455
log.Debugf("Ingesting %d log lines from %s", len(lines), r.fileName)
5556
switch config.Opt.PipeLine.Ingest.Type {
5657
case "file":
58+
r.PrevRecords = lines
5759
process(lines)
5860
case "file_loop":
5961
// loop forever
@@ -65,6 +67,7 @@ func (r *ingestFile) Ingest(process ProcessFunction) {
6567
return
6668
case <-ticker.C:
6769
log.Debugf("ingestFile; for loop; before process")
70+
r.PrevRecords = lines
6871
process(lines)
6972
}
7073
}
@@ -82,7 +85,7 @@ func NewIngestFile() (Ingester, error) {
8285

8386
ch := make(chan bool, 1)
8487
utils.RegisterExitChannel(ch)
85-
return &ingestFile{
88+
return &IngestFile{
8689
fileName: config.Opt.PipeLine.Ingest.File.Filename,
8790
exitChan: ch,
8891
}, nil

pkg/pipeline/pipeline_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,13 @@
1818
package pipeline
1919

2020
import (
21+
"github.com/json-iterator/go"
2122
"github.com/netobserv/flowlogs2metrics/pkg/config"
23+
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/decode"
24+
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/ingest"
2225
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/transform"
2326
"github.com/netobserv/flowlogs2metrics/pkg/pipeline/write"
27+
"github.com/netobserv/flowlogs2metrics/pkg/test"
2428
"github.com/stretchr/testify/require"
2529
"testing"
2630
)
@@ -37,3 +41,67 @@ func Test_transformToLoki(t *testing.T) {
3741
loki.Write(transformed)
3842
require.NoError(t, err)
3943
}
44+
45+
const configTemplate = `---
46+
log-level: debug
47+
pipeline:
48+
ingest:
49+
type: file
50+
file:
51+
filename: ../../hack/examples/ocp-ipfix-flowlogs.json
52+
decode:
53+
type: json
54+
transform:
55+
- type: generic
56+
generic:
57+
- input: Bytes
58+
output: fl2m_bytes
59+
- input: DstAddr
60+
output: fl2m_dstAddr
61+
- input: DstPort
62+
output: fl2m_dstPort
63+
- input: Packets
64+
output: fl2m_packets
65+
- input: SrcAddr
66+
output: fl2m_srcAddr
67+
- input: SrcPort
68+
output: fl2m_srcPort
69+
extract:
70+
type: none
71+
encode:
72+
type: none
73+
write:
74+
type: none
75+
`
76+
77+
func Test_SimplePipeline(t *testing.T) {
78+
var json = jsoniter.ConfigCompatibleWithStandardLibrary
79+
var mainPipeline *Pipeline
80+
var err error
81+
var b []byte
82+
v := test.InitConfig(t, configTemplate)
83+
config.Opt.PipeLine.Ingest.Type = "file"
84+
config.Opt.PipeLine.Decode.Type = "json"
85+
config.Opt.PipeLine.Extract.Type = "none"
86+
config.Opt.PipeLine.Encode.Type = "none"
87+
config.Opt.PipeLine.Write.Type = "none"
88+
config.Opt.PipeLine.Ingest.File.Filename = "../../hack/examples/ocp-ipfix-flowlogs.json"
89+
90+
val := v.Get("pipeline.transform\n")
91+
b, err = json.Marshal(&val)
92+
require.NoError(t, err)
93+
config.Opt.PipeLine.Transform = string(b)
94+
95+
mainPipeline, err = NewPipeline()
96+
require.NoError(t, err)
97+
98+
// The file ingester reads the entire file, pushes it down the pipeline, and then exits
99+
// So we don't need to run it in a separate go-routine
100+
mainPipeline.Run()
101+
// What is there left to check? Check length of saved data of each stage in private structure.
102+
ingester := mainPipeline.Ingester.(*ingest.IngestFile)
103+
decoder := mainPipeline.Decoder.(*decode.DecodeJson)
104+
writer := mainPipeline.Writer.(*write.WriteNone)
105+
require.Equal(t, len(ingester.PrevRecords), len(decoder.PrevRecords))
106+
require.Equal(t, len(ingester.PrevRecords), len(writer.PrevRecords))
107+
}

pkg/pipeline/write/write.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,18 @@ import (
2525
type Writer interface {
2626
Write(in []config.GenericMap) []config.GenericMap
2727
}
28-
29-
type writeNone struct {
28+
type WriteNone struct {
29+
PrevRecords []config.GenericMap
3030
}
3131

3232
// Write writes entries
33-
func (t *writeNone) Write(in []config.GenericMap) []config.GenericMap {
33+
func (t *WriteNone) Write(in []config.GenericMap) []config.GenericMap {
3434
log.Debugf("entering Write none, in = %v", in)
35-
35+
t.PrevRecords = in
3636
return in
3737
}
3838

3939
// NewWriteNone create a new write
4040
func NewWriteNone() (Writer, error) {
41-
return &writeNone{}, nil
41+
return &WriteNone{}, nil
4242
}

pkg/pipeline/write/write_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ import (
2424
)
2525

2626
func Test_Write(t *testing.T) {
27-
wn := writeNone{}
27+
wn := WriteNone{}
2828
wn.Write([]config.GenericMap{{"key": "test"}})
2929
}
3030

3131
func Test_NewWriteNone(t *testing.T) {
3232
writer, err := NewWriteNone()
3333
require.Nil(t, err)
34-
require.Equal(t, writer, &writeNone{})
34+
require.Equal(t, writer, &WriteNone{})
3535

3636
}

0 commit comments

Comments
 (0)