Skip to content

Commit 5826868

Browse files
authored
Wire context to logs command and add interrupt handler (kubernetes#127503)
* Wire context to logs command and add interrupt handler * Move conditional outside of interrupt handler
1 parent 1caf9a1 commit 5826868

File tree

4 files changed

+29
-21
lines changed

4 files changed

+29
-21
lines changed

staging/src/k8s.io/kubectl/pkg/cmd/debug/debug.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -945,12 +945,12 @@ func (o *DebugOptions) handleAttachPod(ctx context.Context, restClientGetter gen
945945
}
946946
if status.State.Terminated != nil {
947947
klog.V(1).Info("Ephemeral container terminated, falling back to logs")
948-
return logOpts(restClientGetter, pod, opts)
948+
return logOpts(ctx, restClientGetter, pod, opts)
949949
}
950950

951951
if err := opts.Run(); err != nil {
952952
fmt.Fprintf(opts.ErrOut, "warning: couldn't attach to pod/%s, falling back to streaming logs: %v\n", podName, err)
953-
return logOpts(restClientGetter, pod, opts)
953+
return logOpts(ctx, restClientGetter, pod, opts)
954954
}
955955
return nil
956956
}
@@ -968,7 +968,7 @@ func getContainerStatusByName(pod *corev1.Pod, containerName string) *corev1.Con
968968
}
969969

970970
// logOpts logs output from opts to the pods log.
971-
func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error {
971+
func logOpts(ctx context.Context, restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Pod, opts *attach.AttachOptions) error {
972972
ctrName, err := opts.GetContainerName(pod)
973973
if err != nil {
974974
return err
@@ -979,7 +979,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po
979979
return err
980980
}
981981
for _, request := range requests {
982-
if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil {
982+
if err := logs.DefaultConsumeRequest(ctx, request, opts.Out); err != nil {
983983
return err
984984
}
985985
}

staging/src/k8s.io/kubectl/pkg/cmd/logs/logs.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
"k8s.io/kubectl/pkg/util"
4242
"k8s.io/kubectl/pkg/util/completion"
4343
"k8s.io/kubectl/pkg/util/i18n"
44+
"k8s.io/kubectl/pkg/util/interrupt"
4445
"k8s.io/kubectl/pkg/util/templates"
4546
)
4647

@@ -124,7 +125,7 @@ type LogsOptions struct {
124125
Options runtime.Object
125126
Resources []string
126127

127-
ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error
128+
ConsumeRequestFn func(context.Context, rest.ResponseWrapper, io.Writer) error
128129

129130
// PodLogOptions
130131
SinceTime string
@@ -375,22 +376,29 @@ func (o LogsOptions) RunLogs() error {
375376
len(requests), o.MaxFollowConcurrency,
376377
)
377378
}
378-
379-
return o.parallelConsumeRequest(requests)
380379
}
381380

382-
return o.sequentialConsumeRequest(requests)
381+
ctx, cancel := context.WithCancel(context.Background())
382+
defer cancel()
383+
intr := interrupt.New(nil, cancel)
384+
return intr.Run(func() error {
385+
if o.Follow && len(requests) > 1 {
386+
return o.parallelConsumeRequest(ctx, requests)
387+
}
388+
389+
return o.sequentialConsumeRequest(ctx, requests)
390+
})
383391
}
384392

385-
func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
393+
func (o LogsOptions) parallelConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
386394
reader, writer := io.Pipe()
387395
wg := &sync.WaitGroup{}
388396
wg.Add(len(requests))
389397
for objRef, request := range requests {
390398
go func(objRef corev1.ObjectReference, request rest.ResponseWrapper) {
391399
defer wg.Done()
392400
out := o.addPrefixIfNeeded(objRef, writer)
393-
if err := o.ConsumeRequestFn(request, out); err != nil {
401+
if err := o.ConsumeRequestFn(ctx, request, out); err != nil {
394402
if !o.IgnoreLogErrors {
395403
writer.CloseWithError(err)
396404

@@ -413,10 +421,10 @@ func (o LogsOptions) parallelConsumeRequest(requests map[corev1.ObjectReference]
413421
return err
414422
}
415423

416-
func (o LogsOptions) sequentialConsumeRequest(requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
424+
func (o LogsOptions) sequentialConsumeRequest(ctx context.Context, requests map[corev1.ObjectReference]rest.ResponseWrapper) error {
417425
for objRef, request := range requests {
418426
out := o.addPrefixIfNeeded(objRef, o.Out)
419-
if err := o.ConsumeRequestFn(request, out); err != nil {
427+
if err := o.ConsumeRequestFn(ctx, request, out); err != nil {
420428
if !o.IgnoreLogErrors {
421429
return err
422430
}
@@ -457,8 +465,8 @@ func (o LogsOptions) addPrefixIfNeeded(ref corev1.ObjectReference, writer io.Wri
457465
// A successful read returns err == nil, not err == io.EOF.
458466
// Because the function is defined to read from request until io.EOF, it does
459467
// not treat an io.EOF as an error to be reported.
460-
func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
461-
readCloser, err := request.Stream(context.TODO())
468+
func DefaultConsumeRequest(ctx context.Context, request rest.ResponseWrapper, out io.Writer) error {
469+
readCloser, err := request.Stream(ctx)
462470
if err != nil {
463471
return err
464472
}

staging/src/k8s.io/kubectl/pkg/cmd/logs/logs_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func TestLog(t *testing.T) {
304304

305305
o := NewLogsOptions(streams)
306306
o.LogsForObject = mock.mockLogsForObject
307-
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
307+
o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
308308
return errors.New("Error from the ConsumeRequestFn")
309309
}
310310
return o
@@ -378,7 +378,7 @@ func TestLog(t *testing.T) {
378378

379379
o := NewLogsOptions(streams)
380380
o.LogsForObject = mock.mockLogsForObject
381-
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
381+
o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
382382
return errors.New("Error from the ConsumeRequestFn")
383383
}
384384
o.Follow = true
@@ -401,7 +401,7 @@ func TestLog(t *testing.T) {
401401

402402
o := NewLogsOptions(streams)
403403
o.LogsForObject = mock.mockLogsForObject
404-
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
404+
o.ConsumeRequestFn = func(ctx context.Context, req restclient.ResponseWrapper, out io.Writer) error {
405405
return errors.New("Error from the ConsumeRequestFn")
406406
}
407407
o.Follow = true
@@ -808,7 +808,7 @@ func TestDefaultConsumeRequest(t *testing.T) {
808808
for _, test := range tests {
809809
t.Run(test.name, func(t *testing.T) {
810810
buf := &bytes.Buffer{}
811-
err := DefaultConsumeRequest(test.request, buf)
811+
err := DefaultConsumeRequest(context.TODO(), test.request, buf)
812812

813813
if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
814814
t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
@@ -932,8 +932,8 @@ type logTestMock struct {
932932
wg *sync.WaitGroup
933933
}
934934

935-
func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error {
936-
readCloser, err := request.Stream(context.Background())
935+
func (l *logTestMock) mockConsumeRequest(ctx context.Context, request restclient.ResponseWrapper, out io.Writer) error {
936+
readCloser, err := request.Stream(ctx)
937937
if err != nil {
938938
return err
939939
}

staging/src/k8s.io/kubectl/pkg/cmd/run/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ func logOpts(restClientGetter genericclioptions.RESTClientGetter, pod *corev1.Po
504504
return err
505505
}
506506
for _, request := range requests {
507-
if err := logs.DefaultConsumeRequest(request, opts.Out); err != nil {
507+
if err := logs.DefaultConsumeRequest(context.Background(), request, opts.Out); err != nil {
508508
return err
509509
}
510510
}

0 commit comments

Comments
 (0)