Skip to content

Commit 649a0f7

Browse files
authored
Move github.com/elastic/beats/v7/libbeat/monitoring/report/buffer (#42)
1 parent 7503b28 commit 649a0f7

File tree

4 files changed

+365
-0
lines changed

4 files changed

+365
-0
lines changed

monitoring/report/buffer/buffer.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package buffer
19+
20+
import "sync"
21+
22+
// ringBuffer is a buffer with a fixed number of items that can be tracked.
23+
//
24+
// We assume that the size of the buffer is greater than one.
25+
// the buffer should be thread-safe.
26+
type ringBuffer struct {
27+
mu sync.Mutex
28+
entries []interface{}
29+
i int
30+
full bool
31+
}
32+
33+
// newBuffer returns a reference to a new ringBuffer with set size.
34+
func newBuffer(size int) *ringBuffer {
35+
return &ringBuffer{
36+
entries: make([]interface{}, size),
37+
}
38+
}
39+
40+
// add will add the passed entry to the buffer.
41+
func (r *ringBuffer) add(entry interface{}) {
42+
r.mu.Lock()
43+
defer r.mu.Unlock()
44+
r.entries[r.i] = entry
45+
r.i = (r.i + 1) % len(r.entries)
46+
if r.i == 0 {
47+
r.full = true
48+
}
49+
}
50+
51+
// getAll returns all entries in the buffer in order
52+
func (r *ringBuffer) getAll() []interface{} {
53+
r.mu.Lock()
54+
defer r.mu.Unlock()
55+
if r.i == 0 && !r.full {
56+
return []interface{}{}
57+
}
58+
if !r.full {
59+
return r.entries[:r.i]
60+
}
61+
if r.full && r.i == 0 {
62+
return r.entries
63+
}
64+
return append(r.entries[r.i:], r.entries[:r.i]...)
65+
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package buffer
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
)
25+
26+
func Test_ringBuffer(t *testing.T) {
27+
t.Run("Len 2 buffer", func(t *testing.T) {
28+
r := newBuffer(2)
29+
assert.Equal(t, 2, len(r.entries))
30+
assert.False(t, r.full)
31+
assert.Equal(t, 0, r.i)
32+
33+
assert.Empty(t, r.getAll())
34+
35+
r.add("1")
36+
assert.False(t, r.full)
37+
assert.Equal(t, 1, r.i)
38+
assert.Equal(t, r.entries[0], "1")
39+
assert.ElementsMatch(t, []string{"1"}, r.getAll())
40+
41+
r.add("2")
42+
assert.True(t, r.full)
43+
assert.Equal(t, 0, r.i)
44+
assert.Equal(t, r.entries[1], "2")
45+
assert.ElementsMatch(t, []string{"1", "2"}, r.getAll())
46+
47+
r.add("3")
48+
assert.True(t, r.full)
49+
assert.Equal(t, 1, r.i)
50+
assert.Equal(t, r.entries[0], "3")
51+
assert.ElementsMatch(t, []string{"2", "3"}, r.getAll())
52+
53+
r.add("4")
54+
assert.True(t, r.full)
55+
assert.Equal(t, 0, r.i)
56+
assert.Equal(t, r.entries[1], "4")
57+
assert.ElementsMatch(t, []string{"3", "4"}, r.getAll())
58+
})
59+
60+
t.Run("Len 3 buffer", func(t *testing.T) {
61+
r := newBuffer(3)
62+
assert.Empty(t, r.getAll())
63+
64+
r.add("1")
65+
assert.ElementsMatch(t, []string{"1"}, r.getAll())
66+
67+
r.add("2")
68+
assert.ElementsMatch(t, []string{"1", "2"}, r.getAll())
69+
70+
r.add("3")
71+
assert.ElementsMatch(t, []string{"1", "2", "3"}, r.getAll())
72+
73+
r.add("4")
74+
assert.ElementsMatch(t, []string{"2", "3", "4"}, r.getAll())
75+
76+
r.add("5")
77+
assert.ElementsMatch(t, []string{"3", "4", "5"}, r.getAll())
78+
79+
r.add("6")
80+
assert.ElementsMatch(t, []string{"4", "5", "6"}, r.getAll())
81+
})
82+
}
83+
84+
func Benchmark_ringBuffer_add(b *testing.B) {
85+
b.Run("size 6", func(b *testing.B) {
86+
r := newBuffer(6)
87+
for i := 0; i < b.N; i++ {
88+
r.add(i)
89+
}
90+
})
91+
b.Run("size 60", func(b *testing.B) {
92+
r := newBuffer(60)
93+
for i := 0; i < b.N; i++ {
94+
r.add(i)
95+
}
96+
})
97+
b.Run("size 600", func(b *testing.B) {
98+
r := newBuffer(600)
99+
for i := 0; i < b.N; i++ {
100+
r.add(i)
101+
}
102+
})
103+
b.Run("size 6000", func(b *testing.B) {
104+
r := newBuffer(6000)
105+
for i := 0; i < b.N; i++ {
106+
r.add(i)
107+
}
108+
})
109+
}
110+
111+
func Benchmark_ringBuffer_add_filled(b *testing.B) {
112+
b.Run("size 6", func(b *testing.B) {
113+
r := newFullBuffer(b, 6)
114+
for i := 0; i < b.N; i++ {
115+
r.add(i)
116+
}
117+
})
118+
b.Run("size 60", func(b *testing.B) {
119+
r := newFullBuffer(b, 60)
120+
for i := 0; i < b.N; i++ {
121+
r.add(i)
122+
}
123+
})
124+
b.Run("size 600", func(b *testing.B) {
125+
r := newFullBuffer(b, 600)
126+
for i := 0; i < b.N; i++ {
127+
r.add(i)
128+
}
129+
})
130+
b.Run("size 6000", func(b *testing.B) {
131+
r := newFullBuffer(b, 6000)
132+
for i := 0; i < b.N; i++ {
133+
r.add(i)
134+
}
135+
})
136+
}
137+
138+
func newFullBuffer(b *testing.B, size int) *ringBuffer {
139+
b.Helper()
140+
r := newBuffer(size)
141+
// fill size +1 so full flag is toggled
142+
for i := 0; i < size+1; i++ {
143+
r.add(-1)
144+
}
145+
return r
146+
}

monitoring/report/buffer/config.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package buffer
19+
20+
import (
21+
"time"
22+
)
23+
24+
type config struct {
25+
Period time.Duration `config:"period"`
26+
Size int `config:"size" validate:"min=2"`
27+
Namespaces []string `config:"namespaces"`
28+
}
29+
30+
// defaultConfig will gather 10m of data (every 10s) for the stats registry.
31+
func defaultConfig() config {
32+
return config{
33+
Period: 10 * time.Second,
34+
Size: 60,
35+
Namespaces: []string{"stats"},
36+
}
37+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package buffer
19+
20+
import (
21+
"encoding/json"
22+
"fmt"
23+
"net/http"
24+
"sync"
25+
"time"
26+
27+
c "github.com/elastic/elastic-agent-libs/config"
28+
"github.com/elastic/elastic-agent-libs/monitoring"
29+
)
30+
31+
// reporter is a struct that will fill a ring buffer for each monitored registry.
32+
type reporter struct {
33+
config
34+
wg sync.WaitGroup
35+
done chan struct{}
36+
registries map[string]*monitoring.Registry
37+
38+
// ring buffers for namespaces
39+
entries map[string]*ringBuffer
40+
}
41+
42+
// MakeReporter creates and starts a reporter with the given config.
43+
func MakeReporter(cfg *c.C) (*reporter, error) {
44+
config := defaultConfig()
45+
if cfg != nil {
46+
if err := cfg.Unpack(&config); err != nil {
47+
return nil, err
48+
}
49+
}
50+
51+
r := &reporter{
52+
config: config,
53+
done: make(chan struct{}),
54+
registries: map[string]*monitoring.Registry{},
55+
entries: map[string]*ringBuffer{},
56+
}
57+
58+
for _, ns := range r.config.Namespaces {
59+
reg := monitoring.GetNamespace(ns).GetRegistry()
60+
r.registries[ns] = reg
61+
r.entries[ns] = newBuffer(r.config.Size)
62+
}
63+
64+
r.wg.Add(1)
65+
go func() {
66+
defer r.wg.Done()
67+
r.snapshotLoop()
68+
}()
69+
return r, nil
70+
}
71+
72+
// Stop will stop the reporter from collecting new information.
73+
// It will not clear any previously collected data.
74+
func (r *reporter) Stop() {
75+
close(r.done)
76+
r.wg.Wait()
77+
}
78+
79+
// snapshotLoop will collect a snapshot for each monitored registry for the configured period and store them in the correct buffer.
80+
func (r *reporter) snapshotLoop() {
81+
ticker := time.NewTicker(r.config.Period)
82+
defer ticker.Stop()
83+
84+
for {
85+
var ts time.Time
86+
select {
87+
case <-r.done:
88+
return
89+
case ts = <-ticker.C:
90+
}
91+
92+
for name, reg := range r.registries {
93+
snap := monitoring.CollectStructSnapshot(reg, monitoring.Full, false)
94+
if _, ok := snap["@timestamp"]; !ok {
95+
snap["@timestamp"] = ts.UTC()
96+
}
97+
r.entries[name].add(snap)
98+
}
99+
}
100+
}
101+
102+
// ServeHTTP is an http.Handler that will respond with the monitored registries buffer's contents in JSON.
103+
func (r *reporter) ServeHTTP(w http.ResponseWriter, req *http.Request) {
104+
resp := make(map[string][]interface{}, len(r.entries))
105+
for name, entries := range r.entries {
106+
resp[name] = entries.getAll()
107+
}
108+
109+
p, err := json.Marshal(resp)
110+
if err != nil {
111+
w.WriteHeader(500)
112+
fmt.Fprintf(w, "Unable to encode JSON response: %v", err)
113+
return
114+
}
115+
w.Header().Set("Content-Type", "application/json; charset=utf-8")
116+
_, _ = w.Write(p)
117+
}

0 commit comments

Comments
 (0)