Skip to content

Commit cfbd5e9

Browse files
authored
Merge pull request #72 from NETWAYS/feature/flow-metrics
Add Flow Metrics
2 parents 812dd5c + 244bc1f commit cfbd5e9

File tree

5 files changed

+211
-16
lines changed

5 files changed

+211
-16
lines changed

README.md

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ Flags:
6262
6363
### Pipeline
6464
65-
Checks the status of Logstash pipelines.
65+
Determines the health of Logstash pipelines via "inflight events". These events are calculated as such: `inflight events = events.In - events.Out`
66+
67+
Hint: Use the queue backpressure for Logstash 8.
6668
6769
```bash
6870
Usage:
@@ -86,6 +88,34 @@ Flags:
8688
-h, --help help for pipeline
8789
```
8890
91+
### Pipeline Flow Metrics
92+
93+
Checks the status of a Logstash pipeline's flow metrics (currently queue backpressure).
94+
95+
Hint: Requires Logstash 8.5.0
96+
97+
```bash
98+
99+
Usage:
100+
check_logstash pipeline flow [flags]
101+
102+
Examples:
103+
104+
$ check_logstash pipeline flow --warning 5 --critical 10
105+
OK - Flow metrics alright
106+
\_[OK] queue_backpressure_example:0.34;
107+
108+
$ check_logstash pipeline flow --pipeline example --warning 5 --critical 10
109+
CRITICAL - Flow metrics alright
110+
\_[CRITICAL] queue_backpressure_example:11.23;
111+
112+
Flags:
113+
-c, --critical string Critical threshold for queue Backpressure
114+
-h, --help help for flow
115+
-P, --pipeline string Pipeline Name (default "/")
116+
-w, --warning string Warning threshold for queue Backpressure
117+
```
118+
89119
### Pipeline Reload
90120

91121
Checks the status of Logstash pipelines configuration reload.

cmd/pipeline.go

Lines changed: 130 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ import (
1616

1717
// To store the CLI parameters
1818
type PipelineConfig struct {
19-
PipelineName string
20-
InflightEventsWarn string
21-
InflightEventsCrit string
19+
PipelineName string
20+
Warning string
21+
Critical string
2222
}
2323

2424
// To store the parsed CLI parameters
2525
type PipelineThreshold struct {
26-
inflightEventsWarn *check.Threshold
27-
inflightEventsCrit *check.Threshold
26+
Warning *check.Threshold
27+
Critical *check.Threshold
2828
}
2929

3030
var cliPipelineConfig PipelineConfig
@@ -33,19 +33,19 @@ func parsePipeThresholds(config PipelineConfig) (PipelineThreshold, error) {
3333
// Parses the CLI parameters
3434
var t PipelineThreshold
3535

36-
inflightEventsWarn, err := check.ParseThreshold(config.InflightEventsWarn)
36+
warn, err := check.ParseThreshold(config.Warning)
3737
if err != nil {
3838
return t, err
3939
}
4040

41-
t.inflightEventsWarn = inflightEventsWarn
41+
t.Warning = warn
4242

43-
inflightEventsCrit, err := check.ParseThreshold(config.InflightEventsCrit)
43+
crit, err := check.ParseThreshold(config.Critical)
4444
if err != nil {
4545
return t, err
4646
}
4747

48-
t.inflightEventsCrit = inflightEventsCrit
48+
t.Critical = crit
4949

5050
return t, nil
5151
}
@@ -109,10 +109,10 @@ var pipelineCmd = &cobra.Command{
109109
inflightEvents := pipe.Events.In - pipe.Events.Out
110110

111111
summary.WriteString("\n \\_")
112-
if thresholds.inflightEventsCrit.DoesViolate(float64(inflightEvents)) {
112+
if thresholds.Critical.DoesViolate(float64(inflightEvents)) {
113113
states = append(states, check.Critical)
114114
summary.WriteString(fmt.Sprintf("[CRITICAL] inflight_events_%s:%d;", name, inflightEvents))
115-
} else if thresholds.inflightEventsWarn.DoesViolate(float64(inflightEvents)) {
115+
} else if thresholds.Warning.DoesViolate(float64(inflightEvents)) {
116116
states = append(states, check.Warning)
117117
summary.WriteString(fmt.Sprintf("[WARNING] inflight_events_%s:%d;", name, inflightEvents))
118118
} else {
@@ -131,8 +131,8 @@ var pipelineCmd = &cobra.Command{
131131
Value: pipe.Events.Out})
132132
perfList.Add(&perfdata.Perfdata{
133133
Label: fmt.Sprintf("inflight_events_%s", name),
134-
Warn: thresholds.inflightEventsWarn,
135-
Crit: thresholds.inflightEventsCrit,
134+
Warn: thresholds.Warning,
135+
Crit: thresholds.Critical,
136136
Value: inflightEvents})
137137
perfList.Add(&perfdata.Perfdata{
138138
Label: fmt.Sprintf("pipelines.%s.reloads.failures", name),
@@ -253,13 +253,128 @@ var pipelineReloadCmd = &cobra.Command{
253253
},
254254
}
255255

256+
var pipelineFlowCmd = &cobra.Command{
257+
Use: "flow",
258+
Short: "Checks the flow metrics of the Logstash Pipelines",
259+
Long: `Checks the flow metrics of the Logstash Pipelines`,
260+
Example: `
261+
$ check_logstash pipeline flow --warning 5 --critical 10
262+
OK - Flow metrics alright
263+
\_[OK] queue_backpressure_example:0.34;
264+
265+
$ check_logstash pipeline flow --pipeline example --warning 5 --critical 10
266+
CRITICAL - Flow metrics not alright
267+
\_[CRITICAL] queue_backpressure_example:11.23;`,
268+
Run: func(cmd *cobra.Command, args []string) {
269+
var (
270+
output string
271+
rc int
272+
thresholds PipelineThreshold
273+
pp logstash.Pipeline
274+
perfList perfdata.PerfdataList
275+
)
276+
277+
// Parse the thresholds into a central var since we need them later
278+
thresholds, err := parsePipeThresholds(cliPipelineConfig)
279+
if err != nil {
280+
check.ExitError(err)
281+
}
282+
283+
// Creating an client and connecting to the API
284+
c := cliConfig.NewClient()
285+
// localhost:9600/_node/stats/pipelines/ will return all Pipelines
286+
// localhost:9600/_node/stats/pipelines/foo will return the foo Pipeline
287+
u, _ := url.JoinPath(c.Url, "/_node/stats/pipelines", cliPipelineConfig.PipelineName)
288+
resp, err := c.Client.Get(u)
289+
290+
if err != nil {
291+
check.ExitError(err)
292+
}
293+
294+
if resp.StatusCode != http.StatusOK {
295+
check.ExitError(fmt.Errorf("Could not get %s - Error: %d", u, resp.StatusCode))
296+
}
297+
298+
defer resp.Body.Close()
299+
err = json.NewDecoder(resp.Body).Decode(&pp)
300+
301+
if err != nil {
302+
check.ExitError(err)
303+
}
304+
305+
states := make([]int, 0, len(pp.Pipelines))
306+
307+
// Check the flow metrics for each pipeline
308+
var summary strings.Builder
309+
310+
for name, pipe := range pp.Pipelines {
311+
summary.WriteString("\n \\_")
312+
if thresholds.Critical.DoesViolate(pipe.Flow.QueueBackpressure.Current) {
313+
states = append(states, check.Critical)
314+
summary.WriteString(fmt.Sprintf("[CRITICAL] queue_backpressure_%s:%.2f;", name, pipe.Flow.QueueBackpressure.Current))
315+
} else if thresholds.Warning.DoesViolate(pipe.Flow.QueueBackpressure.Current) {
316+
states = append(states, check.Warning)
317+
summary.WriteString(fmt.Sprintf("[WARNING] queue_backpressure_%s:%.2f;", name, pipe.Flow.QueueBackpressure.Current))
318+
} else {
319+
states = append(states, check.OK)
320+
summary.WriteString(fmt.Sprintf("[OK] queue_backpressure_%s:%.2f;", name, pipe.Flow.QueueBackpressure.Current))
321+
}
322+
323+
// Generate perfdata for each event
324+
perfList.Add(&perfdata.Perfdata{
325+
Label: fmt.Sprintf("pipelines.queue_backpressure_%s", name),
326+
Warn: thresholds.Warning,
327+
Crit: thresholds.Critical,
328+
Value: pipe.Flow.QueueBackpressure.Current})
329+
perfList.Add(&perfdata.Perfdata{
330+
Label: fmt.Sprintf("pipelines.%s.output_throughput", name),
331+
Value: pipe.Flow.OutputThroughput.Current})
332+
perfList.Add(&perfdata.Perfdata{
333+
Label: fmt.Sprintf("pipelines.%s.input_throughput", name),
334+
Value: pipe.Flow.InputThroughput.Current})
335+
perfList.Add(&perfdata.Perfdata{
336+
Label: fmt.Sprintf("pipelines.%s.filter_throughput", name),
337+
Value: pipe.Flow.FilterThroughput.Current})
338+
}
339+
340+
// Validate the various subchecks and use the worst state as return code
341+
switch result.WorstState(states...) {
342+
case 0:
343+
rc = check.OK
344+
output = "Flow metrics alright"
345+
case 1:
346+
rc = check.Warning
347+
output = "Flow metrics may not be alright"
348+
case 2:
349+
rc = check.Critical
350+
output = "Flow metrics not alright"
351+
default:
352+
rc = check.Unknown
353+
output = "Flow metrics status unknown"
354+
}
355+
356+
check.ExitRaw(rc, output, summary.String(), "|", perfList.String())
357+
},
358+
}
359+
256360
func init() {
257361
rootCmd.AddCommand(pipelineCmd)
258362

259363
pipelineReloadCmd.Flags().StringVarP(&cliPipelineConfig.PipelineName, "pipeline", "P", "/",
260364
"Pipeline Name")
261365

366+
pipelineFlowCmd.Flags().StringVarP(&cliPipelineConfig.PipelineName, "pipeline", "P", "/",
367+
"Pipeline Name")
368+
pipelineFlowCmd.Flags().StringVarP(&cliPipelineConfig.Warning, "warning", "w", "",
369+
"Warning threshold for queue Backpressure")
370+
pipelineFlowCmd.Flags().StringVarP(&cliPipelineConfig.Critical, "critical", "c", "",
371+
"Critical threshold for queue Backpressure")
372+
373+
_ = pipelineFlowCmd.MarkFlagRequired("warning")
374+
_ = pipelineFlowCmd.MarkFlagRequired("critical")
375+
262376
pipelineCmd.AddCommand(pipelineReloadCmd)
377+
pipelineCmd.AddCommand(pipelineFlowCmd)
263378

264379
fs := pipelineCmd.Flags()
265380

@@ -268,9 +383,9 @@ func init() {
268383
fs.StringVarP(&cliPipelineConfig.PipelineName, "pipeline", "P", "/",
269384
"Pipeline Name")
270385

271-
fs.StringVar(&cliPipelineConfig.InflightEventsWarn, "inflight-events-warn", "",
386+
fs.StringVar(&cliPipelineConfig.Warning, "inflight-events-warn", "",
272387
"Warning threshold for inflight events to be a warning result. Use min:max for a range.")
273-
fs.StringVar(&cliPipelineConfig.InflightEventsCrit, "inflight-events-crit", "",
388+
fs.StringVar(&cliPipelineConfig.Critical, "inflight-events-crit", "",
274389
"Critical threshold for inflight events to be a critical result. Use min:max for a range.")
275390

276391
_ = pipelineCmd.MarkFlagRequired("inflight-events-warn")

cmd/pipeline_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,24 @@ func TestPipelineCmd_Logstash8(t *testing.T) {
162162
args: []string{"run", "../main.go", "pipeline", "reload", "--pipeline", "foo"},
163163
expected: "UNKNOWN - Could not get",
164164
},
165+
{
166+
name: "pipeline-flow-ok",
167+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
168+
w.WriteHeader(http.StatusOK)
169+
w.Write([]byte(`{"host":"foobar","version":"8.7.1","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"ansible-input":{"flow":{"queue_backpressure":{"current":12.34,"last_1_minute":0,"lifetime":2.503e-05},"output_throughput":{"current":0,"last_1_minute":0.344,"lifetime":0.7051},"input_throughput":{"current":10,"last_1_minute":0.5734,"lifetime":1.089},"worker_concurrency":{"current":0.0001815,"last_1_minute":0.0009501,"lifetime":0.003384},"filter_throughput":{"current":0,"last_1_minute":0.5734,"lifetime":1.089}},"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`))
170+
})),
171+
args: []string{"run", "../main.go", "pipeline", "flow", "--warning", "15", "--critical", "20"},
172+
expected: "[OK] queue_backpressure_ansible-input:12.34; | pipelines.queue_backpressure_ansible-input=12.34;15;20 pipelines.ansible-input.output_throughput=0 pipelines.ansible-input.input_throughput=10 pipelines.ansible-input.filter_throughput=0",
173+
},
174+
{
175+
name: "pipeline-flow-critical",
176+
server: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
177+
w.WriteHeader(http.StatusOK)
178+
w.Write([]byte(`{"host":"foobar","version":"8.7.1","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"ansible-input":{"flow":{"queue_backpressure":{"current":10,"last_1_minute":0,"lifetime":2.503e-05},"output_throughput":{"current":0,"last_1_minute":0.344,"lifetime":0.7051},"input_throughput":{"current":10,"last_1_minute":0.5734,"lifetime":1.089},"worker_concurrency":{"current":0.0001815,"last_1_minute":0.0009501,"lifetime":0.003384},"filter_throughput":{"current":0,"last_1_minute":0.5734,"lifetime":1.089}},"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`))
179+
})),
180+
args: []string{"run", "../main.go", "pipeline", "flow", "--warning", "1", "--critical", "2"},
181+
expected: "CRITICAL - Flow metrics not alright",
182+
},
165183
}
166184

167185
for _, test := range tests {

internal/logstash/api.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ type Pipeline struct {
1818
Successes int `json:"successes"`
1919
Failures int `json:"failures"`
2020
} `json:"reloads"`
21+
Flow struct {
22+
QueueBackpressure FlowMetric `json:"queue_backpressure"`
23+
OutputThroughput FlowMetric `json:"output_throughput"`
24+
InputThroughput FlowMetric `json:"input_throughput"`
25+
FilterThroughput FlowMetric `json:"filter_throughput"`
26+
} `json:"flow"`
2127
Queue struct {
2228
Type string `json:"type"`
2329
EventsCount int `json:"events_count"`
@@ -34,6 +40,12 @@ type Pipeline struct {
3440
} `json:"pipelines"`
3541
}
3642

43+
type FlowMetric struct {
44+
Current float64 `json:"current"`
45+
Last1Minute float64 `json:"last_1_minute"`
46+
Lifetime float64 `json:"lifetime"`
47+
}
48+
3749
type Process struct {
3850
MaxFileDescriptors float64 `json:"max_file_descriptors"`
3951
OpenFileDescriptors float64 `json:"open_file_descriptors"`

internal/logstash/api_test.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,26 @@ import (
55
"testing"
66
)
77

8+
func TestUmarshallPipelineFlow(t *testing.T) {
9+
10+
j := `{"host":"foobar","version":"8.7.1","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"ansible-input":{"flow":{"queue_backpressure":{"current":10,"last_1_minute":0,"lifetime":2.503e-05},"output_throughput":{"current":0,"last_1_minute":0.344,"lifetime":0.7051},"input_throughput":{"current":10,"last_1_minute":0.5734,"lifetime":1.089},"worker_concurrency":{"current":0.0001815,"last_1_minute":0.0009501,"lifetime":0.003384},"filter_throughput":{"current":0,"last_1_minute":0.5734,"lifetime":1.089}},"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"plugins":{"inputs":[{"id":"b","name":"beats","events":{"queue_push_duration_in_millis":0,"out":0}}],"codecs":[{"id":"plain","name":"plain","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}},{"id":"json","name":"json","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}}],"filters":[],"outputs":[{"id":"f","name":"redis","events":{"duration_in_millis":18,"out":50,"in":100}}]},"reloads":{"successes":0,"last_success_timestamp":null,"last_error":null,"last_failure_timestamp":null,"failures":0},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`
11+
12+
var pl Pipeline
13+
err := json.Unmarshal([]byte(j), &pl)
14+
15+
if err != nil {
16+
t.Error(err)
17+
}
18+
19+
if pl.Pipelines["ansible-input"].Flow.QueueBackpressure.Current != 10 {
20+
t.Error("\nActual: ", pl.Pipelines["ansible-input"].Flow.QueueBackpressure.Current, "\nExpected: ", "10")
21+
}
22+
23+
if pl.Pipelines["ansible-input"].Flow.InputThroughput.Current != 10 {
24+
t.Error("\nActual: ", pl.Pipelines["ansible-input"].Flow.InputThroughput.Current, "\nExpected: ", "10")
25+
}
26+
}
27+
828
func TestUmarshallPipeline(t *testing.T) {
929

1030
j := `{"host":"foobar","version":"7.17.8","http_address":"127.0.0.1:9600","id":"4","name":"test","ephemeral_id":"5","status":"green","snapshot":false,"pipeline":{"workers":2,"batch_size":125,"batch_delay":50},"pipelines":{"ansible-input":{"events":{"filtered":0,"duration_in_millis":0,"queue_push_duration_in_millis":0,"out":50,"in":100},"plugins":{"inputs":[{"id":"b","name":"beats","events":{"queue_push_duration_in_millis":0,"out":0}}],"codecs":[{"id":"plain","name":"plain","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}},{"id":"json","name":"json","decode":{"writes_in":0,"duration_in_millis":0,"out":0},"encode":{"writes_in":0,"duration_in_millis":0}}],"filters":[],"outputs":[{"id":"f","name":"redis","events":{"duration_in_millis":18,"out":50,"in":100}}]},"reloads":{"successes":0,"last_success_timestamp":null,"last_error":null,"last_failure_timestamp":null,"failures":0},"queue":{"type":"memory","events_count":0,"queue_size_in_bytes":0,"max_queue_size_in_bytes":0},"hash":"f","ephemeral_id":"f"}}}`

0 commit comments

Comments
 (0)