Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions pkg/kubelet/asyncinvoker/invoker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package asyncinvoker

import (
"context"
"fmt"
"sync"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)

func NewAsyncInvoker[T any](f func() T) *asyncInvoker[T] {
invoker := &asyncInvoker[T]{
f: f,
// a buffer of length 1, so
invoke: make(chan struct{}, 1),
}
return invoker
}

type AsyncInvoker[T any] interface {
Invoke() <-chan Return[T]
}

type Return[T any] struct {
Panicked any
Result T
Latency time.Duration
}

type asyncInvoker[T any] struct {
f func() T

lock sync.Mutex
invoke chan struct{}
waiters []chan Return[T]
}

// Invoke returns a channel
func (i *asyncInvoker[T]) Invoke() <-chan Return[T] {
// the waiter is a buffered channel of length 1, so neither the
// caller, nor the async invoker hangs on each other
waiter := make(chan Return[T], 1)

i.lock.Lock()
defer i.lock.Unlock()

// let's add the caller to the waiting list
i.waiters = append(i.waiters, waiter)
// signal the async invoker that it should invoke the function:
// a) the async invoker has not started yet, and the channel
// is empty, we send to the channel and wait
// b) the async invoker has not started yet, and the channel
// is not empty, we have already added the caller to the waiting list
// c) the async invoker is blocked, waiting to receive on the channel
// d) the async is unblocked, and is in making the call
// e) the async invoker is sending the result to each waiter
//
// since this caller has the lock now, e is impossible
//
select {
case i.invoke <- struct{}{}:
default:
}

return waiter
}

func (i *asyncInvoker[T]) Run(stopCtx context.Context) context.Context {
done, cancel := context.WithCancel(context.Background())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of context.Background(), perhaps pass in the stopCtx.

go func() {
klog.InfoS("AsyncInvoker: start")
defer func() {
klog.InfoS("AsyncInvoker: end")
cancel()
}()

for {
select {
case <-stopCtx.Done():
return
case _, ok := <-i.invoke:
if !ok {
return
}
}

var empty bool
i.lock.Lock()
empty = len(i.waiters) == 0
i.lock.Unlock()
if empty {
continue
}

// while the call is in progress, we allow any new
// caller to add itslef to the waiting list.
ret := Return[T]{}
func() {
defer func() {
if recovered := recover(); recovered != nil {
ret.Panicked = recovered
utilruntime.HandleError(fmt.Errorf("panic from AsyncInvoker Run: %v", recovered))
}
}()

func() {
now := time.Now()
defer func() {
ret.Latency = time.Since(now)
}()
ret.Result = i.f()
}()
}()

// we have just invoked the function, return the result
// to the callers waiting, some callers might have given
// up already,
func() {
i.lock.Lock()
defer i.lock.Unlock()

for _, waiter := range i.waiters {
// this should never block, we created
// this channel with a buffer of 1
waiter <- ret
close(waiter)
}
// reset the slice to zero-length
i.waiters = i.waiters[:0]
}()
}
}()

return done
}
213 changes: 213 additions & 0 deletions pkg/kubelet/asyncinvoker/invoker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package asyncinvoker

import (
"context"
"errors"
"fmt"
"sync/atomic"
"testing"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/server/stats"
)

func TestAsyncInvoker(t *testing.T) {
// we have a slow provider
slow := &slowSummaryProvider{t: t}

// we wrap the slow provider with an async invoker, and pass
// the instance with the async invoker to the client.
// NOTE: this only works because GetCPUAndMemoryStats does not
// accept any request scoped data
asyncInvoker := NewAsyncInvoker[result](func() result {
summary, err := slow.GetCPUAndMemoryStats(context.TODO())
return result{summary: summary, err: err}
})
async := &asyncSummaryProvider{ResourceAnalyzer: slow, async: asyncInvoker}

// run the async invoker
stopCtx, cancel := context.WithCancel(context.Background())
defer cancel()
exit := asyncInvoker.Run(stopCtx)

t.Run("serial callers", func(t *testing.T) {
slow.invoked.Swap(0)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// the slow provider should return the following result once invoked
// and the call should not block at all
ch := make(chan struct{})
close(ch)
slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch}

for i := 1; i <= 10; i++ {
got, err := async.GetCPUAndMemoryStats(ctx)
if err != nil {
t.Errorf("expected no error, but got: %v", err)
}
if want := slow.r.want; want != got {
t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got)
}
}

if want, got := 10, int(slow.invoked.Load()); want != got {
t.Errorf("expected the invoke count to be %d, but got: %d", want, got)
}
})

t.Run("call in progress", func(t *testing.T) {
// reset the invoke count
slow.invoked.Swap(0)

// the slow provider should return the following result once invoked
// and the call is taking longer
ch := make(chan struct{})
slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch}

firstDone := make(chan struct{})
go func() {
defer close(firstDone)
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

_, err := async.GetCPUAndMemoryStats(ctx)
if want := context.DeadlineExceeded; !errors.Is(err, want) {
t.Errorf("expected error: %v, but got: %v", want, err)
}
}()

// wait for the first caller to time out
<-firstDone
t.Logf("first caller has timed out")
// the slow call should still be in progress
if want, got := 1, int(slow.progress.Load()); want != got {
t.Fatalf("expected the call to be in progress: %d, but got: %d", want, got)
}

// fire off a second call
secondDone := make(chan struct{})
go func() {
defer close(secondDone)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()

t.Logf("second caller making a call")
got, err := async.GetCPUAndMemoryStats(ctx)
if err != nil {
t.Errorf("expected no error, but got: %v", err)
}
if want := slow.r.want; want != got {
t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got)
}
}()

// unblock the second call, after some wait
<-time.After(100 * time.Millisecond)
t.Logf("unblocking the slow provider")
close(ch)

<-secondDone
// we expect the call in progress to have finished
if want, got := 0, int(slow.progress.Load()); want != got {
t.Errorf("did not expect the call to be in progress: %d, but got: %d", want, got)
}
if want, got := 1, int(slow.invoked.Load()); want != got {
t.Errorf("expected the call to be invoked: %d, but got: %d", want, got)
}

// a new call should return immediately
ch = make(chan struct{})
close(ch)
slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch}
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

got, err := async.GetCPUAndMemoryStats(ctx)
if err != nil {
t.Errorf("expected no error, but got: %v", err)
}
if want := slow.r.want; want != got {
t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got)
}

if want, got := 2, int(slow.invoked.Load()); want != got {
t.Errorf("expected the call to be invoked: %d, but got: %d", want, got)
}
})

t.Run("async runner exits gracefully", func(t *testing.T) {
cancel()

select {
case <-exit.Done():
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("expected the async invoker to exit gracefully")
}
})
}

type result struct {
summary *statsapi.Summary
err error
}

type asyncSummaryProvider struct {
stats.ResourceAnalyzer
async AsyncInvoker[result]
}

func (p *asyncSummaryProvider) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
wait := p.async.Invoke()
select {
case ret, ok := <-wait:
if ok {
if ret.Panicked != nil {
panic(ret.Panicked)
}
return ret.Result.summary, ret.Result.err
}
return nil, fmt.Errorf("we should never be here")
case <-ctx.Done():
return nil, ctx.Err()
}
}

type slowProviderReturn struct {
// the test can be notified when the call starts and ends
blocked <-chan struct{}
want *statsapi.Summary
}

type slowSummaryProvider struct {
t *testing.T
invoked, progress atomic.Int32
r slowProviderReturn
}

func (slow *slowSummaryProvider) GetCPUAndMemoryStats(_ context.Context) (*statsapi.Summary, error) {
slow.invoked.Add(1)
// we never expect this call to be made concurrent
slow.progress.Add(1)
defer func() {
slow.progress.Add(-1)
}()
// it blocks indefinitely, until the test writes to this channel
now := time.Now()
<-slow.r.blocked
slow.t.Logf("slept for: %s", time.Since(now))
return slow.r.want, nil
}

func (slow *slowSummaryProvider) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) {
return &statsapi.Summary{}, nil
}

func (slow *slowSummaryProvider) Start() {}
func (slow *slowSummaryProvider) GetPodVolumeStats(uid types.UID) (stats.PodVolumeStats, bool) {
return stats.PodVolumeStats{}, false
}
1 change: 1 addition & 0 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.allocationManager = allocation.NewManager(klet.getRootDir())

klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
klet.resourceAnalyzer = NewAsyncInvokerForGetCPUAndMemoryStats(ctx, klet.resourceAnalyzer)

klet.runtimeService = kubeDeps.RemoteRuntimeService

Expand Down
Loading