Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 47 additions & 40 deletions internal/legacy/reqcounter/reqcounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,68 +28,78 @@ import (

// RequestCounter records the number of HTTP requests to GCR.
type RequestCounter struct {
Mutex sync.Mutex // Lock to prevent race-conditions with concurrent processes.
Requests uint64 // Number of HTTP requests since recording started.
Since time.Time // When the current request counter began recording requests.
Interval time.Duration // The duration of time between each log.
Threshold uint64 // When to warn of a high request count during a logging cycle. Setting a
// Lock to prevent race-conditions with concurrent processes.
mutex sync.Mutex

// Number of HTTP requests since recording started.
requests uint64

// When the current request counter began recording requests.
since time.Time

// The duration of time between each log.
interval time.Duration

// When to warn of a high request count during a logging cycle. Setting a
// non-zero threshold allows the request counter to reset each interval. If left uninitialized,
// the request counter will be persistent and never warn or reset.
threshold uint64
}

// increment adds 1 to the request counter, signifying another call to GCR.
func (rc *RequestCounter) Increment() {
rc.Mutex.Lock()
rc.Requests++
rc.Mutex.Unlock()
rc.mutex.Lock()
rc.requests++
rc.mutex.Unlock()
}

// Flush records the number of HTTP requests found and resets the request counter.
func (rc *RequestCounter) Flush() {
func (rc *RequestCounter) flush() {
// Hold onto the lock when reading & writing the request counter.
rc.Mutex.Lock()
defer rc.Mutex.Unlock()
rc.mutex.Lock()
defer rc.mutex.Unlock()

rc.log()

// Only allow request counters wi
if rc.Threshold > 0 {
if rc.threshold > 0 {
// Reset the request counter.
rc.reset()
rc.resetWithLockHeld()
}
}

// log the number of HTTP requests found. If the number of requests exceeds the
// threshold, log an additional warning message.
func (rc *RequestCounter) log() {
msg := fmt.Sprintf("From %s to %s [%d min] there have been %d requests to GCR.", rc.Since.Format(TimestampFormat), Clock.Now().Format(TimestampFormat), rc.Interval/time.Minute, rc.Requests)
msg := fmt.Sprintf("From %s to %s [%d min] there have been %d requests to GCR.", rc.since.Format(TimestampFormat), Clock.Now().Format(TimestampFormat), rc.interval/time.Minute, rc.requests)
Debug(msg)
if rc.Threshold > 0 && rc.Requests > rc.Threshold {
msg = fmt.Sprintf("The threshold of %d requests has been surpassed.", rc.Threshold)
if rc.threshold > 0 && rc.requests > rc.threshold {
msg = fmt.Sprintf("The threshold of %d requests has been surpassed.", rc.threshold)
Warn(msg)
}
}

// reset clears the request counter and stamps the current time of reset.
func (rc *RequestCounter) reset() {
rc.Requests = 0
rc.Since = Clock.Now()
// this function should be called with the mutex held.
func (rc *RequestCounter) resetWithLockHeld() {
rc.requests = 0
rc.since = Clock.Now()
}

// watch indefinitely performs repeated sleep/log cycles.
func (rc *RequestCounter) watch() {
// TODO: @tylerferrara create a way to cleanly terminate this goroutine.
go func() {
for {
rc.Cycle()
rc.cycle()
}
}()
}

// Cycle sleeps for the request counter's interval and flushes itself.
func (rc *RequestCounter) Cycle() {
Clock.Sleep(rc.Interval)
rc.Flush()
// cycle sleeps for the request counter's interval and flushes itself.
func (rc *RequestCounter) cycle() {
Clock.Sleep(rc.interval)
rc.flush()
}

// RequestCounters holds multiple request counters.
Expand All @@ -108,7 +118,7 @@ func (nm *NetworkMonitor) increment() {
}

// Log begins logging each request counter at their specified intervals.
func (nm *NetworkMonitor) Log() {
func (nm *NetworkMonitor) log() {
for _, rc := range nm.RequestCounters {
rc.watch()
}
Expand Down Expand Up @@ -151,24 +161,21 @@ func Init() {
// GCR quota, but acts as a rough estimation of this quota, indicating when throttling may occur.
requestCounters := RequestCounters{
{
Mutex: sync.Mutex{},
Requests: 0,
Since: Clock.Now(),
Interval: QuotaWindowShort,
Threshold: 50000,
requests: 0,
since: Clock.Now(),
interval: QuotaWindowShort,
threshold: 50000,
},
{
Mutex: sync.Mutex{},
Requests: 0,
Since: Clock.Now(),
Interval: QuotaWindowLong,
Threshold: 1000000,
requests: 0,
since: Clock.Now(),
interval: QuotaWindowLong,
threshold: 1000000,
},
{
Mutex: sync.Mutex{},
Requests: 0,
Since: Clock.Now(),
Interval: QuotaWindowShort,
requests: 0,
since: Clock.Now(),
interval: QuotaWindowShort,
},
}

Expand All @@ -178,7 +185,7 @@ func Init() {
}

// Begin logging network traffic.
NetMonitor.Log()
NetMonitor.log()
}

// Increment increases the all request counters by 1, signifying an HTTP
Expand Down
106 changes: 52 additions & 54 deletions internal/legacy/reqcounter/reqcounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,51 +14,48 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package reqcounter_test
package reqcounter

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

rc "sigs.k8s.io/promo-tools/v4/internal/legacy/reqcounter"
tw "sigs.k8s.io/promo-tools/v4/internal/legacy/timewrapper"
)

// defaultThreshold should be used as a default request counter threshold.
const defaultThreshold = 10000

// defaultTime should be used as a timestamp for all request counters.
// The actual time represents: September 22, 2002 at 05:03:16.
var defaultTime, _ = time.Parse("020106 150405", "020106 150405") //nolint: errcheck
// The actual time represents: April 1, 2025 at 12:34:56.
var defaultTime, _ = time.Parse("020106 150405", "010425 123456") //nolint: errcheck

// NewRequestCounter returns a new request counter with the given number of requests.
// All other object fields are set to default values.
func NewRequestCounter(requests uint64) rc.RequestCounter {
return rc.RequestCounter{
Mutex: sync.Mutex{},
Requests: requests,
Since: defaultTime,
Interval: time.Second,
Threshold: defaultThreshold,
func NewRequestCounter(requests uint64) RequestCounter {
return RequestCounter{
requests: requests,
since: defaultTime,
interval: time.Second,
threshold: defaultThreshold,
}
}

func TestInit(t *testing.T) {
rc.Init()
Init()
// Ensure request counting is enabled.
require.True(t, rc.EnableCounting, "Init did not enable counting.")
require.True(t, EnableCounting, "Init did not enable counting.")
// Ensure request counters are created.
require.NotEmpty(t, rc.NetMonitor.RequestCounters, "Init did not create any request counters within the global Monitor.")
require.NotEmpty(t, NetMonitor.RequestCounters, "Init did not create any request counters within the global Monitor.")
// Ensure at least one request counter uses the QuotaWindowShort.
foundQuotaWindowShort, foundQuotaWindowLong := false, false
for _, requestCounter := range rc.NetMonitor.RequestCounters {
if requestCounter.Interval == rc.QuotaWindowShort {
for _, requestCounter := range NetMonitor.RequestCounters {
if requestCounter.interval == QuotaWindowShort {
foundQuotaWindowShort = true
} else if requestCounter.Interval == rc.QuotaWindowLong {
} else if requestCounter.interval == QuotaWindowLong {
foundQuotaWindowLong = true
}
}
Expand All @@ -69,78 +66,78 @@ func TestInit(t *testing.T) {
func TestIncrement(t *testing.T) {
// Create request counters which use these request counts and
// populate the Monitor global variable.
requestCounters := []rc.RequestCounter{
requestCounters := []RequestCounter{
NewRequestCounter(0),
NewRequestCounter(9),
NewRequestCounter(2839),
}
netMonitor := &rc.NetworkMonitor{
RequestCounters: rc.RequestCounters{
netMonitor := &NetworkMonitor{
RequestCounters: RequestCounters{
&requestCounters[0],
&requestCounters[1],
&requestCounters[2],
},
}
// Create the request counters we expect to get after calling rc.Increment.
expectedCounters := []rc.RequestCounter{
// Create the request counters we expect to get after calling Increment.
expectedCounters := []RequestCounter{
NewRequestCounter(1),
NewRequestCounter(10),
NewRequestCounter(2840),
}
expectedNetMonitor := &rc.NetworkMonitor{
RequestCounters: rc.RequestCounters{
expectedNetMonitor := &NetworkMonitor{
RequestCounters: RequestCounters{
&expectedCounters[0],
&expectedCounters[1],
&expectedCounters[2],
},
}
// Set the global network monitor.
rc.NetMonitor = netMonitor
NetMonitor = netMonitor
// Ensure request counter modification can only occur when counting is enabled. Therefore,
// the global network monitor should not be mutated with this call to Increment.
rc.EnableCounting = false
rc.Increment()
require.EqualValues(t, netMonitor, rc.NetMonitor, "Request counters were modified while counting was disabled.")
EnableCounting = false
Increment()
require.EqualValues(t, netMonitor, NetMonitor, "Request counters were modified while counting was disabled.")
// Ensure the Increment function actually increments each request counter's requests field.
rc.EnableCounting = true
rc.Increment()
require.EqualValues(t, expectedNetMonitor, rc.NetMonitor, "Request counters were not incremented correctly.")
EnableCounting = true
Increment()
require.EqualValues(t, expectedNetMonitor, NetMonitor, "Request counters were not incremented correctly.")
}

func TestFlush(t *testing.T) {
// Create a local invocation of time.
requestCounter := NewRequestCounter(33)
// Mock the logrus.Debug function.
debugCalls := 0
rc.Debug = func(_ ...interface{}) {
Debug = func(_ ...interface{}) {
debugCalls++
}
requestCounter.Flush()
requestCounter.flush()
// Ensure logrus.Debug was called.
require.Equal(t, 1, debugCalls, "Flush() failed to trigger a debug statement.")
// Ensure the request counter is reset, where time advances and the requests are zeroed.
require.Equal(t, uint64(0), requestCounter.Requests, "Calling Flush() did not reset the request counter to 0.")
require.True(t, defaultTime.Before(requestCounter.Since), "Calling Flush() did not reset the request counter timestamp.")
require.Equal(t, uint64(0), requestCounter.requests, "Calling Flush() did not reset the request counter to 0.")
require.True(t, defaultTime.Before(requestCounter.since), "Calling Flush() did not reset the request counter timestamp.")

// Create a persistent request counter.
requestCounter = NewRequestCounter(66)
requestCounter.Threshold = 0
requestCounter.Flush()
requestCounter.threshold = 0
requestCounter.flush()
// Ensure the request counter did not reset.
require.Equal(t, uint64(66), requestCounter.Requests, "Calling Flush() reset the requests of a non-resettable request counter.")
require.True(t, defaultTime.Equal(requestCounter.Since), "Calling Flush() reset the timestamp of a non-resettable request counter.")
require.Equal(t, uint64(66), requestCounter.requests, "Calling Flush() reset the requests of a non-resettable request counter.")
require.True(t, defaultTime.Equal(requestCounter.since), "Calling Flush() reset the timestamp of a non-resettable request counter.")

// Create a request counter that exceeded a threshold.
requestCounter = NewRequestCounter(600)
requestCounter.Threshold = 599
requestCounter.threshold = 599
// Reset debug counter.
debugCalls = 0
// Mock logrus.Warn.
warnCalls := 0
rc.Warn = func(_ ...interface{}) {
Warn = func(_ ...interface{}) {
warnCalls++
}
requestCounter.Flush()
requestCounter.flush()
// Ensure both logrus.Debug and logrus.Warn was called.
require.Equal(t, 1, debugCalls, "Flush() failed to trigger a debug statement after exceeding the threshold.")
require.Equal(t, 1, warnCalls, "Flush() failed to trigger a warning statement after exceeding the threshold.")
Expand All @@ -167,12 +164,12 @@ func TestCycle(t *testing.T) {
// Define all tests.
cycleTests := []cycleTest{
{
interval: rc.QuotaWindowShort,
interval: QuotaWindowShort,
requests: []int{3, 7, 50, 1},
threshold: defaultThreshold,
},
{
interval: rc.QuotaWindowLong,
interval: QuotaWindowLong,
requests: []int{2, 622, 5, 8},
threshold: defaultThreshold,
},
Expand All @@ -198,30 +195,29 @@ func TestCycle(t *testing.T) {
},
}
// Simulate HTTP requests by repeatedly incrementing the request counter.
mockNetworkTraffic := func(requestCounter *rc.RequestCounter, requests int) {
for requests > 0 {
mockNetworkTraffic := func(requestCounter *RequestCounter, requests int) {
for range requests {
requestCounter.Increment()
requests--
}
}

// Run all tests.
for _, ct := range cycleTests {
// Create a simple request counter.
requestCounter := NewRequestCounter(0)
requestCounter.Interval = ct.interval
requestCounter.Threshold = ct.threshold
requestCounter.interval = ct.interval
requestCounter.threshold = ct.threshold
// Collect logging statements.
logs := []string{}
// Mock logrus.Debug calls.
rc.Debug = func(args ...interface{}) {
Debug = func(args ...interface{}) {
logs = append(logs, fmt.Sprint(args[0]))
}
// Mock time.
fakeTime := tw.FakeTime{
Time: defaultTime,
}
rc.Clock = &fakeTime
Clock = &fakeTime
// Collect expected logs.
expected := []string{}
// Repeatedly run sleep/log cycles.
Expand All @@ -239,14 +235,16 @@ func TestCycle(t *testing.T) {
// The starting timestamp must not change.
expectedStartingClock = defaultTime
}
expect := fmt.Sprintf("From %s to %s [%d min] there have been %d requests to GCR.", expectedStartingClock.Format(rc.TimestampFormat), nextClock.Format(rc.TimestampFormat), ct.interval/time.Minute, expectedRequests)
expect := fmt.Sprintf("From %s to %s [%d min] there have been %d requests to GCR.", expectedStartingClock.Format(TimestampFormat), nextClock.Format(TimestampFormat), ct.interval/time.Minute, expectedRequests)
expected = append(expected, expect)
testClock = nextClock
// Simulate HTTP requests.
mockNetworkTraffic(&requestCounter, requests)
// Initiate a sleep/log cycle.
requestCounter.Cycle()
requestCounter.cycle()

}
t.Logf("actual logs: %v", logs)
// Ensure the correct logs were produced.
require.EqualValues(t, expected, logs, "The request counter produced malformed logs.")
}
Expand Down