forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 126
[WIP] UPSTREAM: <carry>: add http logging for kubelet metrics endpoint #2396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
tkashem
wants to merge
2
commits into
openshift:master
Choose a base branch
from
tkashem:kubelet-metrics-logger
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
package asyncinvoker | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
"time" | ||
|
||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
"k8s.io/klog/v2" | ||
) | ||
|
||
func NewAsyncInvoker[T any](f func() T) *asyncInvoker[T] { | ||
invoker := &asyncInvoker[T]{ | ||
f: f, | ||
// a buffer of length 1, so | ||
invoke: make(chan struct{}, 1), | ||
} | ||
return invoker | ||
} | ||
|
||
type AsyncInvoker[T any] interface { | ||
Invoke() <-chan Return[T] | ||
} | ||
|
||
type Return[T any] struct { | ||
Panicked any | ||
Result T | ||
Latency time.Duration | ||
} | ||
|
||
type asyncInvoker[T any] struct { | ||
f func() T | ||
|
||
lock sync.Mutex | ||
invoke chan struct{} | ||
waiters []chan Return[T] | ||
} | ||
|
||
// Invoke returns a channel | ||
func (i *asyncInvoker[T]) Invoke() <-chan Return[T] { | ||
// the waiter is a buffered channel of length 1, so neither the | ||
// caller, nor the async invoker hangs on each other | ||
waiter := make(chan Return[T], 1) | ||
|
||
i.lock.Lock() | ||
defer i.lock.Unlock() | ||
|
||
// let's add the caller to the waiting list | ||
i.waiters = append(i.waiters, waiter) | ||
// signal the async invoker that it should invoke the function: | ||
// a) the async invoker has not started yet, and the channel | ||
// is empty, we send to the channel and wait | ||
// b) the async invoker has not started yet, and the channel | ||
// is not empty, we have already added the caller to the waiting list | ||
// c) the async invoker is blocked, waiting to receive on the channel | ||
// d) the async is unblocked, and is in making the call | ||
// e) the async invoker is sending the result to each waiter | ||
// | ||
// since this caller has the lock now, e is impossible | ||
// | ||
select { | ||
case i.invoke <- struct{}{}: | ||
default: | ||
} | ||
|
||
return waiter | ||
} | ||
|
||
func (i *asyncInvoker[T]) Run(stopCtx context.Context) context.Context { | ||
done, cancel := context.WithCancel(context.Background()) | ||
go func() { | ||
klog.InfoS("AsyncInvoker: start") | ||
defer func() { | ||
klog.InfoS("AsyncInvoker: end") | ||
cancel() | ||
}() | ||
|
||
for { | ||
select { | ||
case <-stopCtx.Done(): | ||
return | ||
case _, ok := <-i.invoke: | ||
if !ok { | ||
return | ||
} | ||
} | ||
|
||
var empty bool | ||
i.lock.Lock() | ||
empty = len(i.waiters) == 0 | ||
i.lock.Unlock() | ||
if empty { | ||
continue | ||
} | ||
|
||
// while the call is in progress, we allow any new | ||
// caller to add itslef to the waiting list. | ||
ret := Return[T]{} | ||
func() { | ||
defer func() { | ||
if recovered := recover(); recovered != nil { | ||
ret.Panicked = recovered | ||
utilruntime.HandleError(fmt.Errorf("panic from AsyncInvoker Run: %v", recovered)) | ||
} | ||
}() | ||
|
||
func() { | ||
now := time.Now() | ||
defer func() { | ||
ret.Latency = time.Since(now) | ||
}() | ||
ret.Result = i.f() | ||
}() | ||
}() | ||
|
||
// we have just invoked the function, return the result | ||
// to the callers waiting, some callers might have given | ||
// up already, | ||
func() { | ||
i.lock.Lock() | ||
defer i.lock.Unlock() | ||
|
||
for _, waiter := range i.waiters { | ||
// this should never block, we created | ||
// this channel with a buffer of 1 | ||
waiter <- ret | ||
close(waiter) | ||
} | ||
// reset the slice to zero-length | ||
i.waiters = i.waiters[:0] | ||
}() | ||
} | ||
}() | ||
|
||
return done | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,213 @@ | ||
package asyncinvoker | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/types" | ||
"k8s.io/apimachinery/pkg/util/wait" | ||
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" | ||
"k8s.io/kubernetes/pkg/kubelet/server/stats" | ||
) | ||
|
||
func TestAsyncInvoker(t *testing.T) { | ||
// we have a slow provider | ||
slow := &slowSummaryProvider{t: t} | ||
|
||
// we wrap the slow provider with an async invoker, and pass | ||
// the instance with the async invoker to the client. | ||
// NOTE: this only works because GetCPUAndMemoryStats does not | ||
// accept any request scoped data | ||
asyncInvoker := NewAsyncInvoker[result](func() result { | ||
summary, err := slow.GetCPUAndMemoryStats(context.TODO()) | ||
return result{summary: summary, err: err} | ||
}) | ||
async := &asyncSummaryProvider{ResourceAnalyzer: slow, async: asyncInvoker} | ||
|
||
// run the async invoker | ||
stopCtx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
exit := asyncInvoker.Run(stopCtx) | ||
|
||
t.Run("serial callers", func(t *testing.T) { | ||
slow.invoked.Swap(0) | ||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
defer cancel() | ||
|
||
// the slow provider should return the following result once invoked | ||
// and the call should not block at all | ||
ch := make(chan struct{}) | ||
close(ch) | ||
slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch} | ||
|
||
for i := 1; i <= 10; i++ { | ||
got, err := async.GetCPUAndMemoryStats(ctx) | ||
if err != nil { | ||
t.Errorf("expected no error, but got: %v", err) | ||
} | ||
if want := slow.r.want; want != got { | ||
t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got) | ||
} | ||
} | ||
|
||
if want, got := 10, int(slow.invoked.Load()); want != got { | ||
t.Errorf("expected the invoke count to be %d, but got: %d", want, got) | ||
} | ||
}) | ||
|
||
t.Run("call in progress", func(t *testing.T) { | ||
// reset the invoke count | ||
slow.invoked.Swap(0) | ||
|
||
// the slow provider should return the following result once invoked | ||
// and the call is taking longer | ||
ch := make(chan struct{}) | ||
slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch} | ||
|
||
firstDone := make(chan struct{}) | ||
go func() { | ||
defer close(firstDone) | ||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) | ||
defer cancel() | ||
|
||
_, err := async.GetCPUAndMemoryStats(ctx) | ||
if want := context.DeadlineExceeded; !errors.Is(err, want) { | ||
t.Errorf("expected error: %v, but got: %v", want, err) | ||
} | ||
}() | ||
|
||
// wait for the first caller to time out | ||
<-firstDone | ||
t.Logf("first caller has timed out") | ||
// the slow call should still be in progress | ||
if want, got := 1, int(slow.progress.Load()); want != got { | ||
t.Fatalf("expected the call to be in progress: %d, but got: %d", want, got) | ||
} | ||
|
||
// fire off a second call | ||
secondDone := make(chan struct{}) | ||
go func() { | ||
defer close(secondDone) | ||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) | ||
defer cancel() | ||
|
||
t.Logf("second caller making a call") | ||
got, err := async.GetCPUAndMemoryStats(ctx) | ||
if err != nil { | ||
t.Errorf("expected no error, but got: %v", err) | ||
} | ||
if want := slow.r.want; want != got { | ||
t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got) | ||
} | ||
}() | ||
|
||
// unblock the second call, after some wait | ||
<-time.After(100 * time.Millisecond) | ||
t.Logf("unblocking the slow provider") | ||
close(ch) | ||
|
||
<-secondDone | ||
// we expect the call in progress to have finished | ||
if want, got := 0, int(slow.progress.Load()); want != got { | ||
t.Errorf("did not expect the call to be in progress: %d, but got: %d", want, got) | ||
} | ||
if want, got := 1, int(slow.invoked.Load()); want != got { | ||
t.Errorf("expected the call to be invoked: %d, but got: %d", want, got) | ||
} | ||
|
||
// a new call should return immediately | ||
ch = make(chan struct{}) | ||
close(ch) | ||
slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch} | ||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) | ||
defer cancel() | ||
|
||
got, err := async.GetCPUAndMemoryStats(ctx) | ||
if err != nil { | ||
t.Errorf("expected no error, but got: %v", err) | ||
} | ||
if want := slow.r.want; want != got { | ||
t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got) | ||
} | ||
|
||
if want, got := 2, int(slow.invoked.Load()); want != got { | ||
t.Errorf("expected the call to be invoked: %d, but got: %d", want, got) | ||
} | ||
}) | ||
|
||
t.Run("async runner exits gracefully", func(t *testing.T) { | ||
cancel() | ||
|
||
select { | ||
case <-exit.Done(): | ||
case <-time.After(wait.ForeverTestTimeout): | ||
t.Errorf("expected the async invoker to exit gracefully") | ||
} | ||
}) | ||
} | ||
|
||
type result struct { | ||
summary *statsapi.Summary | ||
err error | ||
} | ||
|
||
type asyncSummaryProvider struct { | ||
stats.ResourceAnalyzer | ||
async AsyncInvoker[result] | ||
} | ||
|
||
func (p *asyncSummaryProvider) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) { | ||
wait := p.async.Invoke() | ||
select { | ||
case ret, ok := <-wait: | ||
if ok { | ||
if ret.Panicked != nil { | ||
panic(ret.Panicked) | ||
} | ||
return ret.Result.summary, ret.Result.err | ||
} | ||
return nil, fmt.Errorf("we should never be here") | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
} | ||
} | ||
|
||
type slowProviderReturn struct { | ||
// the test can be notified when the call starts and ends | ||
blocked <-chan struct{} | ||
want *statsapi.Summary | ||
} | ||
|
||
type slowSummaryProvider struct { | ||
t *testing.T | ||
invoked, progress atomic.Int32 | ||
r slowProviderReturn | ||
} | ||
|
||
func (slow *slowSummaryProvider) GetCPUAndMemoryStats(_ context.Context) (*statsapi.Summary, error) { | ||
slow.invoked.Add(1) | ||
// we never expect this call to be made concurrent | ||
slow.progress.Add(1) | ||
defer func() { | ||
slow.progress.Add(-1) | ||
}() | ||
// it blocks indefinitely, until the test writes to this channel | ||
now := time.Now() | ||
<-slow.r.blocked | ||
slow.t.Logf("slept for: %s", time.Since(now)) | ||
return slow.r.want, nil | ||
} | ||
|
||
func (slow *slowSummaryProvider) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) { | ||
return &statsapi.Summary{}, nil | ||
} | ||
|
||
func (slow *slowSummaryProvider) Start() {} | ||
func (slow *slowSummaryProvider) GetPodVolumeStats(uid types.UID) (stats.PodVolumeStats, bool) { | ||
return stats.PodVolumeStats{}, false | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of context.Background(), perhaps pass in the stopCtx.