Skip to content

Commit 650220f

Browse files
authored
Merge pull request kubernetes#88134 from jennybuckley/rate-limit-log-2
Improve rate limiter latency logging and add component-base metric
2 parents 13beb9b + 2bcf99f commit 650220f

File tree

5 files changed

+95
-17
lines changed

5 files changed

+95
-17
lines changed

staging/src/k8s.io/client-go/rest/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ go_library(
6565
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
6666
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
6767
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
68+
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
6869
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
6970
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
7071
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",

staging/src/k8s.io/client-go/rest/request.go

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"k8s.io/apimachinery/pkg/runtime"
4040
"k8s.io/apimachinery/pkg/runtime/schema"
4141
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
42+
utilclock "k8s.io/apimachinery/pkg/util/clock"
4243
"k8s.io/apimachinery/pkg/util/net"
4344
"k8s.io/apimachinery/pkg/watch"
4445
restclientwatch "k8s.io/client-go/rest/watch"
@@ -556,37 +557,69 @@ func (r *Request) tryThrottle(ctx context.Context) error {
556557
klog.V(3).Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
557558
}
558559
if latency > extraLongThrottleLatency {
559-
globalThrottledLogger.Log(2, fmt.Sprintf("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String()))
560+
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
561+
// but we use a throttled logger to prevent spamming.
562+
globalThrottledLogger.Infof("Throttling request took %v, request: %s:%s", latency, r.verb, r.URL().String())
560563
}
564+
metrics.RateLimiterLatency.Observe(r.verb, r.finalURLTemplate(), latency)
561565

562566
return err
563567
}
564568

565-
type throttledLogger struct {
566-
logTimeLock sync.RWMutex
567-
lastLogTime time.Time
569+
type throttleSettings struct {
570+
logLevel klog.Level
568571
minLogInterval time.Duration
572+
573+
lastLogTime time.Time
574+
lock sync.RWMutex
575+
}
576+
577+
type throttledLogger struct {
578+
clock utilclock.PassiveClock
579+
settings []*throttleSettings
569580
}
570581

571582
var globalThrottledLogger = &throttledLogger{
572-
minLogInterval: 1 * time.Second,
583+
clock: utilclock.RealClock{},
584+
settings: []*throttleSettings{
585+
{
586+
logLevel: 2,
587+
minLogInterval: 1 * time.Second,
588+
}, {
589+
logLevel: 0,
590+
minLogInterval: 10 * time.Second,
591+
},
592+
},
573593
}
574594

575-
func (b *throttledLogger) Log(level klog.Level, message string) {
576-
if bool(klog.V(level)) {
577-
if func() bool {
578-
b.logTimeLock.RLock()
579-
defer b.logTimeLock.RUnlock()
580-
return time.Since(b.lastLogTime) > b.minLogInterval
581-
}() {
582-
b.logTimeLock.Lock()
583-
defer b.logTimeLock.Unlock()
584-
if time.Since(b.lastLogTime) > b.minLogInterval {
585-
klog.V(level).Info(message)
586-
b.lastLogTime = time.Now()
595+
func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
596+
for _, setting := range b.settings {
597+
if bool(klog.V(setting.logLevel)) {
598+
// Return early without write locking if possible.
599+
if func() bool {
600+
setting.lock.RLock()
601+
defer setting.lock.RUnlock()
602+
return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
603+
}() {
604+
setting.lock.Lock()
605+
defer setting.lock.Unlock()
606+
if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
607+
setting.lastLogTime = b.clock.Now()
608+
return setting.logLevel, true
609+
}
587610
}
611+
return -1, false
588612
}
589613
}
614+
return -1, false
615+
}
616+
617+
// Infof will write a log message at each logLevel specified by the reciever's throttleSettings
618+
// as long as it hasn't written a log message more recently than minLogInterval.
619+
func (b *throttledLogger) Infof(message string, args ...interface{}) {
620+
if logLevel, ok := b.attemptToLog(); ok {
621+
klog.V(logLevel).Infof(message, args...)
622+
}
590623
}
591624

592625
// Watch attempts to begin watching the requested location.

staging/src/k8s.io/client-go/rest/request_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"os"
3232
"reflect"
3333
"strings"
34+
"sync"
3435
"syscall"
3536
"testing"
3637
"time"
@@ -2190,3 +2191,30 @@ func TestRequestPreflightCheck(t *testing.T) {
21902191
})
21912192
}
21922193
}
2194+
2195+
func TestThrottledLogger(t *testing.T) {
2196+
now := time.Now()
2197+
clock := clock.NewFakeClock(now)
2198+
globalThrottledLogger.clock = clock
2199+
2200+
logMessages := 0
2201+
for i := 0; i < 10000; i++ {
2202+
var wg sync.WaitGroup
2203+
wg.Add(100)
2204+
for j := 0; j < 100; j++ {
2205+
go func() {
2206+
if _, ok := globalThrottledLogger.attemptToLog(); ok {
2207+
logMessages++
2208+
}
2209+
wg.Done()
2210+
}()
2211+
}
2212+
wg.Wait()
2213+
now = now.Add(1 * time.Second)
2214+
clock.SetTime(now)
2215+
}
2216+
2217+
if a, e := logMessages, 1000; a != e {
2218+
t.Fatalf("expected %v log messages, but got %v", e, a)
2219+
}
2220+
}

staging/src/k8s.io/client-go/tools/metrics/metrics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ var (
5353
ClientCertRotationAge DurationMetric = noopDuration{}
5454
// RequestLatency is the latency metric that rest clients will update.
5555
RequestLatency LatencyMetric = noopLatency{}
56+
// RateLimiterLatency is the client side rate limiter latency metric.
57+
RateLimiterLatency LatencyMetric = noopLatency{}
5658
// RequestResult is the result metric that rest clients will update.
5759
RequestResult ResultMetric = noopResult{}
5860
)
@@ -62,6 +64,7 @@ type RegisterOpts struct {
6264
ClientCertExpiry ExpiryMetric
6365
ClientCertRotationAge DurationMetric
6466
RequestLatency LatencyMetric
67+
RateLimiterLatency LatencyMetric
6568
RequestResult ResultMetric
6669
}
6770

@@ -78,6 +81,9 @@ func Register(opts RegisterOpts) {
7881
if opts.RequestLatency != nil {
7982
RequestLatency = opts.RequestLatency
8083
}
84+
if opts.RateLimiterLatency != nil {
85+
RateLimiterLatency = opts.RateLimiterLatency
86+
}
8187
if opts.RequestResult != nil {
8288
RequestResult = opts.RequestResult
8389
}

staging/src/k8s.io/component-base/metrics/prometheus/restclient/metrics.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,15 @@ var (
3838
[]string{"verb", "url"},
3939
)
4040

41+
rateLimiterLatency = k8smetrics.NewHistogramVec(
42+
&k8smetrics.HistogramOpts{
43+
Name: "rest_client_rate_limiter_duration_seconds",
44+
Help: "Client side rate limiter latency in seconds. Broken down by verb and URL.",
45+
Buckets: k8smetrics.ExponentialBuckets(0.001, 2, 10),
46+
},
47+
[]string{"verb", "url"},
48+
)
49+
4150
requestResult = k8smetrics.NewCounterVec(
4251
&k8smetrics.CounterOpts{
4352
Name: "rest_client_requests_total",
@@ -106,6 +115,7 @@ func init() {
106115
ClientCertExpiry: execPluginCertTTLAdapter,
107116
ClientCertRotationAge: &rotationAdapter{m: execPluginCertRotation},
108117
RequestLatency: &latencyAdapter{m: requestLatency},
118+
RateLimiterLatency: &latencyAdapter{m: rateLimiterLatency},
109119
RequestResult: &resultAdapter{requestResult},
110120
})
111121
}

0 commit comments

Comments
 (0)