Skip to content

Commit cc1510b

Browse files
author
andreaxia
committed
add cached transaction gather test
1 parent 112c8d4 commit cc1510b

File tree

2 files changed

+137
-3
lines changed

2 files changed

+137
-3
lines changed

pkg/cachedtransactiongather/cachedtransactiongather.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,10 @@ type cachedTransactionGather struct {
4040
}
4141

4242
func (c *cachedTransactionGather) Gather() ([]*io_prometheus_client.MetricFamily, func(), error) {
43-
c.lock.RLock()
43+
c.lock.Lock()
4444
shouldGather := time.Now().After(c.nextCollectionTime)
45-
c.lock.RUnlock()
4645
if shouldGather {
4746
begin := time.Now()
48-
c.lock.Lock()
4947
c.nextCollectionTime = c.nextCollectionTime.Add(c.cacheInterval)
5048
metrics, done, err := c.gather.Gather()
5149
if err != nil {
@@ -60,6 +58,8 @@ func (c *cachedTransactionGather) Gather() ([]*io_prometheus_client.MetricFamily
6058
c.lock.Unlock()
6159
duration := time.Since(begin)
6260
level.Info(c.logger).Log("msg", "Collect all products done", "duration_seconds", duration.Seconds())
61+
} else {
62+
c.lock.Unlock()
6363
}
6464
c.lock.RLock()
6565
defer c.lock.RUnlock()
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package cachedtransactiongather
2+
3+
import (
4+
"fmt"
5+
"github.com/prometheus/client_golang/prometheus"
6+
io_prometheus_client "github.com/prometheus/client_model/go"
7+
"github.com/prometheus/common/promlog"
8+
"sort"
9+
"sync"
10+
"testing"
11+
"time"
12+
)
13+
14+
type mockGatherer struct {
15+
sleepUntil time.Duration
16+
}
17+
18+
func (m mockGatherer) Gather() ([]*io_prometheus_client.MetricFamily, error) {
19+
fmt.Println("start gather: " + m.sleepUntil.String())
20+
time.Sleep(m.sleepUntil)
21+
fmt.Sprintf("end gather: " + m.sleepUntil.String())
22+
return []*io_prometheus_client.MetricFamily{}, nil
23+
}
24+
25+
func newMockGatherer(duration time.Duration) prometheus.Gatherer {
26+
return &mockGatherer{
27+
sleepUntil: duration,
28+
}
29+
}
30+
31+
type multiTRegistry struct {
32+
tGatherers []prometheus.TransactionalGatherer
33+
}
34+
35+
func newMultiConcurrencyRegistry(tGatherers ...prometheus.TransactionalGatherer) *multiTRegistry {
36+
return &multiTRegistry{
37+
tGatherers: tGatherers,
38+
}
39+
}
40+
41+
// Gather implements TransactionalGatherer interface.
42+
func (r *multiTRegistry) Gather() (mfs []*io_prometheus_client.MetricFamily, done func(), err error) {
43+
dFns := make([]func(), 0, len(r.tGatherers))
44+
wait := sync.WaitGroup{}
45+
wait.Add(len(r.tGatherers))
46+
for i := range r.tGatherers {
47+
go func(i int) {
48+
_, _, _ = r.tGatherers[i].Gather()
49+
wait.Done()
50+
}(i)
51+
}
52+
wait.Wait()
53+
54+
sort.Slice(mfs, func(i, j int) bool {
55+
return *mfs[i].Name < *mfs[j].Name
56+
})
57+
return mfs, func() {
58+
for _, d := range dFns {
59+
d()
60+
}
61+
}, nil
62+
}
63+
64+
func TestCache(t *testing.T) {
65+
promlogConfig := &promlog.Config{}
66+
cacheInterval := 60 * time.Second
67+
logger := promlog.New(promlogConfig)
68+
gather := NewCachedTransactionGather(
69+
newMultiConcurrencyRegistry(
70+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*40)),
71+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*23)),
72+
prometheus.ToTransactionalGatherer(newMockGatherer(time.Second*7)),
73+
),
74+
cacheInterval, logger,
75+
)
76+
77+
t.Run("gather with multiple calls should not error", func(t *testing.T) {
78+
wait := sync.WaitGroup{}
79+
wait.Add(10)
80+
for range [10]int{} {
81+
go func() {
82+
begin := time.Now()
83+
mfs, done, err := gather.Gather()
84+
defer done()
85+
if err != nil {
86+
logger.Log("err", err)
87+
t.Errorf("gather error: %v", err)
88+
}
89+
logger.Log("mfs", mfs, "done", "err", err)
90+
if time.Since(begin) > cacheInterval {
91+
t.Errorf("gather cost more than cacheInterval %v", time.Since(begin).String())
92+
}
93+
wait.Done()
94+
}()
95+
}
96+
wait.Wait()
97+
})
98+
99+
t.Run("gather success", func(t *testing.T) {
100+
wait := sync.WaitGroup{}
101+
wait.Add(3)
102+
go func() {
103+
mfs, done, err := gather.Gather()
104+
defer done()
105+
if err != nil {
106+
logger.Log("err", err)
107+
t.Errorf("gather error: %v", err)
108+
}
109+
logger.Log("mfs", mfs, "done", "err", err)
110+
wait.Done()
111+
}()
112+
go func() {
113+
mfs, done, err := gather.Gather()
114+
defer done()
115+
if err != nil {
116+
logger.Log("err", err)
117+
t.Errorf("gather error: %v", err)
118+
}
119+
logger.Log("mfs", mfs, "done", "err", err)
120+
wait.Done()
121+
}()
122+
go func() {
123+
mfs, done, err := gather.Gather()
124+
defer done()
125+
if err != nil {
126+
logger.Log("err", err)
127+
t.Errorf("gather error: %v", err)
128+
}
129+
logger.Log("mfs", mfs, "done", "err", err)
130+
wait.Done()
131+
}()
132+
wait.Wait()
133+
})
134+
}

0 commit comments

Comments
 (0)