Skip to content

Commit c418a01

Browse files
author
Mario Macias
authored
NETOBSERV-258: add TimeReceived in Kafka transformer (#235)
1 parent 24bcbf2 commit c418a01

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

pkg/pipeline/decode/decode_json.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package decode
1919

2020
import (
2121
"encoding/json"
22+
"time"
2223

2324
"github.com/netobserv/flowlogs-pipeline/pkg/config"
2425
log "github.com/sirupsen/logrus"
@@ -42,6 +43,9 @@ func (c *DecodeJson) Decode(in []interface{}) []config.GenericMap {
4243
continue
4344
}
4445
decodedLine2 := make(config.GenericMap, len(decodedLine))
46+
// flows directly ingested by flp-transformer won't have this field, so we need to add it
47+
// here. If the received line already contains the field, it will be overridden later
48+
decodedLine2["TimeReceived"] = time.Now().Unix()
4549
for k, v := range decodedLine {
4650
if v == nil {
4751
continue

pkg/pipeline/decode/decode_json_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func TestDecodeJson(t *testing.T) {
3434
newDecode := initNewDecodeJson(t)
3535
decodeJson := newDecode.(*DecodeJson)
3636
inputString1 := "{\"varInt\": 12, \"varString\":\"testString\", \"varBool\":false}"
37-
inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true}"
37+
inputString2 := "{\"varInt\": 14, \"varString\":\"testString2\", \"varBool\":true, \"TimeReceived\":12345}"
3838
inputString3 := "{}"
3939
inputStringErr := "{\"varInt\": 14, \"varString\",\"testString2\", \"varBool\":true}"
4040
var in []interface{}
@@ -49,7 +49,11 @@ func TestDecodeJson(t *testing.T) {
4949
require.Equal(t, len(out), 3)
5050
require.Equal(t, float64(12), out[0]["varInt"])
5151
require.Equal(t, "testString", out[0]["varString"])
52-
require.Equal(t, bool(false), out[0]["varBool"])
52+
require.Equal(t, false, out[0]["varBool"])
53+
// TimeReceived is added if it does not exist
54+
require.NotZero(t, out[0]["TimeReceived"])
55+
// TimeReceived is kept if it already existed
56+
require.EqualValues(t, 12345, out[1]["TimeReceived"])
5357

5458
// TODO: Check for more complicated json structures
5559
}

0 commit comments

Comments
 (0)