Skip to content

Commit 6a2ebc9

Browse files
committed
must-gather: Add support for context
This is in preparation for proper signal handling.
1 parent 1f1fbf4 commit 6a2ebc9

File tree

2 files changed

+72
-56
lines changed

2 files changed

+72
-56
lines changed

pkg/cli/admin/mustgather/mustgather.go

Lines changed: 70 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (o *MustGatherOptions) Complete(f kcmdutil.Factory, cmd *cobra.Command, arg
242242
return fmt.Errorf("--run-namespace %s", errStr)
243243
}
244244
}
245-
if err := o.completeImages(); err != nil {
245+
if err := o.completeImages(context.TODO()); err != nil {
246246
return err
247247
}
248248
o.PrinterCreated, err = printers.NewTypeSetter(scheme.Scheme).WrapToPrinter(&printers.NamePrinter{Operation: "created"}, nil)
@@ -257,9 +257,9 @@ func (o *MustGatherOptions) Complete(f kcmdutil.Factory, cmd *cobra.Command, arg
257257
return nil
258258
}
259259

260-
func (o *MustGatherOptions) completeImages() error {
260+
func (o *MustGatherOptions) completeImages(ctx context.Context) error {
261261
for _, imageStream := range o.ImageStreams {
262-
if image, err := o.resolveImageStreamTagString(imageStream); err == nil {
262+
if image, err := o.resolveImageStreamTagString(ctx, imageStream); err == nil {
263263
o.Images = append(o.Images, image)
264264
} else {
265265
return fmt.Errorf("unable to resolve image stream '%v': %v", imageStream, err)
@@ -268,7 +268,7 @@ func (o *MustGatherOptions) completeImages() error {
268268
if len(o.Images) == 0 || o.AllImages {
269269
var image string
270270
var err error
271-
if image, err = o.resolveImageStreamTag("openshift", "must-gather", "latest"); err != nil {
271+
if image, err = o.resolveImageStreamTag(ctx, "openshift", "must-gather", "latest"); err != nil {
272272
o.log("%v\n", err)
273273
image = "registry.redhat.io/openshift4/ose-must-gather:latest"
274274
}
@@ -279,12 +279,12 @@ func (o *MustGatherOptions) completeImages() error {
279279
pluginImages := make(map[string]struct{})
280280
var err error
281281

282-
pluginImages, err = o.annotatedCSVs()
282+
pluginImages, err = o.annotatedCSVs(ctx)
283283
if err != nil {
284284
return err
285285
}
286286

287-
cos, err := o.ConfigClient.ConfigV1().ClusterOperators().List(context.TODO(), metav1.ListOptions{})
287+
cos, err := o.ConfigClient.ConfigV1().ClusterOperators().List(ctx, metav1.ListOptions{})
288288
if err != nil {
289289
return err
290290
}
@@ -305,15 +305,15 @@ func (o *MustGatherOptions) completeImages() error {
305305
return nil
306306
}
307307

308-
func (o *MustGatherOptions) annotatedCSVs() (map[string]struct{}, error) {
308+
func (o *MustGatherOptions) annotatedCSVs(ctx context.Context) (map[string]struct{}, error) {
309309
csvGVR := schema.GroupVersionResource{
310310
Group: "operators.coreos.com",
311311
Version: "v1alpha1",
312312
Resource: "clusterserviceversions",
313313
}
314314
pluginImages := make(map[string]struct{})
315315

316-
csvs, err := o.DynamicClient.Resource(csvGVR).List(context.TODO(), metav1.ListOptions{})
316+
csvs, err := o.DynamicClient.Resource(csvGVR).List(ctx, metav1.ListOptions{})
317317
if err != nil {
318318
return nil, err
319319
}
@@ -327,12 +327,12 @@ func (o *MustGatherOptions) annotatedCSVs() (map[string]struct{}, error) {
327327
return pluginImages, nil
328328
}
329329

330-
func (o *MustGatherOptions) resolveImageStreamTagString(s string) (string, error) {
330+
func (o *MustGatherOptions) resolveImageStreamTagString(ctx context.Context, s string) (string, error) {
331331
namespace, name, tag := parseImageStreamTagString(s)
332332
if len(namespace) == 0 {
333333
return "", fmt.Errorf("expected namespace/name:tag")
334334
}
335-
return o.resolveImageStreamTag(namespace, name, tag)
335+
return o.resolveImageStreamTag(ctx, namespace, name, tag)
336336
}
337337

338338
func parseImageStreamTagString(s string) (string, string, string) {
@@ -349,8 +349,8 @@ func parseImageStreamTagString(s string) (string, string, string) {
349349
return namespace, name, tag
350350
}
351351

352-
func (o *MustGatherOptions) resolveImageStreamTag(namespace, name, tag string) (string, error) {
353-
imageStream, err := o.ImageClient.ImageStreams(namespace).Get(context.TODO(), name, metav1.GetOptions{})
352+
func (o *MustGatherOptions) resolveImageStreamTag(ctx context.Context, namespace, name, tag string) (string, error) {
353+
imageStream, err := o.ImageClient.ImageStreams(namespace).Get(ctx, name, metav1.GetOptions{})
354354
if err != nil {
355355
return "", err
356356
}
@@ -557,6 +557,10 @@ func getCandidateNodeNames(nodes *corev1.NodeList, hasMaster bool) []string {
557557
func (o *MustGatherOptions) Run() error {
558558
var errs []error
559559

560+
// The following context is being used for now until proper signal handling is implemented.
561+
ctx, cancel := context.WithCancel(context.TODO())
562+
defer cancel()
563+
560564
if err := os.MkdirAll(o.DestDir, os.ModePerm); err != nil {
561565
// ensure the errors bubble up to BackupGathering method for display
562566
errs = []error{err}
@@ -579,14 +583,20 @@ func (o *MustGatherOptions) Run() error {
579583
}
580584

581585
// print at both the beginning and at the end. This information is important enough to be in both spots.
582-
o.PrintBasicClusterState(context.TODO())
586+
o.PrintBasicClusterState(ctx)
583587
defer func() {
588+
// Shortcircuit this in case the context is already cancelled.
589+
if ctx.Err() != nil {
590+
klog.Warning("Reprinting cluster state skipped, terminating...")
591+
return
592+
}
584593
fmt.Fprintf(o.RawOut, "\n\n")
585594
fmt.Fprintf(o.RawOut, "Reprinting Cluster State:\n")
586-
o.PrintBasicClusterState(context.TODO())
595+
o.PrintBasicClusterState(ctx)
587596
}()
588597

589598
// Ensure resource cleanup unless instructed otherwise ...
599+
// There is no context passed into the cleanup function as it's unrelated to the main context.
590600
var cleanupNamespace func()
591601
if !o.Keep {
592602
defer func() {
@@ -596,19 +606,20 @@ func (o *MustGatherOptions) Run() error {
596606
}()
597607
}
598608

599-
// Due to 'stack unwiding', this should happen after 'clusterState' printing, to ensure that we always
600-
// print our ClusterState information.
609+
// Due to 'stack unwinding', this should happen after 'clusterState' printing, to ensure that we always
610+
// print our ClusterState information.
601611
runBackCollection := true
602612
defer func() {
603-
if !runBackCollection {
613+
// Shortcircuit this in case the context is already cancelled.
614+
if ctx.Err() != nil || !runBackCollection {
604615
return
605616
}
606-
o.BackupGathering(context.TODO(), errs)
617+
o.BackupGathering(ctx, errs)
607618
}()
608619

609620
// Get or create "working" namespace ...
610621
var ns *corev1.Namespace
611-
ns, cleanupNamespace, err = o.getNamespace()
622+
ns, cleanupNamespace, err = o.getNamespace(ctx)
612623
if err != nil {
613624
// ensure the errors bubble up to BackupGathering method for display
614625
errs = []error{err}
@@ -617,7 +628,7 @@ func (o *MustGatherOptions) Run() error {
617628

618629
// Prefer to run in master if there's any but don't be explicit otherwise.
619630
// This enables the command to run by default in hypershift where there's no masters.
620-
nodes, err := o.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
631+
nodes, err := o.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{
621632
LabelSelector: o.NodeSelector,
622633
})
623634
if err != nil {
@@ -645,7 +656,7 @@ func (o *MustGatherOptions) Run() error {
645656
return err
646657
}
647658
if o.NodeSelector != "" {
648-
nodes, err := o.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{
659+
nodes, err := o.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{
649660
LabelSelector: o.NodeSelector,
650661
})
651662
if err != nil {
@@ -658,7 +669,7 @@ func (o *MustGatherOptions) Run() error {
658669
}
659670
} else {
660671
if o.NodeName != "" {
661-
if _, err := o.Client.CoreV1().Nodes().Get(context.TODO(), o.NodeName, metav1.GetOptions{}); err != nil {
672+
if _, err := o.Client.CoreV1().Nodes().Get(ctx, o.NodeName, metav1.GetOptions{}); err != nil {
662673
// ensure the errors bubble up to BackupGathering method for display
663674
errs = []error{err}
664675
return err
@@ -686,6 +697,11 @@ func (o *MustGatherOptions) Run() error {
686697
}
687698
queue.ShutDownWithDrain()
688699

700+
go func() {
701+
<-ctx.Done()
702+
queue.ShutDown()
703+
}()
704+
689705
wg.Add(concurrentMG)
690706
for i := 0; i < concurrentMG; i++ {
691707
go func() {
@@ -730,9 +746,9 @@ func (o *MustGatherOptions) Run() error {
730746
}
731747

732748
// processNextWorkItem creates & processes the must-gather pod and returns error if any
733-
func (o *MustGatherOptions) processNextWorkItem(ns string, pod *corev1.Pod) error {
749+
func (o *MustGatherOptions) processNextWorkItem(ctx context.Context, ns string, pod *corev1.Pod) error {
734750
var err error
735-
pod, err = o.Client.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
751+
pod, err = o.Client.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
736752
if err != nil {
737753
return err
738754
}
@@ -747,8 +763,7 @@ func (o *MustGatherOptions) processNextWorkItem(ns string, pod *corev1.Pod) erro
747763
// it deletes this namespace and all the pods are removed by garbage collector.
748764
// However, if user specifies namespace via `run-namespace`, these pods need to
749765
// be deleted manually.
750-
err = o.Client.CoreV1().Pods(o.RunNamespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
751-
if err != nil {
766+
if err := o.Client.CoreV1().Pods(o.RunNamespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
752767
klog.V(4).Infof("pod deletion error %v", err)
753768
}
754769
}()
@@ -757,19 +772,18 @@ func (o *MustGatherOptions) processNextWorkItem(ns string, pod *corev1.Pod) erro
757772
log := o.newPodOutLogger(o.Out, pod.Name)
758773

759774
// wait for gather container to be running (gather is running)
760-
if err := o.waitForGatherContainerRunning(pod); err != nil {
775+
if err := o.waitForGatherContainerRunning(ctx, pod); err != nil {
761776
log("gather did not start: %s", err)
762-
return fmt.Errorf("gather did not start for pod %s: %s", pod.Name, err)
763-
777+
return fmt.Errorf("gather did not start for pod %s: %w", pod.Name, err)
764778
}
765779
// stream gather container logs
766-
if err := o.getGatherContainerLogs(pod); err != nil {
780+
if err := o.getGatherContainerLogs(ctx, pod); err != nil {
767781
log("gather logs unavailable: %v", err)
768782
}
769783

770784
// wait for pod to be running (gather has completed)
771785
log("waiting for gather to complete")
772-
if err := o.waitForGatherToComplete(pod); err != nil {
786+
if err := o.waitForGatherToComplete(ctx, pod); err != nil {
773787
log("gather never finished: %v", err)
774788
if exiterr, ok := err.(*exec.CodeExitError); ok {
775789
return exiterr
@@ -779,12 +793,12 @@ func (o *MustGatherOptions) processNextWorkItem(ns string, pod *corev1.Pod) erro
779793

780794
// copy the gathered files into the local destination dir
781795
log("downloading gather output")
782-
pod, err = o.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
796+
pod, err = o.Client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
783797
if err != nil {
784798
log("gather output not downloaded: %v\n", err)
785799
return fmt.Errorf("unable to download output from pod %s: %s", pod.Name, err)
786800
}
787-
if err := o.copyFilesFromPod(pod); err != nil {
801+
if err := o.copyFilesFromPod(ctx, pod); err != nil {
788802
log("gather output not downloaded: %v\n", err)
789803
return fmt.Errorf("unable to download output from pod %s: %s", pod.Name, err)
790804
}
@@ -811,7 +825,7 @@ func (o *MustGatherOptions) logTimestamp() error {
811825
return err
812826
}
813827

814-
func (o *MustGatherOptions) copyFilesFromPod(pod *corev1.Pod) error {
828+
func (o *MustGatherOptions) copyFilesFromPod(ctx context.Context, pod *corev1.Pod) error {
815829
streams := o.IOStreams
816830
streams.Out = o.newPrefixWriter(streams.Out, fmt.Sprintf("[%s] OUT", pod.Name), false, true)
817831
imageFolder := regexp.MustCompile("[^A-Za-z0-9]+").ReplaceAllString(pod.Status.ContainerStatuses[0].ImageID, "-")
@@ -839,7 +853,7 @@ func (o *MustGatherOptions) copyFilesFromPod(pod *corev1.Pod) error {
839853
Container: gatherContainerName,
840854
Timestamps: true,
841855
}
842-
readCloser, err := o.Client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, logOptions).Stream(context.TODO())
856+
readCloser, err := o.Client.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, logOptions).Stream(ctx)
843857
if err != nil {
844858
return err
845859
}
@@ -873,7 +887,7 @@ func (o *MustGatherOptions) copyFilesFromPod(pod *corev1.Pod) error {
873887
return kutilerrors.NewAggregate(errs)
874888
}
875889

876-
func (o *MustGatherOptions) getGatherContainerLogs(pod *corev1.Pod) error {
890+
func (o *MustGatherOptions) getGatherContainerLogs(ctx context.Context, pod *corev1.Pod) error {
877891
since2s := int64(2)
878892
opts := &logs.LogsOptions{
879893
Namespace: pod.Namespace,
@@ -894,13 +908,15 @@ func (o *MustGatherOptions) getGatherContainerLogs(pod *corev1.Pod) error {
894908
// gather script might take longer than the default API server time,
895909
// so we should check if the gather script still runs and re-run logs
896910
// thus we run this in a loop
911+
//
912+
// TODO: Use opts.RunLogsContext once Kubernetes v1.35 is available.
897913
if err := opts.RunLogs(); err != nil {
898914
return err
899915
}
900916

901917
// to ensure we don't print all of history set since to past 2 seconds
902918
opts.Options.(*corev1.PodLogOptions).SinceSeconds = &since2s
903-
if done, _ := o.isGatherDone(pod); done {
919+
if done, _ := o.isGatherDone(ctx, pod); done {
904920
return nil
905921
}
906922
klog.V(4).Infof("lost logs, re-trying...")
@@ -931,15 +947,15 @@ func (o *MustGatherOptions) newPrefixWriter(out io.Writer, prefix string, ignore
931947
return writer
932948
}
933949

934-
func (o *MustGatherOptions) waitForGatherToComplete(pod *corev1.Pod) error {
935-
return wait.PollUntilContextTimeout(context.TODO(), 10*time.Second, o.Timeout, true, func(ctx context.Context) (bool, error) {
936-
return o.isGatherDone(pod)
950+
func (o *MustGatherOptions) waitForGatherToComplete(ctx context.Context, pod *corev1.Pod) error {
951+
return wait.PollUntilContextTimeout(ctx, 10*time.Second, o.Timeout, true, func(ctx context.Context) (bool, error) {
952+
return o.isGatherDone(ctx, pod)
937953
})
938954
}
939955

940-
func (o *MustGatherOptions) isGatherDone(pod *corev1.Pod) (bool, error) {
956+
func (o *MustGatherOptions) isGatherDone(ctx context.Context, pod *corev1.Pod) (bool, error) {
941957
var err error
942-
if pod, err = o.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err != nil {
958+
if pod, err = o.Client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
943959
// at this stage pod should exist, we've been gathering container logs, so error if not found
944960
if kerrors.IsNotFound(err) {
945961
return true, err
@@ -971,10 +987,10 @@ func (o *MustGatherOptions) isGatherDone(pod *corev1.Pod) (bool, error) {
971987
return false, nil
972988
}
973989

974-
func (o *MustGatherOptions) waitForGatherContainerRunning(pod *corev1.Pod) error {
975-
return wait.PollUntilContextTimeout(context.TODO(), 10*time.Second, o.Timeout, true, func(ctx context.Context) (bool, error) {
990+
func (o *MustGatherOptions) waitForGatherContainerRunning(ctx context.Context, pod *corev1.Pod) error {
991+
return wait.PollUntilContextTimeout(ctx, 10*time.Second, o.Timeout, true, func(ctx context.Context) (bool, error) {
976992
var err error
977-
if pod, err = o.Client.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{}); err == nil {
993+
if pod, err = o.Client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err == nil {
978994
if len(pod.Status.ContainerStatuses) == 0 {
979995
return false, nil
980996
}
@@ -996,21 +1012,21 @@ func (o *MustGatherOptions) waitForGatherContainerRunning(pod *corev1.Pod) error
9961012
})
9971013
}
9981014

999-
func (o *MustGatherOptions) getNamespace() (*corev1.Namespace, func(), error) {
1015+
func (o *MustGatherOptions) getNamespace(ctx context.Context) (*corev1.Namespace, func(), error) {
10001016
if o.RunNamespace == "" {
1001-
return o.createTempNamespace()
1017+
return o.createTempNamespace(ctx)
10021018
}
10031019

1004-
ns, err := o.Client.CoreV1().Namespaces().Get(context.TODO(), o.RunNamespace, metav1.GetOptions{})
1020+
ns, err := o.Client.CoreV1().Namespaces().Get(ctx, o.RunNamespace, metav1.GetOptions{})
10051021
if err != nil {
10061022
return nil, nil, fmt.Errorf("retrieving namespace %q: %w", o.RunNamespace, err)
10071023
}
10081024

10091025
return ns, func() {}, nil
10101026
}
10111027

1012-
func (o *MustGatherOptions) createTempNamespace() (*corev1.Namespace, func(), error) {
1013-
ns, err := o.Client.CoreV1().Namespaces().Create(context.TODO(), newNamespace(), metav1.CreateOptions{})
1028+
func (o *MustGatherOptions) createTempNamespace(ctx context.Context) (*corev1.Namespace, func(), error) {
1029+
ns, err := o.Client.CoreV1().Namespaces().Create(ctx, newNamespace(), metav1.CreateOptions{})
10141030
if err != nil {
10151031
return nil, nil, fmt.Errorf("creating temp namespace: %w", err)
10161032
}
@@ -1241,19 +1257,19 @@ func (o *MustGatherOptions) BackupGathering(ctx context.Context, errs []error) {
12411257
streams.Out = o.newPrefixWriter(streams.Out, fmt.Sprintf("[must-gather ] OUT"), false, true)
12421258
destDir := path.Join(o.DestDir, fmt.Sprintf("inspect.local.%06d", rand.Int63()))
12431259

1244-
if err := runInspect(streams, rest.CopyConfig(o.Config), destDir, []string{typeTargets}); err != nil {
1260+
if err := runInspect(ctx, streams, rest.CopyConfig(o.Config), destDir, []string{typeTargets}); err != nil {
12451261
fmt.Fprintf(o.ErrOut, "error completing cluster type inspection: %v\n", err)
12461262
}
12471263

12481264
fmt.Fprintf(o.ErrOut, "Falling back to `oc adm inspect %s` to collect basic cluster named resources.\n", strings.Join(namedTargets, " "))
12491265

1250-
if err := runInspect(streams, rest.CopyConfig(o.Config), destDir, namedTargets); err != nil {
1266+
if err := runInspect(ctx, streams, rest.CopyConfig(o.Config), destDir, namedTargets); err != nil {
12511267
fmt.Fprintf(o.ErrOut, "error completing cluster named resource inspection: %v\n", err)
12521268
}
12531269
return
12541270
}
12551271

1256-
func runInspect(streams genericiooptions.IOStreams, config *rest.Config, destDir string, arguments []string) error {
1272+
func runInspect(ctx context.Context, streams genericiooptions.IOStreams, config *rest.Config, destDir string, arguments []string) error {
12571273
inspectOptions := inspect.NewInspectOptions(streams)
12581274
inspectOptions.RESTConfig = config
12591275
inspectOptions.DestDir = destDir
@@ -1264,7 +1280,7 @@ func runInspect(streams genericiooptions.IOStreams, config *rest.Config, destDir
12641280
if err := inspectOptions.Validate(); err != nil {
12651281
return fmt.Errorf("error validating backup collection: %w", err)
12661282
}
1267-
if err := inspectOptions.Run(); err != nil {
1283+
if err := inspectOptions.RunContext(ctx); err != nil {
12681284
return fmt.Errorf("error running backup collection: %w", err)
12691285
}
12701286
return nil

0 commit comments

Comments
 (0)