From 6127a7cf4432ece3d8a1fe75115a327c4b8d409f Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 4 Aug 2025 16:36:27 -0400 Subject: [PATCH 1/2] UPSTREAM: : add http logging for kubelet metrics endpoint --- .../collectors/resource_metrics_test.go | 113 +++++++++++++++ pkg/kubelet/server/server.go | 13 +- pkg/kubelet/server/server_patch.go | 135 ++++++++++++++++++ 3 files changed, 256 insertions(+), 5 deletions(-) create mode 100644 pkg/kubelet/server/server_patch.go diff --git a/pkg/kubelet/metrics/collectors/resource_metrics_test.go b/pkg/kubelet/metrics/collectors/resource_metrics_test.go index 29ab3f3f3093e..c8ef3f53bfe4b 100644 --- a/pkg/kubelet/metrics/collectors/resource_metrics_test.go +++ b/pkg/kubelet/metrics/collectors/resource_metrics_test.go @@ -19,11 +19,15 @@ package collectors import ( "context" "fmt" + "io" + "net/http" + "net/http/httptest" "strings" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + compbasemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/testutil" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" summaryprovidertest "k8s.io/kubernetes/pkg/kubelet/server/stats/testing" @@ -419,3 +423,112 @@ func TestCollectResourceMetrics(t *testing.T) { func uint64Ptr(u uint64) *uint64 { return &u } + +type fakeSummaryProvider struct { + stats *statsapi.Summary +} + +func (p *fakeSummaryProvider) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) { + return nil, fmt.Errorf("should not be invoked") +} + +func (p *fakeSummaryProvider) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) { + return p.stats, nil +} + +func createSummary() *statsapi.Summary { + staticTimestamp := time.Unix(0, 1624396278302091597) + testTime := metav1.NewTime(staticTimestamp) + + pods := make([]statsapi.PodStats, 0) + for i := 0; i < 500; i++ { + podstat := statsapi.PodStats{ + PodRef: statsapi.PodReference{ + Name: fmt.Sprintf("pod_%d", i), + Namespace: fmt.Sprintf("namespace_%d", i), + }, + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + Swap: &statsapi.SwapStats{ + Time: testTime, + SwapUsageBytes: uint64Ptr(5000), + }, + } + for j := 0; j < 10; j++ { + podstat.Containers = append(podstat.Containers, statsapi.ContainerStats{ + Name: fmt.Sprintf("container_%d", j), + StartTime: metav1.NewTime(staticTimestamp.Add(-30 * time.Second)), + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + Swap: &statsapi.SwapStats{ + Time: testTime, + SwapUsageBytes: uint64Ptr(1000), + }, + }) + } + pods = append(pods, podstat) + } + + return &statsapi.Summary{ + Node: statsapi.NodeStats{ + CPU: &statsapi.CPUStats{ + Time: testTime, + UsageCoreNanoSeconds: uint64Ptr(10000000000), + }, + Memory: &statsapi.MemoryStats{ + Time: testTime, + WorkingSetBytes: uint64Ptr(1000), + }, + Swap: &statsapi.SwapStats{ + Time: testTime, + SwapUsageBytes: uint64Ptr(500), + }, + }, + Pods: pods, + } +} + +func TestResourceMetricsCollector(t *testing.T) { + provider := &fakeSummaryProvider{stats: createSummary()} + collector := NewResourceMetricsCollector(provider) + + registry := compbasemetrics.NewKubeRegistry() + registry.CustomMustRegister(collector) + + handler := compbasemetrics.HandlerFor(registry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}) + 
// handler = WithHTTPLogging(handler)
+
+	server := httptest.NewUnstartedServer(handler)
+	defer server.Close()
+	server.EnableHTTP2 = false
+	server.StartTLS()
+
+	client := server.Client()
+	req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, server.URL, nil)
+	if err != nil {
+		t.Errorf("failed to create a new http request - %v", err)
+		return
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		t.Fatalf("unexpected error from client.Do - %v", err)
+	}
+	defer resp.Body.Close()
+	bytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		t.Errorf("unexpected error while reading the response body - %v", err)
+	}
+	t.Logf("response size: %dKB", len(bytes)/1024)
+}
diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go
index 9ca7f86d9adad..cb03614fde985 100644
--- a/pkg/kubelet/server/server.go
+++ b/pkg/kubelet/server/server.go
@@ -474,15 +474,18 @@ func (s *Server) InstallAuthNotRequiredHandlers() {
 		r.RawMustRegister(metrics.NewPrometheusCollector(prometheusHostAdapter{s.host}, containerPrometheusLabelsFunc(s.host), includedMetrics, clock.RealClock{}, cadvisorOpts))
 	}
 	s.restfulCont.Handle(cadvisorMetricsPath,
-		compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
-	)
+		WithHTTPLogging(
+			compbasemetrics.HandlerFor(r, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
+		))
 
 	s.addMetricsBucketMatcher("metrics/resource")
 	resourceRegistry := compbasemetrics.NewKubeRegistry()
-	resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(s.resourceAnalyzer))
+	analyzer := &SummaryProviderTracker{ResourceAnalyzer: s.resourceAnalyzer}
+	resourceRegistry.CustomMustRegister(collectors.NewResourceMetricsCollector(analyzer))
 	s.restfulCont.Handle(resourceMetricsPath,
-		compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
-	)
+		WithHTTPLogging(
+			compbasemetrics.HandlerFor(resourceRegistry, compbasemetrics.HandlerOpts{ErrorHandling: compbasemetrics.ContinueOnError}),
+		))
 
 	// prober metrics are exposed under a different endpoint
diff --git a/pkg/kubelet/server/server_patch.go b/pkg/kubelet/server/server_patch.go
new file mode 100644
index 0000000000000..e5c027cb71095
--- /dev/null
+++ b/pkg/kubelet/server/server_patch.go
@@ -0,0 +1,135 @@
+package server
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"k8s.io/apiserver/pkg/audit"
+	"k8s.io/apiserver/pkg/endpoints/responsewriter"
+	"k8s.io/klog/v2"
+	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
+	"k8s.io/kubernetes/pkg/kubelet/server/stats"
+
+	"github.com/google/uuid"
+)
+
+func WithHTTPLogging(handler http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		logger := newHTTPLogger(w, req)
+		defer logger.log()
+
+		w = responsewriter.WrapForHTTP1Or2(logger)
+		handler.ServeHTTP(w, req)
+	})
+}
+
+func newHTTPLogger(w http.ResponseWriter, req *http.Request) *httpLogger {
+	auditID := audit.GetAuditIDTruncated(req.Context())
+	if len(auditID) == 0 {
+		auditID = uuid.New().String()
+	}
+	return &httpLogger{
+		w:          w,
+		startedAt:  time.Now(),
+		method:     req.Method,
+		requestURI: req.RequestURI,
+		auditID:    auditID,
+		userAgent:  req.UserAgent(),
+		srcIP:      req.RemoteAddr,
+	}
+}
+
+type httpLogger struct {
+	w http.ResponseWriter
+
+	method     string
+	requestURI string
+	auditID    string
+	userAgent  string
+	srcIP      string
+
+	startedAt    time.Time
+	writeLatency time.Duration
+	flushLatency time.Duration
+	writeBytes   int
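+
+	// statusRecorded guards the status fields below so that Write's
+	// implicit 200 does not overwrite a status that WriteHeader has
+	// already recorded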
+	statusRecorded bool
+	statusCode     int
+}
+
+var _ http.ResponseWriter = &httpLogger{}
+var _ responsewriter.UserProvidedDecorator = &httpLogger{}
+
+func (l *httpLogger) Unwrap() http.ResponseWriter {
+	return l.w
+}
+
+// Header implements http.ResponseWriter.
+func (l *httpLogger) Header() http.Header {
+	return l.w.Header()
+}
+
+// Write implements http.ResponseWriter.
+func (l *httpLogger) Write(b []byte) (int, error) {
+	if !l.statusRecorded {
+		l.record(http.StatusOK) // Default if WriteHeader hasn't been called
+	}
+	now := time.Now()
+	var written int
+	defer func() {
+		l.writeLatency += time.Since(now)
+		l.writeBytes += written
+	}()
+	written, err := l.w.Write(b)
+	return written, err
+}
+
+func (l *httpLogger) Flush() {
+	now := time.Now()
+	defer func() {
+		l.flushLatency += time.Since(now)
+	}()
+	l.w.(http.Flusher).Flush()
+}
+
+// WriteHeader implements http.ResponseWriter.
+func (l *httpLogger) WriteHeader(status int) {
+	l.record(status)
+	l.w.WriteHeader(status)
+}
+
+func (l *httpLogger) record(status int) {
+	l.statusCode = status
+	l.statusRecorded = true
+}
+
+func (l *httpLogger) log() {
+	latency := time.Since(l.startedAt)
+	kvs := []interface{}{
+		"startedAt", l.startedAt,
+		"method", l.method,
+		"URI", l.requestURI,
+		"latency", latency,
+		"userAgent", l.userAgent,
+		"audit-ID", l.auditID,
+		"srcIP", l.srcIP,
+		"status", l.statusCode,
+		"writeLatency", l.writeLatency,
+		"writtenBytes", fmt.Sprintf("%dK", l.writeBytes/1024),
+		"flushLatency", l.flushLatency,
+	}
+	klog.V(1).InfoSDepth(1, "HTTP", kvs...)
+}
+
+type SummaryProviderTracker struct {
+	stats.ResourceAnalyzer
+}
+
+func (t *SummaryProviderTracker) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
+	now := time.Now()
+	defer func() {
+		klog.InfoS("GetCPUAndMemoryStats", "latency", time.Since(now))
+	}()
+	return t.ResourceAnalyzer.GetCPUAndMemoryStats(ctx)
+}

From 487cf513cbffa2734cbc118fd46ba3d78c5bdbca Mon Sep 17 00:00:00 2001
From: Abu Kashem
Date: Mon, 11 Aug 2025 15:52:26 -0400
Subject: [PATCH 2/2] make the stat gathering async

---
 pkg/kubelet/asyncinvoker/invoker.go      | 137 +++++++++++++++
 pkg/kubelet/asyncinvoker/invoker_test.go | 213 +++++++++++++++++++++++
 pkg/kubelet/kubelet.go                   |   1 +
 pkg/kubelet/kubelet_patch.go             |  59 +++++++
 pkg/kubelet/server/server_patch.go       |   6 +-
 5 files changed, 415 insertions(+), 1 deletion(-)
 create mode 100644 pkg/kubelet/asyncinvoker/invoker.go
 create mode 100644 pkg/kubelet/asyncinvoker/invoker_test.go
 create mode 100644 pkg/kubelet/kubelet_patch.go

diff --git a/pkg/kubelet/asyncinvoker/invoker.go b/pkg/kubelet/asyncinvoker/invoker.go
new file mode 100644
index 0000000000000..38a391d49bec4
--- /dev/null
+++ b/pkg/kubelet/asyncinvoker/invoker.go
@@ -0,0 +1,137 @@
+package asyncinvoker
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/klog/v2"
+)
+
+func NewAsyncInvoker[T any](f func() T) *asyncInvoker[T] {
+	invoker := &asyncInvoker[T]{
+		f: f,
+		// a buffer of length 1, so sending a signal never blocks
+		invoke: make(chan struct{}, 1),
+	}
+	return invoker
+}
+
+type AsyncInvoker[T any] interface {
+	Invoke() <-chan Return[T]
+}
+
+type Return[T any] struct {
+	Panicked any
+	Result   T
+	Latency  time.Duration
+}
+
+type asyncInvoker[T any] struct {
+	f func() T
+
+	lock    sync.Mutex
+	invoke  chan struct{}
+	waiters []chan Return[T]
+}
+
+// Invoke adds the caller to the waiting list and returns a buffered
+// channel on which the caller receives the result of the next
+// invocation of the wrapped function.
+func (i *asyncInvoker[T]) Invoke() <-chan Return[T] {
+	// the waiter is a buffered channel of length 1, so neither the
+	// caller nor the async invoker blocks on the other
+	waiter := make(chan Return[T], 1)
+
+	i.lock.Lock()
+	defer i.lock.Unlock()
+
+	// let's add the caller to the waiting list
+	i.waiters = append(i.waiters, waiter)
+	// signal the async invoker that it should invoke the function;
+	// the possible states are:
+	// a) the async invoker has not started yet, and the channel is
+	// empty - the signal is buffered until the invoker starts
+	// b) the async invoker has not started yet, and the channel is
+	// not empty - the caller is already on the waiting list
+	// c) the async invoker is blocked, waiting to receive on the channel
+	// d) the async invoker is unblocked, and is making the call
+	// e) the async invoker is sending the result to each waiter
+	//
+	// since this caller holds the lock now, e is impossible
+	//
+	select {
+	case i.invoke <- struct{}{}:
+	default:
+	}
+
+	return waiter
+}
+
+func (i *asyncInvoker[T]) Run(stopCtx context.Context) context.Context {
+	done, cancel := context.WithCancel(context.Background())
+	go func() {
+		klog.InfoS("AsyncInvoker: start")
+		defer func() {
+			klog.InfoS("AsyncInvoker: end")
+			cancel()
+		}()
+
+		for {
+			select {
+			case <-stopCtx.Done():
+				return
+			case _, ok := <-i.invoke:
+				if !ok {
+					return
+				}
+			}
+
+			var empty bool
+			i.lock.Lock()
+			empty = len(i.waiters) == 0
+			i.lock.Unlock()
+			if empty {
+				continue
+			}
+
+			// while the call is in progress, we allow any new
+			// caller to add itself to the waiting list.
+			ret := Return[T]{}
+			func() {
+				defer func() {
+					if recovered := recover(); recovered != nil {
+						ret.Panicked = recovered
+						utilruntime.HandleError(fmt.Errorf("panic from AsyncInvoker Run: %v", recovered))
+					}
+				}()
+
+				func() {
+					now := time.Now()
+					defer func() {
+						ret.Latency = time.Since(now)
+					}()
+					ret.Result = i.f()
+				}()
+			}()
+
+			// we have just invoked the function; return the result
+			// to the waiting callers, some of whom might have given
+			// up (timed out) already
+			func() {
+				i.lock.Lock()
+				defer i.lock.Unlock()
+
+				for _, waiter := range i.waiters {
+					// this should never block, we created
+					// this channel with a buffer of 1
+					waiter <- ret
+					close(waiter)
+				}
+				// reset the slice to zero-length
+				i.waiters = i.waiters[:0]
+			}()
+		}
+	}()
+
+	return done
+}
diff --git a/pkg/kubelet/asyncinvoker/invoker_test.go b/pkg/kubelet/asyncinvoker/invoker_test.go
new file mode 100644
index 0000000000000..2f3d50604f2a3
--- /dev/null
+++ b/pkg/kubelet/asyncinvoker/invoker_test.go
@@ -0,0 +1,213 @@
+package asyncinvoker
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
+	"k8s.io/kubernetes/pkg/kubelet/server/stats"
+)
+
+func TestAsyncInvoker(t *testing.T) {
+	// we have a slow provider
+	slow := &slowSummaryProvider{t: t}
+
+	// we wrap the slow provider with an async invoker, and hand
+	// the wrapped instance to the caller.
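+	// the subtests below exercise both behaviors: serial callers each
+	// trigger their own invocation, while a caller that arrives during
+	// an in-flight call joins the waiting list and shares its result.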
+ // NOTE: this only works because GetCPUAndMemoryStats does not + // accept any request scoped data + asyncInvoker := NewAsyncInvoker[result](func() result { + summary, err := slow.GetCPUAndMemoryStats(context.TODO()) + return result{summary: summary, err: err} + }) + async := &asyncSummaryProvider{ResourceAnalyzer: slow, async: asyncInvoker} + + // run the async invoker + stopCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + exit := asyncInvoker.Run(stopCtx) + + t.Run("serial callers", func(t *testing.T) { + slow.invoked.Swap(0) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // the slow provider should return the following result once invoked + // and the call should not block at all + ch := make(chan struct{}) + close(ch) + slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch} + + for i := 1; i <= 10; i++ { + got, err := async.GetCPUAndMemoryStats(ctx) + if err != nil { + t.Errorf("expected no error, but got: %v", err) + } + if want := slow.r.want; want != got { + t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got) + } + } + + if want, got := 10, int(slow.invoked.Load()); want != got { + t.Errorf("expected the invoke count to be %d, but got: %d", want, got) + } + }) + + t.Run("call in progress", func(t *testing.T) { + // reset the invoke count + slow.invoked.Swap(0) + + // the slow provider should return the following result once invoked + // and the call is taking longer + ch := make(chan struct{}) + slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch} + + firstDone := make(chan struct{}) + go func() { + defer close(firstDone) + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + _, err := async.GetCPUAndMemoryStats(ctx) + if want := context.DeadlineExceeded; !errors.Is(err, want) { + t.Errorf("expected error: %v, but got: %v", want, err) + } + }() + + // wait for the first caller to time out + <-firstDone + t.Logf("first caller has timed out") + // the slow call should still be in progress + if want, got := 1, int(slow.progress.Load()); want != got { + t.Fatalf("expected the call to be in progress: %d, but got: %d", want, got) + } + + // fire off a second call + secondDone := make(chan struct{}) + go func() { + defer close(secondDone) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + t.Logf("second caller making a call") + got, err := async.GetCPUAndMemoryStats(ctx) + if err != nil { + t.Errorf("expected no error, but got: %v", err) + } + if want := slow.r.want; want != got { + t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got) + } + }() + + // unblock the second call, after some wait + <-time.After(100 * time.Millisecond) + t.Logf("unblocking the slow provider") + close(ch) + + <-secondDone + // we expect the call in progress to have finished + if want, got := 0, int(slow.progress.Load()); want != got { + t.Errorf("did not expect the call to be in progress: %d, but got: %d", want, got) + } + if want, got := 1, int(slow.invoked.Load()); want != got { + t.Errorf("expected the call to be invoked: %d, but got: %d", want, got) + } + + // a new call should return immediately + ch = make(chan struct{}) + close(ch) + slow.r = slowProviderReturn{want: &statsapi.Summary{}, blocked: ch} + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + got, err := 
async.GetCPUAndMemoryStats(ctx)
+		if err != nil {
+			t.Errorf("expected no error, but got: %v", err)
+		}
+		if want := slow.r.want; want != got {
+			t.Errorf("expected the summary returned to be identical, want: %p, but got: %p", want, got)
+		}
+
+		if want, got := 2, int(slow.invoked.Load()); want != got {
+			t.Errorf("expected the call to be invoked: %d, but got: %d", want, got)
+		}
+	})
+
+	t.Run("async runner exits gracefully", func(t *testing.T) {
+		cancel()
+
+		select {
+		case <-exit.Done():
+		case <-time.After(wait.ForeverTestTimeout):
+			t.Errorf("expected the async invoker to exit gracefully")
+		}
+	})
+}
+
+type result struct {
+	summary *statsapi.Summary
+	err     error
+}
+
+type asyncSummaryProvider struct {
+	stats.ResourceAnalyzer
+	async AsyncInvoker[result]
+}
+
+func (p *asyncSummaryProvider) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
+	wait := p.async.Invoke()
+	select {
+	case ret, ok := <-wait:
+		if ok {
+			if ret.Panicked != nil {
+				panic(ret.Panicked)
+			}
+			return ret.Result.summary, ret.Result.err
+		}
+		return nil, fmt.Errorf("the waiter channel closed without a result, this should never happen")
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+type slowProviderReturn struct {
+	// blocked lets the test control when the call completes: the
+	// provider blocks on this channel until the test closes it
+	blocked <-chan struct{}
+	want    *statsapi.Summary
+}
+
+type slowSummaryProvider struct {
+	t                 *testing.T
+	invoked, progress atomic.Int32
+	r                 slowProviderReturn
+}
+
+func (slow *slowSummaryProvider) GetCPUAndMemoryStats(_ context.Context) (*statsapi.Summary, error) {
+	slow.invoked.Add(1)
+	// we never expect this call to be made concurrently
+	slow.progress.Add(1)
+	defer func() {
+		slow.progress.Add(-1)
+	}()
+	// block until the test closes the channel
+	now := time.Now()
+	<-slow.r.blocked
+	slow.t.Logf("slept for: %s", time.Since(now))
+	return slow.r.want, nil
+}
+
+func (slow *slowSummaryProvider) Get(ctx context.Context, updateStats bool) (*statsapi.Summary, error) {
+	return &statsapi.Summary{}, nil
+}
+
+func (slow *slowSummaryProvider) Start() {}
+func (slow *slowSummaryProvider) GetPodVolumeStats(uid types.UID) (stats.PodVolumeStats, bool) {
+	return stats.PodVolumeStats{}, false
+}
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 385179a648ff7..59153b8bef1c9 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -712,6 +712,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.allocationManager = allocation.NewManager(klet.getRootDir())
 
 	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
+	klet.resourceAnalyzer = NewAsyncInvokerForGetCPUAndMemoryStats(ctx, klet.resourceAnalyzer)
 
 	klet.runtimeService = kubeDeps.RemoteRuntimeService
diff --git a/pkg/kubelet/kubelet_patch.go b/pkg/kubelet/kubelet_patch.go
new file mode 100644
index 0000000000000..26181d7173b1e
--- /dev/null
+++ b/pkg/kubelet/kubelet_patch.go
@@ -0,0 +1,59 @@
+package kubelet
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"k8s.io/klog/v2"
+	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
+	"k8s.io/kubernetes/pkg/kubelet/asyncinvoker"
+	"k8s.io/kubernetes/pkg/kubelet/server/stats"
+)
+
+func NewAsyncInvokerForGetCPUAndMemoryStats(ctx context.Context, slow stats.ResourceAnalyzer) stats.ResourceAnalyzer {
+	// we wrap the slow provider with an async invoker, and hand
+	// the wrapped instance to the caller.
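+	// callers that arrive while a collection is in flight join the
+	// waiting list and receive that collection's result when it
+	// completes; at most one collection runs at a time.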
+	// NOTE: this only works because GetCPUAndMemoryStats does not
+	// accept any request scoped data
+	asyncInvoker := asyncinvoker.NewAsyncInvoker[result](func() result {
+		now := time.Now()
+		summary, err := slow.GetCPUAndMemoryStats(context.TODO())
+		klog.InfoS("slow.GetCPUAndMemoryStats", "latency", time.Since(now))
+		return result{summary: summary, err: err}
+	})
+	// Run spawns its own worker goroutine and returns immediately
+	asyncInvoker.Run(ctx)
+	return &asyncSummaryProvider{ResourceAnalyzer: slow, async: asyncInvoker}
+}
+
+type result struct {
+	summary *statsapi.Summary
+	err     error
+}
+
+type asyncSummaryProvider struct {
+	stats.ResourceAnalyzer
+	async asyncinvoker.AsyncInvoker[result]
+}
+
+func (p *asyncSummaryProvider) GetCPUAndMemoryStats(ctx context.Context) (*statsapi.Summary, error) {
+	if _, ok := ctx.Deadline(); !ok {
+		// cap how long a caller can wait; deriving from ctx (instead
+		// of context.Background) keeps the parent's cancellation
+		var cancel context.CancelFunc
+		ctx, cancel = context.WithTimeout(ctx, 7*time.Second)
+		defer cancel()
+	}
+
+	wait := p.async.Invoke()
+	select {
+	case ret, ok := <-wait:
+		if ok {
+			if ret.Panicked != nil {
+				panic(ret.Panicked)
+			}
+			return ret.Result.summary, ret.Result.err
+		}
+		return nil, fmt.Errorf("the waiter channel closed without a result, this should never happen")
+	case <-ctx.Done():
+		return nil, fmt.Errorf("did not return within the time allotted - %w", ctx.Err())
+	}
+}
diff --git a/pkg/kubelet/server/server_patch.go b/pkg/kubelet/server/server_patch.go
index e5c027cb71095..ced2af89caaf3 100644
--- a/pkg/kubelet/server/server_patch.go
+++ b/pkg/kubelet/server/server_patch.go
@@ -131,5 +131,9 @@ func (t *SummaryProviderTracker) GetCPUAndMemoryStats(ctx context.Context) (*sta
 	defer func() {
 		klog.InfoS("GetCPUAndMemoryStats", "latency", time.Since(now))
 	}()
-	return t.ResourceAnalyzer.GetCPUAndMemoryStats(ctx)
+	summary, err := t.ResourceAnalyzer.GetCPUAndMemoryStats(ctx)
+	if err != nil {
+		klog.InfoS("GetCPUAndMemoryStats failed", "err", err)
+	}
+	return summary, err
 }
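
A condensed usage sketch of the new asyncinvoker package (illustrative
only; NewAsyncInvoker, Invoke, Run, and Return are the names introduced
in invoker.go above, everything else here is hypothetical):

	invoker := asyncinvoker.NewAsyncInvoker[string](func() string {
		time.Sleep(2 * time.Second) // stand-in for a slow stats collection
		return "summary"
	})
	stopCtx, stop := context.WithCancel(context.Background())
	exited := invoker.Run(stopCtx) // spawns the worker and returns immediately

	// a caller bounds its own wait with a context; every waiter present
	// while the function runs receives the same Return value
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	select {
	case ret := <-invoker.Invoke():
		fmt.Println(ret.Result, ret.Latency, ret.Panicked)
	case <-ctx.Done():
		fmt.Println("gave up waiting:", ctx.Err())
	}

	stop()          // signal the worker to exit
	<-exited.Done() // and wait for it to do so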