Skip to content

Commit a04f1b8

Browse files
authored
Merge pull request kubernetes#93046 from oomichi/WatchUntilWithoutRetry
Move WatchUntilWithoutRetry() from e2e framework
2 parents 24ec90d + c86caa6 commit a04f1b8

File tree

2 files changed

+56
-56
lines changed

2 files changed

+56
-56
lines changed

test/e2e/apps/rc.go

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ var _ = SIGDescribe("ReplicationController", func() {
147147
eventFound := false
148148
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
149149
defer cancel()
150-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
150+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
151151
if watchEvent.Type != watch.Added {
152152
return false, nil
153153
}
@@ -162,7 +162,7 @@ var _ = SIGDescribe("ReplicationController", func() {
162162
eventFound = false
163163
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
164164
defer cancel()
165-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
165+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
166166
var rc *v1.ReplicationController
167167
rcBytes, err := json.Marshal(watchEvent.Object)
168168
if err != nil {
@@ -196,7 +196,7 @@ var _ = SIGDescribe("ReplicationController", func() {
196196
eventFound = false
197197
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
198198
defer cancel()
199-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
199+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
200200
if watchEvent.Type != watch.Modified {
201201
return false, nil
202202
}
@@ -224,7 +224,7 @@ var _ = SIGDescribe("ReplicationController", func() {
224224
eventFound = false
225225
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
226226
defer cancel()
227-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
227+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
228228
if watchEvent.Type != watch.Modified {
229229
return false, nil
230230
}
@@ -236,7 +236,7 @@ var _ = SIGDescribe("ReplicationController", func() {
236236
framework.ExpectEqual(eventFound, true, "failed to find RC %v event", watch.Added)
237237

238238
ginkgo.By("waiting for available Replicas")
239-
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
239+
_, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
240240
var rc *v1.ReplicationController
241241
rcBytes, err := json.Marshal(watchEvent.Object)
242242
if err != nil {
@@ -278,7 +278,7 @@ var _ = SIGDescribe("ReplicationController", func() {
278278
eventFound = false
279279
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
280280
defer cancel()
281-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
281+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
282282
if watchEvent.Type != watch.Modified {
283283
return false, nil
284284
}
@@ -291,7 +291,7 @@ var _ = SIGDescribe("ReplicationController", func() {
291291

292292
ginkgo.By("waiting for ReplicationController's scale to be the max amount")
293293
eventFound = false
294-
_, err = framework.WatchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
294+
_, err = watchUntilWithoutRetry(context.TODO(), retryWatcher, func(watchEvent watch.Event) (bool, error) {
295295
var rc *v1.ReplicationController
296296
rcBytes, err := json.Marshal(watchEvent.Object)
297297
if err != nil {
@@ -329,7 +329,7 @@ var _ = SIGDescribe("ReplicationController", func() {
329329
eventFound = false
330330
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
331331
defer cancel()
332-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
332+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
333333
if watchEvent.Type != watch.Modified {
334334
return false, nil
335335
}
@@ -366,7 +366,7 @@ var _ = SIGDescribe("ReplicationController", func() {
366366
eventFound = false
367367
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second)
368368
defer cancel()
369-
_, err = framework.WatchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
369+
_, err = watchUntilWithoutRetry(ctx, retryWatcher, func(watchEvent watch.Event) (bool, error) {
370370
if watchEvent.Type != watch.Deleted {
371371
return false, nil
372372
}
@@ -676,3 +676,50 @@ func updateReplicationControllerWithRetries(c clientset.Interface, namespace, na
676676
}
677677
return rc, pollErr
678678
}
679+
680+
// watchUntilWithoutRetry ...
681+
// reads items from the watch until each provided condition succeeds, and then returns the last watch
682+
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
683+
// If no event has been received, the returned event will be nil.
684+
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
685+
// Waits until context deadline or until context is canceled.
686+
//
687+
// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection
688+
func watchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
689+
ch := watcher.ResultChan()
690+
var lastEvent *watch.Event
691+
for _, condition := range conditions {
692+
// check the next condition against the previous event and short circuit waiting for the next watch
693+
if lastEvent != nil {
694+
done, err := condition(*lastEvent)
695+
if err != nil {
696+
return lastEvent, err
697+
}
698+
if done {
699+
continue
700+
}
701+
}
702+
ConditionSucceeded:
703+
for {
704+
select {
705+
case event, ok := <-ch:
706+
if !ok {
707+
return lastEvent, watchtools.ErrWatchClosed
708+
}
709+
lastEvent = &event
710+
711+
done, err := condition(event)
712+
if err != nil {
713+
return lastEvent, err
714+
}
715+
if done {
716+
break ConditionSucceeded
717+
}
718+
719+
case <-ctx.Done():
720+
return lastEvent, wait.ErrWaitTimeout
721+
}
722+
}
723+
}
724+
return lastEvent, nil
725+
}

test/e2e/framework/util.go

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1383,50 +1383,3 @@ retriesLoop:
13831383
break retriesLoop
13841384
}
13851385
}
1386-
1387-
// WatchUntilWithoutRetry ...
1388-
// reads items from the watch until each provided condition succeeds, and then returns the last watch
1389-
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
1390-
// If no event has been received, the returned event will be nil.
1391-
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
1392-
// Waits until context deadline or until context is canceled.
1393-
//
1394-
// the same as watchtools.UntilWithoutRetry, just without the closing of the watch - as for the purpose of being paired with WatchEventSequenceVerifier, the watch is needed for continual watch event collection
1395-
func WatchUntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
1396-
ch := watcher.ResultChan()
1397-
var lastEvent *watch.Event
1398-
for _, condition := range conditions {
1399-
// check the next condition against the previous event and short circuit waiting for the next watch
1400-
if lastEvent != nil {
1401-
done, err := condition(*lastEvent)
1402-
if err != nil {
1403-
return lastEvent, err
1404-
}
1405-
if done {
1406-
continue
1407-
}
1408-
}
1409-
ConditionSucceeded:
1410-
for {
1411-
select {
1412-
case event, ok := <-ch:
1413-
if !ok {
1414-
return lastEvent, watchtools.ErrWatchClosed
1415-
}
1416-
lastEvent = &event
1417-
1418-
done, err := condition(event)
1419-
if err != nil {
1420-
return lastEvent, err
1421-
}
1422-
if done {
1423-
break ConditionSucceeded
1424-
}
1425-
1426-
case <-ctx.Done():
1427-
return lastEvent, wait.ErrWaitTimeout
1428-
}
1429-
}
1430-
}
1431-
return lastEvent, nil
1432-
}

0 commit comments

Comments
 (0)