Skip to content

Commit f05820a

Browse files
authored
Merge pull request #28230 from stbenjam/revert-28081-in-cluster-fixes
Revert "in-cluster disruption: ensure that only one monitor is started in cluster"
2 parents 6164d1a + 29d5efd commit f05820a

File tree

26 files changed

+885
-852
lines changed

26 files changed

+885
-852
lines changed

pkg/clioptions/iooptions/io_options.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package iooptions
22

33
import (
4-
"fmt"
54
"io"
65
"os"
7-
"path"
86

97
"github.com/spf13/pflag"
108
"k8s.io/cli-runtime/pkg/genericclioptions"
@@ -38,11 +36,6 @@ func (o *OutputFlags) ConfigureIOStreams(streams genericclioptions.IOStreams, st
3836
return doNothing, nil
3937
}
4038

41-
dir := path.Dir(o.OutFile)
42-
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
43-
return doNothing, fmt.Errorf("failed to create parentdir %q: %w", dir, err)
44-
}
45-
4639
f, err := os.OpenFile(o.OutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
4740
if err != nil {
4841
return doNothing, err

pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/spf13/pflag"
1818

1919
"github.com/openshift/origin/pkg/defaultmonitortests"
20+
"github.com/openshift/origin/pkg/disruption/backend/sampler"
2021
"github.com/openshift/origin/pkg/monitor"
2122
"github.com/spf13/cobra"
2223
"k8s.io/cli-runtime/pkg/genericclioptions"
@@ -122,6 +123,7 @@ func (f *RunMonitorOptions) Run() error {
122123
go func() {
123124
<-abortCh
124125
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
126+
sampler.TearDownInClusterMonitors(restConfig)
125127
cancelFn()
126128

127129
sig := <-abortCh

pkg/cmd/openshift-tests/run-disruption/disruption.go

Lines changed: 114 additions & 182 deletions
Original file line numberDiff line numberDiff line change
@@ -6,51 +6,44 @@ import (
66
"io"
77
"os"
88
"os/signal"
9-
"sync"
9+
"path/filepath"
1010
"syscall"
11+
"time"
1112

12-
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
13+
monitorserialization "github.com/openshift/origin/pkg/monitor/serialization"
1314

14-
"k8s.io/apimachinery/pkg/fields"
15+
"k8s.io/cli-runtime/pkg/genericclioptions"
16+
"k8s.io/client-go/kubernetes"
17+
"k8s.io/client-go/rest"
18+
"k8s.io/klog/v2"
19+
"k8s.io/kubectl/pkg/util/templates"
1520

16-
"github.com/openshift/origin/pkg/clioptions/iooptions"
1721
"github.com/openshift/origin/pkg/disruption/backend"
18-
disruptionci "github.com/openshift/origin/pkg/disruption/ci"
1922
"github.com/openshift/origin/pkg/monitor"
23+
"github.com/openshift/origin/pkg/monitor/apiserveravailability"
24+
"github.com/openshift/origin/pkg/monitor/monitorapi"
2025
"github.com/openshift/origin/test/extended/util/disruption/controlplane"
2126
"github.com/spf13/cobra"
22-
"github.com/spf13/pflag"
23-
corev1 "k8s.io/api/core/v1"
24-
apimachinerywatch "k8s.io/apimachinery/pkg/watch"
25-
"k8s.io/cli-runtime/pkg/genericclioptions"
26-
"k8s.io/client-go/kubernetes"
27-
"k8s.io/client-go/rest"
28-
"k8s.io/client-go/tools/cache"
29-
"k8s.io/client-go/tools/watch"
30-
"k8s.io/kubectl/pkg/util/templates"
3127
)
3228

33-
type RunAPIDisruptionMonitorFlags struct {
34-
ConfigFlags *genericclioptions.ConfigFlags
35-
OutputFlags *iooptions.OutputFlags
36-
37-
ArtifactDir string
38-
LoadBalancerType string
39-
StopConfigMapName string
29+
// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
30+
type RunAPIDisruptionMonitorOptions struct {
31+
Out, ErrOut io.Writer
4032

41-
genericclioptions.IOStreams
33+
ArtifactDir string
34+
LoadBalancerType string
35+
ExtraMessage string
4236
}
4337

44-
func NewRunInClusterDisruptionMonitorFlags(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorFlags {
45-
return &RunAPIDisruptionMonitorFlags{
46-
ConfigFlags: genericclioptions.NewConfigFlags(false),
47-
OutputFlags: iooptions.NewOutputOptions(),
48-
IOStreams: ioStreams,
38+
func NewRunInClusterDisruptionMonitorOptions(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorOptions {
39+
return &RunAPIDisruptionMonitorOptions{
40+
Out: ioStreams.Out,
41+
ErrOut: ioStreams.ErrOut,
4942
}
5043
}
5144

5245
func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStreams) *cobra.Command {
53-
f := NewRunInClusterDisruptionMonitorFlags(ioStreams)
46+
disruptionOpt := NewRunInClusterDisruptionMonitorOptions(ioStreams)
5447
cmd := &cobra.Command{
5548
Use: "run-disruption",
5649
Short: "Run API server disruption monitor",
@@ -61,183 +54,122 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea
6154
SilenceUsage: true,
6255
SilenceErrors: true,
6356
RunE: func(cmd *cobra.Command, args []string) error {
64-
ctx, cancelFn := context.WithCancel(context.Background())
65-
defer cancelFn()
66-
abortCh := make(chan os.Signal, 2)
67-
go func() {
68-
<-abortCh
69-
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
70-
cancelFn()
71-
72-
sig := <-abortCh
73-
fmt.Fprintf(f.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
74-
switch sig {
75-
case syscall.SIGINT:
76-
os.Exit(130)
77-
default:
78-
os.Exit(0)
79-
}
80-
}()
81-
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)
82-
83-
if err := f.Validate(); err != nil {
84-
return err
85-
}
86-
87-
o, err := f.ToOptions()
88-
if err != nil {
89-
return err
90-
}
91-
92-
return o.Run(ctx)
57+
return disruptionOpt.Run()
9358
},
9459
}
95-
96-
f.AddFlags(cmd.Flags())
97-
60+
cmd.Flags().StringVar(&disruptionOpt.ArtifactDir,
61+
"artifact-dir", disruptionOpt.ArtifactDir,
62+
"The directory where monitor events will be stored.")
63+
cmd.Flags().StringVar(&disruptionOpt.LoadBalancerType,
64+
"lb-type", disruptionOpt.LoadBalancerType,
65+
"Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
66+
cmd.Flags().StringVar(&disruptionOpt.ExtraMessage,
67+
"extra-message", disruptionOpt.ExtraMessage,
68+
"Add custom label to disruption event message")
9869
return cmd
9970
}
10071

101-
func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) {
102-
flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
103-
flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.")
104-
105-
f.ConfigFlags.AddFlags(flags)
106-
f.OutputFlags.BindFlags(flags)
107-
}
108-
109-
func (f *RunAPIDisruptionMonitorFlags) SetIOStreams(streams genericclioptions.IOStreams) {
110-
f.IOStreams = streams
111-
}
112-
113-
func (f *RunAPIDisruptionMonitorFlags) Validate() error {
114-
if len(f.OutputFlags.OutFile) == 0 {
115-
return fmt.Errorf("output-file must be specified")
116-
}
117-
if len(f.StopConfigMapName) == 0 {
118-
return fmt.Errorf("stop-configmap must be specified")
119-
}
120-
121-
return nil
122-
}
123-
124-
func (f *RunAPIDisruptionMonitorFlags) ToOptions() (*RunAPIDisruptionMonitorOptions, error) {
125-
originalOutStream := f.IOStreams.Out
126-
closeFn, err := f.OutputFlags.ConfigureIOStreams(f.IOStreams, f)
72+
func (opt *RunAPIDisruptionMonitorOptions) Run() error {
73+
restConfig, err := monitor.GetMonitorRESTConfig()
12774
if err != nil {
128-
return nil, err
75+
return err
12976
}
13077

131-
namespace, _, err := f.ConfigFlags.ToRawKubeConfigLoader().Namespace()
132-
if err != nil {
133-
return nil, err
134-
}
135-
if len(namespace) == 0 {
136-
return nil, fmt.Errorf("namespace must be specified")
137-
}
78+
lb := backend.ParseStringToLoadBalancerType(opt.LoadBalancerType)
13879

139-
restConfig, err := f.ConfigFlags.ToRESTConfig()
140-
if err != nil {
141-
return nil, err
142-
}
143-
kubeClient, err := kubernetes.NewForConfig(restConfig)
80+
ctx, cancelFn := context.WithCancel(context.Background())
81+
defer cancelFn()
82+
abortCh := make(chan os.Signal, 2)
83+
go func() {
84+
<-abortCh
85+
fmt.Fprintf(opt.ErrOut, "Interrupted, terminating\n")
86+
// Give some time to store intervals on disk
87+
time.Sleep(5 * time.Second)
88+
cancelFn()
89+
sig := <-abortCh
90+
fmt.Fprintf(opt.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
91+
switch sig {
92+
case syscall.SIGINT:
93+
os.Exit(130)
94+
default:
95+
os.Exit(0)
96+
}
97+
}()
98+
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)
99+
100+
recorder, err := StartAPIAvailability(ctx, restConfig, lb)
144101
if err != nil {
145-
return nil, err
102+
return err
146103
}
147104

148-
return &RunAPIDisruptionMonitorOptions{
149-
KubeClient: kubeClient,
150-
KubeClientConfig: restConfig,
151-
OutputFile: f.OutputFlags.OutFile,
152-
LoadBalancerType: f.LoadBalancerType,
153-
StopConfigMapName: f.StopConfigMapName,
154-
Namespace: namespace,
155-
CloseFn: closeFn,
156-
OriginalOutFile: originalOutStream,
157-
IOStreams: f.IOStreams,
158-
}, nil
159-
}
160-
161-
// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
162-
type RunAPIDisruptionMonitorOptions struct {
163-
KubeClient kubernetes.Interface
164-
KubeClientConfig *rest.Config
165-
OutputFile string
166-
LoadBalancerType string
167-
StopConfigMapName string
168-
Namespace string
169-
170-
OriginalOutFile io.Writer
171-
CloseFn iooptions.CloseFunc
172-
genericclioptions.IOStreams
173-
}
105+
go func() {
106+
ticker := time.NewTicker(100 * time.Millisecond)
107+
defer ticker.Stop()
108+
var last time.Time
109+
done := false
110+
for !done {
111+
select {
112+
case <-ticker.C:
113+
case <-ctx.Done():
114+
done = true
115+
}
116+
events := recorder.Intervals(last, time.Time{})
117+
if len(events) > 0 {
118+
for _, event := range events {
119+
if !event.From.Equal(event.To) {
120+
continue
121+
}
122+
fmt.Fprintln(opt.Out, event.String())
123+
}
124+
last = events[len(events)-1].From
125+
}
126+
}
127+
}()
174128

175-
func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error {
176-
ctx, cancelFn := context.WithCancel(ctx)
177-
defer cancelFn()
129+
<-ctx.Done()
178130

179-
fmt.Fprintf(o.Out, "Starting up.")
131+
// Store intervals to artifact directory
132+
intervals := recorder.Intervals(time.Time{}, time.Time{})
133+
if len(opt.ExtraMessage) > 0 {
134+
fmt.Fprintf(opt.Out, "\nAppending %s to recorded event message\n", opt.ExtraMessage)
135+
for i, event := range intervals {
136+
intervals[i].Message = fmt.Sprintf("%s user-provided-message=%s", event.Message, opt.ExtraMessage)
137+
}
138+
}
180139

181-
startingContent, err := os.ReadFile(o.OutputFile)
182-
if err != nil && !os.IsNotExist(err) {
140+
eventDir := filepath.Join(opt.ArtifactDir, monitorapi.EventDir)
141+
if err := os.MkdirAll(eventDir, os.ModePerm); err != nil {
142+
fmt.Printf("Failed to create monitor-events directory, err: %v\n", err)
183143
return err
184144
}
185-
if len(startingContent) > 0 {
186-
// print starting content to the log so that we can simply scrape the log to find all entries at the end.
187-
o.OriginalOutFile.Write(startingContent)
188-
}
189-
190-
lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType)
191145

192-
recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
193-
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb)
194-
if err != nil {
146+
timeSuffix := fmt.Sprintf("_%s", time.Now().UTC().Format("20060102-150405"))
147+
if err := monitorserialization.EventsToFile(filepath.Join(eventDir, fmt.Sprintf("e2e-events%s.json", timeSuffix)), intervals); err != nil {
148+
fmt.Printf("Failed to write event data, err: %v\n", err)
195149
return err
196150
}
151+
fmt.Fprintf(opt.Out, "\nEvent data written, exiting\n")
197152

198-
go func(ctx context.Context) {
199-
defer cancelFn()
200-
err := o.WaitForStopSignal(ctx)
201-
if err != nil {
202-
fmt.Fprintf(o.ErrOut, "failure waiting for stop: %v", err)
203-
}
204-
}(ctx)
153+
return nil
154+
}
205155

206-
<-ctx.Done()
156+
// StartAPIAvailability monitors just the cluster availability
157+
func StartAPIAvailability(ctx context.Context, restConfig *rest.Config, lb backend.LoadBalancerType) (monitorapi.Recorder, error) {
158+
recorder := monitor.NewRecorder()
207159

208-
fmt.Fprintf(o.Out, "waiting for samplers to stop")
209-
wg := sync.WaitGroup{}
210-
for i := range samplers {
211-
wg.Add(1)
212-
func(sampler disruptionci.Sampler) {
213-
defer wg.Done()
214-
sampler.Stop()
215-
}(samplers[i])
160+
client, err := kubernetes.NewForConfig(restConfig)
161+
if err != nil {
162+
return nil, err
163+
}
164+
if err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, restConfig, lb); err != nil {
165+
return nil, err
216166
}
217-
wg.Wait()
218-
fmt.Fprintf(o.Out, "samplers stopped")
219-
220-
return nil
221-
}
222167

223-
func (o *RunAPIDisruptionMonitorOptions) WaitForStopSignal(ctx context.Context) error {
224-
defer utilruntime.HandleCrash()
225-
226-
_, err := watch.UntilWithSync(
227-
ctx,
228-
cache.NewListWatchFromClient(
229-
o.KubeClient.CoreV1().RESTClient(), "configmaps", o.Namespace, fields.OneTermEqualSelector("metadata.name", o.StopConfigMapName)),
230-
&corev1.ConfigMap{},
231-
nil,
232-
func(event apimachinerywatch.Event) (bool, error) {
233-
switch event.Type {
234-
case apimachinerywatch.Added:
235-
return true, nil
236-
case apimachinerywatch.Modified:
237-
return true, nil
238-
}
239-
return false, nil
240-
},
241-
)
242-
return err
168+
// read the state of the cluster apiserver client access issues *before* any test (like upgrade) begins
169+
intervals, err := apiserveravailability.APIServerAvailabilityIntervalsFromCluster(client, time.Time{}, time.Time{})
170+
if err != nil {
171+
klog.Errorf("error reading initial apiserver availability: %v", err)
172+
}
173+
recorder.AddIntervals(intervals...)
174+
return recorder, nil
243175
}

0 commit comments

Comments
 (0)