Skip to content

Commit aa4113d

Browse files
committed
Set up connection onClose prior to adding to connection map
1 parent df908c3 commit aa4113d

File tree

2 files changed

+74
-5
lines changed

2 files changed

+74
-5
lines changed

staging/src/k8s.io/client-go/util/connrotation/connrotation.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,6 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
7777

7878
closable := &closableConn{Conn: conn}
7979

80-
// Start tracking the connection
81-
d.mu.Lock()
82-
d.conns[closable] = struct{}{}
83-
d.mu.Unlock()
84-
8580
// When the connection is closed, remove it from the map. This will
8681
// be no-op if the connection isn't in the map, e.g. if CloseAll()
8782
// is called.
@@ -91,6 +86,11 @@ func (d *Dialer) DialContext(ctx context.Context, network, address string) (net.
9186
d.mu.Unlock()
9287
}
9388

89+
// Start tracking the connection
90+
d.mu.Lock()
91+
d.conns[closable] = struct{}{}
92+
d.mu.Unlock()
93+
9494
return closable, nil
9595
}
9696

staging/src/k8s.io/client-go/util/connrotation/connrotation_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package connrotation
1919
import (
2020
"context"
2121
"net"
22+
"sync"
23+
"sync/atomic"
2224
"testing"
2325
"time"
2426
)
@@ -50,6 +52,73 @@ func TestCloseAll(t *testing.T) {
5052
}
5153
}
5254

55+
// TestCloseAllRace ensures CloseAll works with connections being simultaneously dialed
56+
func TestCloseAllRace(t *testing.T) {
57+
conns := int64(0)
58+
dialer := NewDialer(func(ctx context.Context, network, address string) (net.Conn, error) {
59+
return closeOnlyConn{onClose: func() { atomic.AddInt64(&conns, -1) }}, nil
60+
})
61+
62+
done := make(chan struct{})
63+
wg := &sync.WaitGroup{}
64+
65+
// Close all as fast as we can
66+
wg.Add(1)
67+
go func() {
68+
defer wg.Done()
69+
for {
70+
select {
71+
case <-done:
72+
return
73+
default:
74+
dialer.CloseAll()
75+
}
76+
}
77+
}()
78+
79+
// Dial as fast as we can
80+
wg.Add(1)
81+
go func() {
82+
defer wg.Done()
83+
for {
84+
select {
85+
case <-done:
86+
return
87+
default:
88+
if _, err := dialer.Dial("", ""); err != nil {
89+
t.Error(err)
90+
return
91+
}
92+
atomic.AddInt64(&conns, 1)
93+
}
94+
}
95+
}()
96+
97+
// Soak to ensure no races
98+
time.Sleep(time.Second)
99+
100+
// Signal completion
101+
close(done)
102+
// Wait for goroutines
103+
wg.Wait()
104+
// Ensure CloseAll ran after all dials
105+
dialer.CloseAll()
106+
107+
// Expect all connections to close within 5 seconds
108+
for start := time.Now(); time.Now().Sub(start) < 5*time.Second; time.Sleep(10 * time.Millisecond) {
109+
// Ensure all connections were closed
110+
if c := atomic.LoadInt64(&conns); c == 0 {
111+
break
112+
} else {
113+
t.Logf("got %d open connections, want 0, will retry", c)
114+
}
115+
}
116+
// Ensure all connections were closed
117+
if c := atomic.LoadInt64(&conns); c != 0 {
118+
t.Fatalf("got %d open connections, want 0", c)
119+
}
120+
}
121+
53122
type closeOnlyConn struct {
54123
net.Conn
55124
onClose func()

0 commit comments

Comments
 (0)