Skip to content

Commit 25c2f5b

Browse files
committed
Use ovnk-lib to get event string
1 parent 3d4db9c commit 25c2f5b

File tree

5 files changed

+172
-0
lines changed

5 files changed

+172
-0
lines changed

cmd/plugin-backend.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@ import (
55
"flag"
66
"fmt"
77
"os"
8+
"slices"
89

910
"github.com/sirupsen/logrus"
1011

1112
"github.com/netobserv/network-observability-console-plugin/pkg/config"
13+
"github.com/netobserv/network-observability-console-plugin/pkg/decoders"
14+
"github.com/netobserv/network-observability-console-plugin/pkg/model"
1215
"github.com/netobserv/network-observability-console-plugin/pkg/server"
1316
)
1417

@@ -54,6 +57,11 @@ func main() {
5457
log.WithError(err).Fatal("auth checker error")
5558
}
5659

60+
if slices.Contains(cfg.Frontend.Features, "NetworkEvents") {
61+
// Add decoder hook
62+
model.AddFlowLineMapping(decoders.NetworkEventsToString)
63+
}
64+
5765
go server.StartMetrics(&server.MetricsConfig{
5866
Port: cfg.Server.MetricsPort,
5967
CertPath: cfg.Server.CertPath,

pkg/decoders/network-events.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package decoders
2+
3+
import (
4+
"encoding/json"
5+
6+
"github.com/netobserv/network-observability-console-plugin/pkg/model/fields"
7+
ovnmodel "github.com/ovn-org/ovn-kubernetes/go-controller/observability-lib/model"
8+
"github.com/sirupsen/logrus"
9+
)
10+
11+
var dlog = logrus.WithField("module", "decoders")
12+
13+
func NetworkEventsToString(in string) string {
14+
line := make(map[string]any)
15+
if err := json.Unmarshal([]byte(in), &line); err != nil {
16+
dlog.Errorf("Could not decode NetworkEvent: %v", err)
17+
return in
18+
}
19+
if ne, found := line[fields.NetworkEvents]; found {
20+
if neList, isList := ne.([]any); isList {
21+
var messages []string
22+
for _, item := range neList {
23+
if neItem, isMap := item.(map[string]any); isMap {
24+
messages = append(messages, networkEventItemToString(neItem))
25+
}
26+
}
27+
line[fields.NetworkEvents] = messages
28+
b, err := json.Marshal(line)
29+
if err != nil {
30+
dlog.Errorf("Could not reencode NetworkEvent: %v", err)
31+
return in
32+
}
33+
return string(b)
34+
}
35+
}
36+
return in
37+
}
38+
39+
func networkEventItemToString(in map[string]any) string {
40+
if msg := getAsString(in, "Message"); msg != "" {
41+
return msg
42+
}
43+
if feat := getAsString(in, "Feature"); feat == "acl" {
44+
aclObj := ovnmodel.ACLEvent{
45+
Action: getAsString(in, "Action"),
46+
Actor: getAsString(in, "Type"),
47+
Name: getAsString(in, "Name"),
48+
Namespace: getAsString(in, "Namespace"),
49+
Direction: getAsString(in, "Direction"),
50+
}
51+
return aclObj.String()
52+
}
53+
return ""
54+
}
55+
56+
func getAsString(in map[string]any, key string) string {
57+
if anyV, hasKey := in[key]; hasKey {
58+
if v, isStr := anyV.(string); isStr {
59+
return v
60+
}
61+
}
62+
return ""
63+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package decoders
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestReencode_NoChange(t *testing.T) {
10+
js := `{"SrcK8S_Name":"ip-10-0-1-7.ec2.internal","Bytes":66,"Packets":1,"Interfaces":["br-ex"]}`
11+
out := NetworkEventsToString(js)
12+
assert.Equal(t, js, out)
13+
}
14+
15+
func TestReencode_UpdateEvent(t *testing.T) {
16+
js := `{"SrcK8S_Name":"ip-10-0-1-7.ec2.internal","Bytes":66,"Packets":1,"Interfaces":["br-ex"],"NetworkEvents":[{"Feature":"acl","Type":"NetpolNode","Action":"allow","Direction":"Ingress"}]}`
17+
out := NetworkEventsToString(js)
18+
assert.Equal(
19+
t,
20+
`{"Bytes":66,"Interfaces":["br-ex"],"NetworkEvents":["Allowed by default allow from local node policy, direction ingress"],"Packets":1,"SrcK8S_Name":"ip-10-0-1-7.ec2.internal"}`,
21+
out,
22+
)
23+
24+
js = `{"SrcK8S_Name":"ip-10-0-1-7.ec2.internal","Bytes":66,"Packets":1,"Interfaces":["br-ex"],"NetworkEvents":[{"Message":"custom message"}]}`
25+
out = NetworkEventsToString(js)
26+
assert.Equal(
27+
t,
28+
`{"Bytes":66,"Interfaces":["br-ex"],"NetworkEvents":["custom message"],"Packets":1,"SrcK8S_Name":"ip-10-0-1-7.ec2.internal"}`,
29+
out,
30+
)
31+
}

pkg/model/loki.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,16 @@ import (
1010
"github.com/prometheus/common/model"
1111
)
1212

13+
type FlowLineMapping = func(string) string
14+
15+
var (
16+
flowLineMappings = []FlowLineMapping{}
17+
)
18+
19+
func AddFlowLineMapping(f FlowLineMapping) {
20+
flowLineMappings = append(flowLineMappings, f)
21+
}
22+
1323
// QueryResponse represents the http json response to a logQL query
1424
type QueryResponse struct {
1525
Status string `json:"status"`
@@ -128,6 +138,13 @@ func unmarshalQueryResponseData(data []byte) (ResultType, ResultValue, interface
128138
case ResultTypeStream:
129139
var s Streams
130140
err = json.Unmarshal(unmarshal.Result, &s)
141+
for _, mapping := range flowLineMappings {
142+
for i := range s {
143+
for ii := range s[i].Entries {
144+
s[i].Entries[ii].Line = mapping(s[i].Entries[ii].Line)
145+
}
146+
}
147+
}
131148
value = s
132149
case ResultTypeMatrix:
133150
var m Matrix

pkg/model/loki_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package model
22

33
import (
44
"encoding/json"
5+
"strings"
56
"testing"
7+
"time"
68

79
"github.com/netobserv/network-observability-console-plugin/pkg/utils/constants"
810
"github.com/stretchr/testify/assert"
@@ -34,6 +36,57 @@ func TestQueryResponseUnmarshal(t *testing.T) {
3436
assert.IsType(t, expType, qr.Data.Result)
3537
}
3638

39+
func TestQueryResponseUnmarshalLineMapping(t *testing.T) {
40+
AddFlowLineMapping(func(in string) string {
41+
return strings.ReplaceAll(in, "Bytes", "Bytezz")
42+
})
43+
js := `
44+
{
45+
"status": "success",
46+
"data": {
47+
"resultType": "streams",
48+
"result": [
49+
{
50+
"stream": {
51+
"app": "netobserv-flowcollector"
52+
},
53+
"values": [
54+
[
55+
"1731926700000000000",
56+
"{\"SrcK8S_Name\":\"ip-10-0-1-7.ec2.internal\",\"Bytes\":66,\"Packets\":1,\"Interfaces\":[\"br-ex\"]}"
57+
]
58+
]
59+
}
60+
],
61+
"stats": {
62+
"summary": {}
63+
}
64+
}
65+
}
66+
`
67+
68+
var qr QueryResponse
69+
err := json.Unmarshal([]byte(js), &qr)
70+
require.NoError(t, err)
71+
assert.Equal(t, ResultTypeStream, string(qr.Data.ResultType))
72+
assert.NotNil(t, qr.Data.Result)
73+
var expType Streams
74+
assert.IsType(t, expType, qr.Data.Result)
75+
data := qr.Data.Result.(Streams)
76+
assert.Len(t, data, 1)
77+
assert.Equal(t, Stream{
78+
Labels: map[string]string{
79+
"app": "netobserv-flowcollector",
80+
},
81+
Entries: []Entry{
82+
{
83+
Timestamp: time.Date(2024, time.November, 18, 11, 45, 0, 0, time.Local),
84+
Line: `{"SrcK8S_Name":"ip-10-0-1-7.ec2.internal","Bytezz":66,"Packets":1,"Interfaces":["br-ex"]}`,
85+
},
86+
},
87+
}, data[0])
88+
}
89+
3790
func TestAggregatedQueryResponseMarshal(t *testing.T) {
3891
qr := AggregatedQueryResponse{
3992
ResultType: ResultTypeStream,

0 commit comments

Comments
 (0)