Skip to content

Commit 5dff126

Browse files
Merge pull request #29058 from vrutkovs/in-cluster-fixes-v6
Reapply "OCPBUGS-18865: Reapply "Merge pull request #28944 from vrutkovs/in-cluster-fixes-v4""
2 parents 6807d0c + 0cf9eea commit 5dff126

File tree

34 files changed

+1500
-621
lines changed

34 files changed

+1500
-621
lines changed

pkg/clioptions/iooptions/io_options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package iooptions
22

33
import (
4+
"fmt"
45
"io"
56
"os"
7+
"path"
68

79
"github.com/spf13/pflag"
810
"k8s.io/cli-runtime/pkg/genericclioptions"
@@ -36,6 +38,11 @@ func (o *OutputFlags) ConfigureIOStreams(streams genericclioptions.IOStreams, st
3638
return doNothing, nil
3739
}
3840

41+
dir := path.Dir(o.OutFile)
42+
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
43+
return doNothing, fmt.Errorf("failed to create parentdir %q: %w", dir, err)
44+
}
45+
3946
f, err := os.OpenFile(o.OutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
4047
if err != nil {
4148
return doNothing, err

pkg/cmd/openshift-tests/dev/dev.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ a running cluster.
160160
logrus.Infof("loaded %d intervals", len(intervals))
161161

162162
logrus.Info("running tests")
163-
junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals)
163+
junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals, nil)
164164
for _, junit := range junits {
165165
if junit.FailureOutput != nil {
166166
logrus.Errorf("FAIL: %s", junit.Name)

pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@ import (
1919

2020
"github.com/spf13/pflag"
2121

22+
"github.com/openshift/origin/pkg/defaultmonitortests"
23+
"github.com/openshift/origin/pkg/monitor"
2224
"github.com/spf13/cobra"
2325
"k8s.io/cli-runtime/pkg/genericclioptions"
2426
"k8s.io/kubectl/pkg/util/templates"
25-
26-
"github.com/openshift/origin/pkg/defaultmonitortests"
27-
"github.com/openshift/origin/pkg/monitor"
2827
)
2928

3029
type RunMonitorFlags struct {

pkg/cmd/openshift-tests/run-disruption/disruption.go

Lines changed: 182 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -6,46 +6,51 @@ import (
66
"io"
77
"os"
88
"os/signal"
9-
"path/filepath"
9+
"sync"
1010
"syscall"
11-
"time"
1211

13-
"github.com/openshift/origin/pkg/clioptions/clusterinfo"
12+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1413

15-
monitorserialization "github.com/openshift/origin/pkg/monitor/serialization"
16-
17-
"k8s.io/cli-runtime/pkg/genericclioptions"
18-
"k8s.io/client-go/kubernetes"
19-
"k8s.io/client-go/rest"
20-
"k8s.io/klog/v2"
21-
"k8s.io/kubectl/pkg/util/templates"
14+
"k8s.io/apimachinery/pkg/fields"
2215

16+
"github.com/openshift/origin/pkg/clioptions/iooptions"
2317
"github.com/openshift/origin/pkg/disruption/backend"
18+
disruptionci "github.com/openshift/origin/pkg/disruption/ci"
2419
"github.com/openshift/origin/pkg/monitor"
25-
"github.com/openshift/origin/pkg/monitor/apiserveravailability"
26-
"github.com/openshift/origin/pkg/monitor/monitorapi"
2720
"github.com/openshift/origin/test/extended/util/disruption/controlplane"
2821
"github.com/spf13/cobra"
22+
"github.com/spf13/pflag"
23+
corev1 "k8s.io/api/core/v1"
24+
apimachinerywatch "k8s.io/apimachinery/pkg/watch"
25+
"k8s.io/cli-runtime/pkg/genericclioptions"
26+
"k8s.io/client-go/kubernetes"
27+
"k8s.io/client-go/rest"
28+
"k8s.io/client-go/tools/cache"
29+
"k8s.io/client-go/tools/watch"
30+
"k8s.io/kubectl/pkg/util/templates"
2931
)
3032

31-
// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
32-
type RunAPIDisruptionMonitorOptions struct {
33-
Out, ErrOut io.Writer
33+
type RunAPIDisruptionMonitorFlags struct {
34+
ConfigFlags *genericclioptions.ConfigFlags
35+
OutputFlags *iooptions.OutputFlags
36+
37+
ArtifactDir string
38+
LoadBalancerType string
39+
StopConfigMapName string
3440

35-
ArtifactDir string
36-
LoadBalancerType string
37-
ExtraMessage string
41+
genericclioptions.IOStreams
3842
}
3943

40-
func NewRunInClusterDisruptionMonitorOptions(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorOptions {
41-
return &RunAPIDisruptionMonitorOptions{
42-
Out: ioStreams.Out,
43-
ErrOut: ioStreams.ErrOut,
44+
func NewRunInClusterDisruptionMonitorFlags(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorFlags {
45+
return &RunAPIDisruptionMonitorFlags{
46+
ConfigFlags: genericclioptions.NewConfigFlags(false),
47+
OutputFlags: iooptions.NewOutputOptions(),
48+
IOStreams: ioStreams,
4449
}
4550
}
4651

4752
func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStreams) *cobra.Command {
48-
disruptionOpt := NewRunInClusterDisruptionMonitorOptions(ioStreams)
53+
f := NewRunInClusterDisruptionMonitorFlags(ioStreams)
4954
cmd := &cobra.Command{
5055
Use: "run-disruption",
5156
Short: "Run API server disruption monitor",
@@ -56,122 +61,183 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea
5661
SilenceUsage: true,
5762
SilenceErrors: true,
5863
RunE: func(cmd *cobra.Command, args []string) error {
59-
return disruptionOpt.Run()
64+
ctx, cancelFn := context.WithCancel(context.Background())
65+
defer cancelFn()
66+
abortCh := make(chan os.Signal, 2)
67+
go func() {
68+
<-abortCh
69+
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
70+
cancelFn()
71+
72+
sig := <-abortCh
73+
fmt.Fprintf(f.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
74+
switch sig {
75+
case syscall.SIGINT:
76+
os.Exit(130)
77+
default:
78+
os.Exit(0)
79+
}
80+
}()
81+
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)
82+
83+
if err := f.Validate(); err != nil {
84+
return err
85+
}
86+
87+
o, err := f.ToOptions()
88+
if err != nil {
89+
return err
90+
}
91+
92+
return o.Run(ctx)
6093
},
6194
}
62-
cmd.Flags().StringVar(&disruptionOpt.ArtifactDir,
63-
"artifact-dir", disruptionOpt.ArtifactDir,
64-
"The directory where monitor events will be stored.")
65-
cmd.Flags().StringVar(&disruptionOpt.LoadBalancerType,
66-
"lb-type", disruptionOpt.LoadBalancerType,
67-
"Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
68-
cmd.Flags().StringVar(&disruptionOpt.ExtraMessage,
69-
"extra-message", disruptionOpt.ExtraMessage,
70-
"Add custom label to disruption event message")
95+
96+
f.AddFlags(cmd.Flags())
97+
7198
return cmd
7299
}
73100

74-
func (opt *RunAPIDisruptionMonitorOptions) Run() error {
75-
restConfig, err := clusterinfo.GetMonitorRESTConfig()
76-
if err != nil {
77-
return err
78-
}
101+
func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) {
102+
flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
103+
flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.")
79104

80-
lb := backend.ParseStringToLoadBalancerType(opt.LoadBalancerType)
105+
f.ConfigFlags.AddFlags(flags)
106+
f.OutputFlags.BindFlags(flags)
107+
}
81108

82-
ctx, cancelFn := context.WithCancel(context.Background())
83-
defer cancelFn()
84-
abortCh := make(chan os.Signal, 2)
85-
go func() {
86-
<-abortCh
87-
fmt.Fprintf(opt.ErrOut, "Interrupted, terminating\n")
88-
// Give some time to store intervals on disk
89-
time.Sleep(5 * time.Second)
90-
cancelFn()
91-
sig := <-abortCh
92-
fmt.Fprintf(opt.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
93-
switch sig {
94-
case syscall.SIGINT:
95-
os.Exit(130)
96-
default:
97-
os.Exit(0)
98-
}
99-
}()
100-
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)
109+
func (f *RunAPIDisruptionMonitorFlags) SetIOStreams(streams genericclioptions.IOStreams) {
110+
f.IOStreams = streams
111+
}
101112

102-
recorder, err := StartAPIAvailability(ctx, restConfig, lb)
103-
if err != nil {
104-
return err
113+
func (f *RunAPIDisruptionMonitorFlags) Validate() error {
114+
if len(f.OutputFlags.OutFile) == 0 {
115+
return fmt.Errorf("output-file must be specified")
116+
}
117+
if len(f.StopConfigMapName) == 0 {
118+
return fmt.Errorf("stop-configmap must be specified")
105119
}
106120

107-
go func() {
108-
ticker := time.NewTicker(100 * time.Millisecond)
109-
defer ticker.Stop()
110-
var last time.Time
111-
done := false
112-
for !done {
113-
select {
114-
case <-ticker.C:
115-
case <-ctx.Done():
116-
done = true
117-
}
118-
events := recorder.Intervals(last, time.Time{})
119-
if len(events) > 0 {
120-
for _, event := range events {
121-
if !event.From.Equal(event.To) {
122-
continue
123-
}
124-
fmt.Fprintln(opt.Out, event.String())
125-
}
126-
last = events[len(events)-1].From
127-
}
128-
}
129-
}()
130-
131-
<-ctx.Done()
121+
return nil
122+
}
132123

133-
// Store intervals to artifact directory
134-
intervals := recorder.Intervals(time.Time{}, time.Time{})
135-
if len(opt.ExtraMessage) > 0 {
136-
fmt.Fprintf(opt.Out, "\nAppending %s to recorded event message\n", opt.ExtraMessage)
137-
for i, event := range intervals {
138-
intervals[i].Message.HumanMessage = fmt.Sprintf("%s user-provided-message=%s", event.Message.HumanMessage, opt.ExtraMessage)
139-
}
124+
func (f *RunAPIDisruptionMonitorFlags) ToOptions() (*RunAPIDisruptionMonitorOptions, error) {
125+
originalOutStream := f.IOStreams.Out
126+
closeFn, err := f.OutputFlags.ConfigureIOStreams(f.IOStreams, f)
127+
if err != nil {
128+
return nil, err
140129
}
141130

142-
eventDir := filepath.Join(opt.ArtifactDir, monitorapi.EventDir)
143-
if err := os.MkdirAll(eventDir, os.ModePerm); err != nil {
144-
fmt.Printf("Failed to create monitor-events directory, err: %v\n", err)
145-
return err
131+
namespace, _, err := f.ConfigFlags.ToRawKubeConfigLoader().Namespace()
132+
if err != nil {
133+
return nil, err
134+
}
135+
if len(namespace) == 0 {
136+
return nil, fmt.Errorf("namespace must be specified")
146137
}
147138

148-
timeSuffix := fmt.Sprintf("_%s", time.Now().UTC().Format("20060102-150405"))
149-
if err := monitorserialization.EventsToFile(filepath.Join(eventDir, fmt.Sprintf("e2e-events%s.json", timeSuffix)), intervals); err != nil {
150-
fmt.Printf("Failed to write event data, err: %v\n", err)
151-
return err
139+
restConfig, err := f.ConfigFlags.ToRESTConfig()
140+
if err != nil {
141+
return nil, err
142+
}
143+
kubeClient, err := kubernetes.NewForConfig(restConfig)
144+
if err != nil {
145+
return nil, err
152146
}
153-
fmt.Fprintf(opt.Out, "\nEvent data written, exiting\n")
154147

155-
return nil
148+
return &RunAPIDisruptionMonitorOptions{
149+
KubeClient: kubeClient,
150+
KubeClientConfig: restConfig,
151+
OutputFile: f.OutputFlags.OutFile,
152+
LoadBalancerType: f.LoadBalancerType,
153+
StopConfigMapName: f.StopConfigMapName,
154+
Namespace: namespace,
155+
CloseFn: closeFn,
156+
OriginalOutFile: originalOutStream,
157+
IOStreams: f.IOStreams,
158+
}, nil
156159
}
157160

158-
// StartAPIAvailability monitors just the cluster availability
159-
func StartAPIAvailability(ctx context.Context, restConfig *rest.Config, lb backend.LoadBalancerType) (monitorapi.Recorder, error) {
160-
recorder := monitor.NewRecorder()
161+
// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
162+
type RunAPIDisruptionMonitorOptions struct {
163+
KubeClient kubernetes.Interface
164+
KubeClientConfig *rest.Config
165+
OutputFile string
166+
LoadBalancerType string
167+
StopConfigMapName string
168+
Namespace string
169+
170+
OriginalOutFile io.Writer
171+
CloseFn iooptions.CloseFunc
172+
genericclioptions.IOStreams
173+
}
161174

162-
client, err := kubernetes.NewForConfig(restConfig)
163-
if err != nil {
164-
return nil, err
175+
func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error {
176+
ctx, cancelFn := context.WithCancel(ctx)
177+
defer cancelFn()
178+
179+
fmt.Fprintf(o.Out, "Starting up.")
180+
181+
startingContent, err := os.ReadFile(o.OutputFile)
182+
if err != nil && !os.IsNotExist(err) {
183+
return err
165184
}
166-
if err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, restConfig, lb); err != nil {
167-
return nil, err
185+
if len(startingContent) > 0 {
186+
// print starting content to the log so that we can simply scrape the log to find all entries at the end.
187+
o.OriginalOutFile.Write(startingContent)
168188
}
169189

170-
// read the state of the cluster apiserver client access issues *before* any test (like upgrade) begins
171-
intervals, err := apiserveravailability.APIServerAvailabilityIntervalsFromCluster(client, time.Time{}, time.Time{})
190+
lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType)
191+
192+
recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
193+
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb)
172194
if err != nil {
173-
klog.Errorf("error reading initial apiserver availability: %v", err)
195+
return err
174196
}
175-
recorder.AddIntervals(intervals...)
176-
return recorder, nil
197+
198+
go func(ctx context.Context) {
199+
defer cancelFn()
200+
err := o.WaitForStopSignal(ctx)
201+
if err != nil {
202+
fmt.Fprintf(o.ErrOut, "failure waiting for stop: %v", err)
203+
}
204+
}(ctx)
205+
206+
<-ctx.Done()
207+
208+
fmt.Fprintf(o.Out, "waiting for samplers to stop")
209+
wg := sync.WaitGroup{}
210+
for i := range samplers {
211+
wg.Add(1)
212+
func(sampler disruptionci.Sampler) {
213+
defer wg.Done()
214+
sampler.Stop()
215+
}(samplers[i])
216+
}
217+
wg.Wait()
218+
fmt.Fprintf(o.Out, "samplers stopped")
219+
220+
return nil
221+
}
222+
223+
func (o *RunAPIDisruptionMonitorOptions) WaitForStopSignal(ctx context.Context) error {
224+
defer utilruntime.HandleCrash()
225+
226+
_, err := watch.UntilWithSync(
227+
ctx,
228+
cache.NewListWatchFromClient(
229+
o.KubeClient.CoreV1().RESTClient(), "configmaps", o.Namespace, fields.OneTermEqualSelector("metadata.name", o.StopConfigMapName)),
230+
&corev1.ConfigMap{},
231+
nil,
232+
func(event apimachinerywatch.Event) (bool, error) {
233+
switch event.Type {
234+
case apimachinerywatch.Added:
235+
return true, nil
236+
case apimachinerywatch.Modified:
237+
return true, nil
238+
}
239+
return false, nil
240+
},
241+
)
242+
return err
177243
}

0 commit comments

Comments
 (0)