Skip to content

Commit 587553f

Browse files
committed
leak detector
1 parent 20805b0 commit 587553f

File tree

6 files changed

+478
-8
lines changed

6 files changed

+478
-8
lines changed

framework/.changeset/v0.13.1.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Add resource leak detector automation

framework/leak/detector.go

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
package leak
2+
3+
/*
4+
Resource leak detector
5+
This module provides a Prometheus-based leak detector for long-running soak tests. It detects leaks by comparing the median resource usage at the start and end of a test and flags any increases that breach configured thresholds.
6+
7+
Usage Note: Set the WarmUpDuration to at least 20% of your test length for reliable metrics.
8+
It is also recommend to use it with 3h+ soak tests for less false-positives.
9+
*/
10+
11+
import (
12+
"fmt"
13+
"strconv"
14+
"time"
15+
16+
f "github.com/smartcontractkit/chainlink-testing-framework/framework"
17+
)
18+
19+
// ResourceLeakCheckerConfig is resource leak checker config with Prometheus base URL
20+
type ResourceLeakCheckerConfig struct {
21+
PrometheusBaseURL string
22+
}
23+
24+
// ResourceLeakChecker is resource leak cheker instance
25+
type ResourceLeakChecker struct {
26+
PrometheusURL string
27+
c PromQuerier
28+
}
29+
30+
// WithPrometheusBaseURL sets Prometheus base URL, example http://localhost:9099
31+
func WithPrometheusBaseURL(url string) func(*ResourceLeakChecker) {
32+
return func(rlc *ResourceLeakChecker) {
33+
rlc.PrometheusURL = url
34+
}
35+
}
36+
37+
// WithQueryClient sets Prometheus query client
38+
func WithQueryClient(c PromQuerier) func(*ResourceLeakChecker) {
39+
return func(rlc *ResourceLeakChecker) {
40+
rlc.c = c
41+
}
42+
}
43+
44+
// PromQueries is an interface for querying Prometheus containing only methods we need for detecting resource leaks
45+
type PromQuerier interface {
46+
Query(query string, timestamp time.Time) (*f.PrometheusQueryResponse, error)
47+
}
48+
49+
// NewResourceLeakChecker creates a new resource leak checker
50+
func NewResourceLeakChecker(opts ...func(*ResourceLeakChecker)) *ResourceLeakChecker {
51+
lc := &ResourceLeakChecker{}
52+
for _, o := range opts {
53+
o(lc)
54+
}
55+
if lc.c == nil {
56+
lc.c = f.NewPrometheusQueryClient(f.LocalPrometheusBaseURL)
57+
}
58+
return lc
59+
}
60+
61+
// CheckConfig describes leak check configuration
62+
type CheckConfig struct {
63+
Query string
64+
Start time.Time
65+
End time.Time
66+
WarmUpDuration time.Duration
67+
}
68+
69+
// MeasureLeak measures resource leak between start and end timestamps
70+
// WarmUpDuration is used to ignore warm up interval results for more stable comparison
71+
func (rc *ResourceLeakChecker) MeasureLeak(
72+
c *CheckConfig,
73+
) (float64, error) {
74+
if c.Start.After(c.End) {
75+
return 0, fmt.Errorf("start time is greated than end time: %s -> %s", c.Start, c.End)
76+
}
77+
if c.WarmUpDuration > c.End.Sub(c.Start)/2 {
78+
return 0, fmt.Errorf("warm up duration can't be more than 50 percent of test interval between start and end timestamps: %s", c.WarmUpDuration)
79+
}
80+
startWithWarmUp := c.Start.Add(c.WarmUpDuration)
81+
memStart, err := rc.c.Query(c.Query, startWithWarmUp)
82+
if err != nil {
83+
return 0, fmt.Errorf("failed to get memory for the test start: %w", err)
84+
}
85+
86+
memEnd, err := rc.c.Query(c.Query, c.End)
87+
if err != nil {
88+
return 0, fmt.Errorf("failed to get memory for the test end: %w", err)
89+
}
90+
91+
resStart := memStart.Data.Result
92+
resEnd := memEnd.Data.Result
93+
if len(resStart) == 0 {
94+
return 0, fmt.Errorf("no results for start timestamp: %s", c.Start)
95+
}
96+
if len(resEnd) == 0 {
97+
return 0, fmt.Errorf("no results for end timestamp: %s", c.End)
98+
}
99+
100+
if len(resStart[0].Value) < 2 {
101+
return 0, fmt.Errorf("invalid Prometheus response for start timestamp, should have timestamp and value: %s", c.Start)
102+
}
103+
if len(resEnd[0].Value) < 2 {
104+
return 0, fmt.Errorf("invalid Prometheus response for end timestamp, should have timestamp and value: %s", c.End)
105+
}
106+
107+
memStartVal, startOk := memStart.Data.Result[0].Value[1].(string)
108+
if !startOk {
109+
return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", c.Start, memStart.Data.Result[0].Value[1])
110+
}
111+
memEndVal, endOk := memEnd.Data.Result[0].Value[1].(string)
112+
if !endOk {
113+
return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", c.End, memEnd.Data.Result[0].Value[1])
114+
}
115+
116+
memStartValFloat, err := strconv.ParseFloat(memStartVal, 64)
117+
if err != nil {
118+
return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err)
119+
}
120+
memEndValFloat, err := strconv.ParseFloat(memEndVal, 64)
121+
if err != nil {
122+
return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err)
123+
}
124+
125+
totalIncreasePercentage := (memEndValFloat / memStartValFloat * 100) - 100
126+
127+
f.L.Debug().
128+
Float64("Start", memStartValFloat).
129+
Float64("End", memEndValFloat).
130+
Float64("Increase", totalIncreasePercentage).
131+
Msg("Memory increase total (percentage)")
132+
return totalIncreasePercentage, nil
133+
}

framework/leak/detector_cl_node.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package leak
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"time"
7+
8+
"github.com/smartcontractkit/chainlink-testing-framework/framework"
9+
)
10+
11+
// ClNodesCheck contains thresholds which can be verified for each Chainlink node
12+
// it is recommended to set some WarmUpDuration, 20% of overall test time
13+
// to have more stable results
14+
type CLNodesCheck struct {
15+
NumNodes int
16+
Start time.Time
17+
End time.Time
18+
WarmUpDuration time.Duration
19+
CPUThreshold float64
20+
MemoryThreshold float64
21+
}
22+
23+
// CLNodesLeakDetector is Chainlink node specific resource leak detector
24+
// can be used with both local and remote Chainlink node sets (DONs)
25+
type CLNodesLeakDetector struct {
26+
Mode string
27+
CPUQuery, MemoryQuery string
28+
c *ResourceLeakChecker
29+
}
30+
31+
// WithCPUQuery allows to override CPU leak query (Prometheus)
32+
func WithCPUQuery(q string) func(*CLNodesLeakDetector) {
33+
return func(cd *CLNodesLeakDetector) {
34+
cd.CPUQuery = q
35+
}
36+
}
37+
38+
// WithCPUQuery allows to override Memory leak query (Prometheus)
39+
func WithMemoryQuery(q string) func(*CLNodesLeakDetector) {
40+
return func(cd *CLNodesLeakDetector) {
41+
cd.MemoryQuery = q
42+
}
43+
}
44+
45+
// NewCLNodesLeakDetector create new Chainlink node specific resource leak detector with Prometheus client
46+
func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDetector)) (*CLNodesLeakDetector, error) {
47+
cd := &CLNodesLeakDetector{
48+
c: c,
49+
}
50+
for _, o := range opts {
51+
o(cd)
52+
}
53+
if cd.Mode == "" {
54+
cd.Mode = "devenv"
55+
}
56+
switch cd.Mode {
57+
case "devenv":
58+
cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name=~"don-node%d"}[5m])) * 100`
59+
cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`
60+
case "griddle":
61+
return nil, fmt.Errorf("not implemented yet")
62+
default:
63+
return nil, fmt.Errorf("invalid mode, use: 'devenv' or 'griddle'")
64+
}
65+
return cd, nil
66+
}
67+
68+
// Check runs all resource leak checks and returns errors if threshold reached for any of them
69+
func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error {
70+
if t.NumNodes == 0 {
71+
return fmt.Errorf("cl nodes num must be > 0")
72+
}
73+
memoryDiffs := make([]float64, 0)
74+
cpuDiffs := make([]float64, 0)
75+
errs := make([]error, 0)
76+
for i := range t.NumNodes {
77+
memoryDiff, err := cd.c.MeasureLeak(&CheckConfig{
78+
Query: fmt.Sprintf(cd.MemoryQuery, i),
79+
Start: t.Start,
80+
End: t.End,
81+
WarmUpDuration: t.WarmUpDuration,
82+
})
83+
if err != nil {
84+
return fmt.Errorf("memory leak check failed: %w", err)
85+
}
86+
memoryDiffs = append(memoryDiffs, memoryDiff)
87+
cpuDiff, err := cd.c.MeasureLeak(&CheckConfig{
88+
Query: fmt.Sprintf(cd.CPUQuery, i),
89+
Start: t.Start,
90+
End: t.End,
91+
WarmUpDuration: t.WarmUpDuration,
92+
})
93+
if err != nil {
94+
return fmt.Errorf("cpu leak check failed: %w", err)
95+
}
96+
cpuDiffs = append(cpuDiffs, cpuDiff)
97+
98+
if memoryDiff >= t.MemoryThreshold {
99+
errs = append(errs, fmt.Errorf(
100+
"Memory leak detected for node %d and interval: [%s -> %s], diff: %.f",
101+
i, t.Start, t.End, memoryDiff,
102+
))
103+
}
104+
if cpuDiff >= t.CPUThreshold {
105+
errs = append(errs, fmt.Errorf(
106+
"CPU leak detected for node %d and interval: [%s -> %s], diff: %.f",
107+
i, t.Start, t.End, cpuDiff,
108+
))
109+
}
110+
}
111+
framework.L.Info().
112+
Any("MemoryDiffs", memoryDiffs).
113+
Any("CPUDiffs", cpuDiffs).
114+
Msg("Leaks info")
115+
return errors.Join(errs...)
116+
}

framework/leak/detector_fake.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package leak
2+
3+
import (
4+
"time"
5+
6+
f "github.com/smartcontractkit/chainlink-testing-framework/framework"
7+
)
8+
9+
var _ PromQuerier = (*FakeQueryClient)(nil)
10+
11+
type FakeQueryClient struct {
12+
startRespCalled bool
13+
endRespCalled bool
14+
isStartResp bool
15+
startResp *f.PrometheusQueryResponse
16+
endResp *f.PrometheusQueryResponse
17+
}
18+
19+
func NewFakeQueryClient() *FakeQueryClient {
20+
return &FakeQueryClient{}
21+
}
22+
23+
func (qc *FakeQueryClient) SetResponses(sr *f.PrometheusQueryResponse, er *f.PrometheusQueryResponse) {
24+
qc.isStartResp = true
25+
qc.startResp = sr
26+
qc.endResp = er
27+
}
28+
29+
func (qc *FakeQueryClient) Query(query string, timestamp time.Time) (*f.PrometheusQueryResponse, error) {
30+
if qc.isStartResp {
31+
qc.isStartResp = false
32+
return qc.startResp, nil
33+
}
34+
qc.isStartResp = true
35+
return qc.endResp, nil
36+
}
37+
38+
func PromSingleValueResponse(val string) *f.PrometheusQueryResponse {
39+
return &f.PrometheusQueryResponse{
40+
Status: "",
41+
Data: &f.PromQueryResponseData{
42+
Result: []f.PromQueryResponseResult{
43+
{
44+
Metric: map[string]string{},
45+
Value: []interface{}{"", val},
46+
},
47+
},
48+
},
49+
}
50+
}

0 commit comments

Comments
 (0)