Skip to content

Commit f0b0220

Browse files
authored
NETOBSERV-786 Query: Table Histogram (#270)
* histogram query * force step as interval for count_over_time
1 parent 27c4fec commit f0b0220

File tree

2 files changed

+58
-6
lines changed

2 files changed

+58
-6
lines changed

pkg/loki/topology_query.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Topology struct {
1515
limit string
1616
rateInterval string
1717
step string
18+
function string
1819
dataField string
1920
fields string
2021
}
@@ -30,11 +31,15 @@ func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step, metric
3031
l = topologyDefaultLimit
3132
}
3233

33-
var t string
34+
var f, t string
3435
switch metricType {
36+
case "count":
37+
f = "count_over_time"
3538
case "packets":
39+
f = "rate"
3640
t = "Packets"
3741
default:
42+
f = "rate"
3843
t = "Bytes"
3944
}
4045

@@ -44,6 +49,7 @@ func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step, metric
4449
rateInterval: rateInterval,
4550
step: step,
4651
limit: l,
52+
function: f,
4753
dataField: t,
4854
fields: getFields(scope, groups),
4955
},
@@ -94,19 +100,21 @@ func (q *TopologyQueryBuilder) Build() string {
94100
// topk(
95101
// <k>,
96102
// sum by(<aggregations>) (
97-
// rate(
103+
// <function>(
98104
// {<label filters>}|<line filters>|json|<json filters>
99-
// |unwrap Bytes|__error__=""[300s]
105+
// |unwrap Bytes|__error__=""[<interval>]
100106
// )
101107
// )
102108
// )
103-
// &<query params>&step=300s
109+
// &<query params>&step=<step>
104110
sb := q.createStringBuilderURL()
105111
sb.WriteString("topk(")
106112
sb.WriteString(q.topology.limit)
107113
sb.WriteString(",sum by(")
108114
sb.WriteString(q.topology.fields)
109-
sb.WriteString(") (rate(")
115+
sb.WriteString(") (")
116+
sb.WriteString(q.topology.function)
117+
sb.WriteString("(")
110118
q.appendLabels(sb)
111119
q.appendLineFilters(sb)
112120
q.appendDeduplicateFilter(sb)
@@ -117,7 +125,11 @@ func (q *TopologyQueryBuilder) Build() string {
117125
sb.WriteString(`|__error__=""`)
118126
}
119127
sb.WriteRune('[')
120-
sb.WriteString(q.topology.rateInterval)
128+
if q.topology.function == "count_over_time" {
129+
sb.WriteString(q.topology.step)
130+
} else {
131+
sb.WriteString(q.topology.rateInterval)
132+
}
121133
sb.WriteString("])))")
122134
q.appendQueryParams(sb)
123135
sb.WriteString("&step=")

pkg/server/server_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,46 @@ func TestLokiConfigurationForTopology(t *testing.T) {
267267
assert.NotNil(t, qr.Result)
268268
}
269269

270+
func TestLokiConfigurationForTableHistogram(t *testing.T) {
271+
// GIVEN a Loki service
272+
lokiMock := httpMock{}
273+
lokiMock.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
274+
_, _ = args.Get(0).(http.ResponseWriter).Write([]byte(`{"status":"","data":{"resultType":"matrix","result":[]}}`))
275+
})
276+
lokiSvc := httptest.NewServer(&lokiMock)
277+
defer lokiSvc.Close()
278+
lokiURL, err := url.Parse(lokiSvc.URL)
279+
require.NoError(t, err)
280+
281+
// THAT is accessed behind the NOO console plugin backend
282+
backendRoutes := setupRoutes(&Config{
283+
Loki: loki.Config{
284+
URL: lokiURL,
285+
Timeout: time.Second,
286+
},
287+
})
288+
backendSvc := httptest.NewServer(backendRoutes)
289+
defer backendSvc.Close()
290+
291+
// WHEN the Loki flows endpoint is queried in the backend using count type
292+
resp, err := backendSvc.Client().Get(backendSvc.URL + "/api/loki/topology?type=count")
293+
require.NoError(t, err)
294+
295+
// THEN the query has been properly forwarded to Loki
296+
req := lokiMock.Calls[0].Arguments[1].(*http.Request)
297+
assert.Equal(t, `topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (count_over_time({app="netobserv-flowcollector"}|~`+"`"+`Duplicate":false`+"`"+`|json[30s])))`, req.URL.Query().Get("query"))
298+
// without any multi-tenancy header
299+
assert.Empty(t, req.Header.Get("X-Scope-OrgID"))
300+
301+
// AND the response is sent back to the client
302+
body, err := ioutil.ReadAll(resp.Body)
303+
require.NoError(t, err)
304+
var qr model.AggregatedQueryResponse
305+
err = json.Unmarshal(body, &qr)
306+
require.NoError(t, err)
307+
assert.NotNil(t, qr.Result)
308+
}
309+
270310
func TestLokiConfiguration_MultiTenant(t *testing.T) {
271311
lokiMock := httpMock{}
272312
lokiMock.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {

0 commit comments

Comments
 (0)