Skip to content

Commit 1cf0584

Browse files
authored
add periodic/Doer for frequency-limited action execution (#220)
The Doer limits the execution of an action to at most once within a specified period. It's designed for managing frequent events by providing a summary with the number of occurrences within the given period. If no event happen, the action is not executed.
1 parent 1160ccd commit 1cf0584

File tree

2 files changed

+314
-0
lines changed

2 files changed

+314
-0
lines changed

periodic/ratelimited.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package periodic
19+
20+
import (
21+
"sync"
22+
"sync/atomic"
23+
"time"
24+
)
25+
26+
// Doer limits an action to be executed at most once within a specified period.
27+
// It is intended for managing events that occur frequently, but instead of an
28+
// action being taken for every event, the action should be executed at most
29+
// once within a given period of time.
30+
//
31+
// Doer takes a function to execute, doFn, which is called every time
32+
// the specified period has elapsed with the number of events and the period.
33+
type Doer struct {
34+
count atomic.Uint64
35+
36+
period time.Duration
37+
38+
// doFn is called for executing the action every period if at least one
39+
// event happened. It receives the count of events and the period.
40+
doFn func(count uint64, d time.Duration)
41+
lastDone time.Time
42+
done chan struct{}
43+
44+
// nowFn is used to acquire the current time instead of time.Now so it can
45+
// be mocked for tests.
46+
nowFn func() time.Time
47+
// newTickerFn is used to acquire a *time.Ticker instead of time.NewTicker
48+
// so it can be mocked for tests.
49+
newTickerFn func(duration time.Duration) *time.Ticker
50+
51+
started atomic.Bool
52+
wg sync.WaitGroup
53+
ticker *time.Ticker
54+
}
55+
56+
// NewDoer returns a new Doer. It takes a doFn, which is
57+
// called with the current count of events and the period.
58+
func NewDoer(period time.Duration, doFn func(count uint64, d time.Duration)) *Doer {
59+
return &Doer{
60+
period: period,
61+
doFn: doFn,
62+
63+
nowFn: time.Now,
64+
newTickerFn: time.NewTicker,
65+
}
66+
}
67+
68+
func (r *Doer) Add() {
69+
r.count.Add(1)
70+
}
71+
72+
func (r *Doer) AddN(n uint64) {
73+
r.count.Add(n)
74+
}
75+
76+
func (r *Doer) Start() {
77+
if r.started.Swap(true) {
78+
return
79+
}
80+
81+
r.done = make(chan struct{})
82+
r.lastDone = r.nowFn()
83+
r.ticker = r.newTickerFn(r.period)
84+
85+
r.wg.Add(1)
86+
go func() {
87+
defer r.wg.Done()
88+
defer r.ticker.Stop()
89+
90+
for {
91+
select {
92+
case <-r.ticker.C:
93+
r.do()
94+
case <-r.done:
95+
r.do()
96+
return
97+
}
98+
}
99+
}()
100+
}
101+
102+
func (r *Doer) Stop() {
103+
if !r.started.Swap(false) {
104+
return
105+
}
106+
107+
close(r.done)
108+
r.wg.Wait()
109+
}
110+
111+
func (r *Doer) do() {
112+
count := r.count.Swap(0)
113+
if count == 0 {
114+
return
115+
}
116+
117+
r.lastDone = r.nowFn()
118+
r.doFn(count, r.period)
119+
120+
}

periodic/ratelimited_test.go

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package periodic
19+
20+
import (
21+
"bytes"
22+
"fmt"
23+
"io"
24+
"math"
25+
"strings"
26+
"sync"
27+
"testing"
28+
"time"
29+
30+
"github.com/stretchr/testify/assert"
31+
"github.com/stretchr/testify/require"
32+
)
33+
34+
type syncBuffer struct {
35+
buff bytes.Buffer
36+
mu sync.Mutex
37+
}
38+
39+
func (s *syncBuffer) Read(p []byte) (n int, err error) {
40+
s.mu.Lock()
41+
defer s.mu.Unlock()
42+
43+
return s.buff.Read(p)
44+
}
45+
46+
func (s *syncBuffer) Write(p []byte) (n int, err error) {
47+
s.mu.Lock()
48+
defer s.mu.Unlock()
49+
50+
return fmt.Fprintf(&s.buff, "%s", p)
51+
}
52+
53+
func TestRateLimitedLogger(t *testing.T) {
54+
pattern := "%d occurrences in the last %s"
55+
56+
newLogger := func() (io.Reader, func(count uint64, d time.Duration)) {
57+
sbuff := &syncBuffer{}
58+
59+
logFn := func(count uint64, d time.Duration) {
60+
fmt.Fprintf(sbuff, pattern, count, d)
61+
}
62+
return sbuff, logFn
63+
}
64+
65+
now := time.Now()
66+
67+
t.Run("Start", func(t *testing.T) {
68+
r := NewDoer(math.MaxInt64, func(count uint64, d time.Duration) {})
69+
defer r.Stop()
70+
r.nowFn = func() time.Time { return now }
71+
72+
r.Start()
73+
74+
assert.True(t, r.started.Load(),
75+
"Start() was called, thus 'started' should be true")
76+
assert.NotEmpty(t, r.lastDone, "lastDone should have been set")
77+
})
78+
79+
t.Run("Start twice", func(t *testing.T) {
80+
r := NewDoer(math.MaxInt64, func(count uint64, d time.Duration) {})
81+
defer r.Stop()
82+
83+
r.nowFn = func() time.Time { return now }
84+
85+
r.Start()
86+
r.nowFn = func() time.Time { return now.Add(time.Minute) }
87+
r.Start()
88+
89+
assert.Equal(t, now, r.lastDone, "lastDone should have been set a second time")
90+
})
91+
92+
t.Run("Stop", func(t *testing.T) {
93+
tcs := []struct {
94+
name string
95+
count int
96+
}{
97+
{name: "once", count: 1},
98+
{name: "twice", count: 2},
99+
}
100+
101+
for _, tc := range tcs {
102+
t.Run(tc.name, func(t *testing.T) {
103+
buff, logFn := newLogger()
104+
r := NewDoer(42*time.Second, logFn)
105+
r.nowFn = func() time.Time { return now }
106+
107+
tch := make(chan time.Time)
108+
r.newTickerFn = func(duration time.Duration) *time.Ticker {
109+
return &time.Ticker{C: tch}
110+
}
111+
112+
r.Start()
113+
114+
r.nowFn = func() time.Time { return now.Add(42 * time.Second) }
115+
116+
r.count.Add(1)
117+
for i := 0; i < tc.count; i++ {
118+
r.Stop()
119+
}
120+
121+
bs, err := io.ReadAll(buff)
122+
require.NoError(t, err, "failed reading logs")
123+
logs := string(bs)
124+
got := strings.TrimSpace(logs)
125+
126+
assert.False(t, r.started.Load(),
127+
"Stop() was called, thus 'started' should be false")
128+
assert.Len(t, strings.Split(got, "\n"), 1)
129+
assert.Contains(t, logs, fmt.Sprintf(pattern, 1, 42*time.Second))
130+
131+
})
132+
}
133+
})
134+
135+
t.Run("Add", func(t *testing.T) {
136+
buff, logFn := newLogger()
137+
r := NewDoer(42*time.Second, logFn)
138+
defer r.Stop()
139+
140+
r.nowFn = func() time.Time { return now }
141+
142+
tch := make(chan time.Time)
143+
r.newTickerFn = func(duration time.Duration) *time.Ticker {
144+
return &time.Ticker{C: tch}
145+
}
146+
147+
r.Start()
148+
r.Add()
149+
150+
r.nowFn = func() time.Time { return now.Add(42 * time.Second) }
151+
tch <- now.Add(42 * time.Second)
152+
153+
var logs string
154+
assert.Eventually(t, func() bool {
155+
bs, err := io.ReadAll(buff)
156+
require.NoError(t, err, "failed reading logs")
157+
logs = strings.TrimSpace(string(bs))
158+
159+
return len(strings.Split(logs, "\n")) == 1
160+
}, time.Second, 100*time.Millisecond, "should have found 1 do")
161+
162+
assert.Contains(t, logs, fmt.Sprintf(pattern, 1, 42*time.Second))
163+
})
164+
165+
t.Run("AddN", func(t *testing.T) {
166+
buff, logFn := newLogger()
167+
r := NewDoer(42*time.Second, logFn)
168+
defer r.Stop()
169+
170+
r.nowFn = func() time.Time { return now }
171+
172+
tch := make(chan time.Time)
173+
r.newTickerFn = func(duration time.Duration) *time.Ticker {
174+
return &time.Ticker{C: tch}
175+
}
176+
177+
r.Start()
178+
r.AddN(42)
179+
180+
r.nowFn = func() time.Time { return now.Add(42 * time.Second) }
181+
tch <- now.Add(42 * time.Second)
182+
183+
var logs string
184+
assert.Eventually(t, func() bool {
185+
bs, err := io.ReadAll(buff)
186+
require.NoError(t, err, "failed reading logs")
187+
logs = strings.TrimSpace(string(bs))
188+
189+
return len(strings.Split(logs, "\n")) == 1
190+
}, time.Second, 100*time.Millisecond, "should have found 1 do")
191+
192+
assert.Contains(t, logs, fmt.Sprintf(pattern, 42, 42*time.Second))
193+
})
194+
}

0 commit comments

Comments
 (0)