Skip to content

Commit df7c9f1

Browse files
authored
Merge pull request #520 from tallclair/skew-testing
Refactor integration tests to prepare for proxy-server & agent skew testing
2 parents 898de05 + 787d095 commit df7c9f1

13 files changed

+601
-562
lines changed

tests/agent_disconnect_test.go

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,13 @@ import (
3131

3232
"google.golang.org/grpc"
3333
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
34+
"sigs.k8s.io/apiserver-network-proxy/tests/framework"
3435
)
3536

36-
func TestProxy_Agent_Disconnect_HTTP_Persistent_Connection(t *testing.T) {
37+
func TestProxy_Agent_Disconnect_Persistent_Connection(t *testing.T) {
3738
testcases := []struct {
3839
name string
39-
proxyServerFunction func() (proxy, func(), error)
40+
proxyServerFunction func(t testing.TB) framework.ProxyServer
4041
clientFunction func(context.Context, string, string) (*http.Client, error)
4142
}{
4243
{
@@ -57,33 +58,28 @@ func TestProxy_Agent_Disconnect_HTTP_Persistent_Connection(t *testing.T) {
5758
server := httptest.NewServer(newEchoServer("hello"))
5859
defer server.Close()
5960

60-
stopCh := make(chan struct{})
61+
ps := tc.proxyServerFunction(t)
62+
defer ps.Stop()
6163

62-
proxy, cleanup, err := tc.proxyServerFunction()
63-
if err != nil {
64-
t.Fatal(err)
65-
}
66-
defer cleanup()
67-
68-
runAgent(proxy.agent, stopCh)
69-
waitForConnectedAgentCount(t, 1, proxy.server)
64+
a := runAgent(t, ps.AgentAddr())
65+
waitForConnectedAgentCount(t, 1, ps)
7066

7167
// run test client
7268

73-
c, err := tc.clientFunction(ctx, proxy.front, server.URL)
69+
c, err := tc.clientFunction(ctx, ps.FrontAddr(), server.URL)
7470
if err != nil {
75-
t.Errorf("error obtaining client: %v", err)
71+
t.Fatalf("error obtaining client: %v", err)
7672
}
7773

7874
_, err = clientRequest(c, server.URL)
7975

8076
if err != nil {
8177
t.Errorf("expected no error on proxy request, got %v", err)
8278
}
83-
close(stopCh)
79+
a.Stop()
8480

8581
// Wait for the agent to disconnect
86-
waitForConnectedAgentCount(t, 0, proxy.server)
82+
waitForConnectedAgentCount(t, 0, ps)
8783

8884
// Reuse same client to make the request
8985
_, err = clientRequest(c, server.URL)
@@ -99,7 +95,7 @@ func TestProxy_Agent_Disconnect_HTTP_Persistent_Connection(t *testing.T) {
9995
func TestProxy_Agent_Reconnect(t *testing.T) {
10096
testcases := []struct {
10197
name string
102-
proxyServerFunction func() (proxy, func(), error)
98+
proxyServerFunction func(testing.TB) framework.ProxyServer
10399
clientFunction func(context.Context, string, string) (*http.Client, error)
104100
}{
105101
{
@@ -121,43 +117,37 @@ func TestProxy_Agent_Reconnect(t *testing.T) {
121117
server := httptest.NewServer(newEchoServer("hello"))
122118
defer server.Close()
123119

124-
stopCh := make(chan struct{})
125-
126-
proxy, cleanup, err := tc.proxyServerFunction()
127-
if err != nil {
128-
t.Fatal(err)
129-
}
130-
defer cleanup()
120+
ps := tc.proxyServerFunction(t)
121+
defer ps.Stop()
131122

132-
cs1 := runAgent(proxy.agent, stopCh)
133-
waitForConnectedServerCount(t, 1, cs1)
123+
ai1 := runAgent(t, ps.AgentAddr())
124+
waitForConnectedServerCount(t, 1, ai1)
134125

135126
// run test client
136127

137-
c, err := tc.clientFunction(ctx, proxy.front, server.URL)
128+
c, err := tc.clientFunction(ctx, ps.FrontAddr(), server.URL)
138129
if err != nil {
139-
t.Errorf("error obtaining client: %v", err)
130+
t.Fatalf("error obtaining client: %v", err)
140131
}
141132

142133
_, err = clientRequest(c, server.URL)
143134
if err != nil {
144135
t.Errorf("expected no error on proxy request, got %v", err)
145136
}
146-
close(stopCh)
137+
ai1.Stop()
147138

148139
// Wait for the agent to disconnect
149-
waitForConnectedAgentCount(t, 0, proxy.server)
140+
waitForConnectedAgentCount(t, 0, ps)
150141

151142
// Reconnect agent
152-
stopCh2 := make(chan struct{})
153-
defer close(stopCh2)
154-
cs2 := runAgent(proxy.agent, stopCh2)
155-
waitForConnectedServerCount(t, 1, cs2)
143+
ai2 := runAgent(t, ps.AgentAddr())
144+
defer ai2.Stop()
145+
waitForConnectedServerCount(t, 1, ai2)
156146

157147
// Proxy requests should work again after agent reconnects
158-
c2, err := tc.clientFunction(ctx, proxy.front, server.URL)
148+
c2, err := tc.clientFunction(ctx, ps.FrontAddr(), server.URL)
159149
if err != nil {
160-
t.Errorf("error obtaining client: %v", err)
150+
t.Fatalf("error obtaining client: %v", err)
161151
}
162152

163153
_, err = clientRequest(c2, server.URL)

tests/benchmarks_test.go

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -39,17 +39,11 @@ func BenchmarkLargeResponse_GRPC(b *testing.B) {
3939
server := httptest.NewServer(newSizedServer(length, chunks))
4040
defer server.Close()
4141

42-
stopCh := make(chan struct{})
43-
defer close(stopCh)
42+
ps := runGRPCProxyServer(b)
43+
defer ps.Stop()
4444

45-
proxy, cleanup, err := runGRPCProxyServer()
46-
if err != nil {
47-
b.Fatal(err)
48-
}
49-
defer cleanup()
50-
51-
clientset := runAgent(proxy.agent, stopCh)
52-
waitForConnectedServerCount(b, 1, clientset)
45+
a := runAgent(b, ps.AgentAddr())
46+
waitForConnectedServerCount(b, 1, a)
5347

5448
req, err := http.NewRequest("GET", server.URL, nil)
5549
if err != nil {
@@ -59,7 +53,7 @@ func BenchmarkLargeResponse_GRPC(b *testing.B) {
5953

6054
for n := 0; n < b.N; n++ {
6155
// run test client
62-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, proxy.front, grpc.WithInsecure())
56+
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
6357
if err != nil {
6458
b.Fatal(err)
6559
}
@@ -113,17 +107,11 @@ func BenchmarkLargeRequest_GRPC(b *testing.B) {
113107
}))
114108
defer server.Close()
115109

116-
stopCh := make(chan struct{})
117-
defer close(stopCh)
118-
119-
proxy, cleanup, err := runGRPCProxyServer()
120-
if err != nil {
121-
b.Fatal(err)
122-
}
123-
defer cleanup()
110+
ps := runGRPCProxyServer(b)
111+
defer ps.Stop()
124112

125-
clientset := runAgent(proxy.agent, stopCh)
126-
waitForConnectedServerCount(b, 1, clientset)
113+
a := runAgent(b, ps.AgentAddr())
114+
waitForConnectedServerCount(b, 1, a)
127115

128116
bodyBytes := make([]byte, length)
129117
body := bytes.NewReader(bodyBytes)
@@ -134,7 +122,7 @@ func BenchmarkLargeRequest_GRPC(b *testing.B) {
134122
req.Close = true
135123
for n := 0; n < b.N; n++ {
136124
// run test client
137-
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, proxy.front, grpc.WithInsecure())
125+
tunnel, err := client.CreateSingleUseGrpcTunnel(ctx, ps.FrontAddr(), grpc.WithInsecure())
138126
if err != nil {
139127
b.Fatal(err)
140128
}

tests/concurrent_client_request_test.go

Lines changed: 42 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -19,39 +19,34 @@ package tests
1919
import (
2020
"bytes"
2121
"context"
22-
"fmt"
2322
"io"
2423
"net/http"
2524
"net/http/httptest"
25+
"strconv"
2626
"sync"
2727
"testing"
28+
"time"
2829

2930
"google.golang.org/grpc"
3031
"k8s.io/apimachinery/pkg/util/wait"
3132
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
32-
"sigs.k8s.io/apiserver-network-proxy/pkg/server"
33-
"sigs.k8s.io/apiserver-network-proxy/proto/header"
3433
)
3534

3635
type simpleServer struct {
37-
receivedSecondReq chan struct{}
36+
mu sync.Mutex
3837
}
3938

40-
// ServeHTTP blocks the response to the request whose body is "1" until a
41-
// request whose body is "2" is handled.
4239
func (s *simpleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
40+
s.mu.Lock()
41+
defer s.mu.Unlock()
42+
43+
time.Sleep(time.Millisecond)
44+
4345
bytes, err := io.ReadAll(req.Body)
4446
if err != nil {
4547
w.Write([]byte(err.Error()))
4648
}
47-
if string(bytes) == "2" {
48-
close(s.receivedSecondReq)
49-
w.Write([]byte("2"))
50-
}
51-
if string(bytes) == "1" {
52-
<-s.receivedSecondReq
53-
w.Write([]byte("1"))
54-
}
49+
w.Write(bytes)
5550
}
5651

5752
// TODO: test http-connect as well.
@@ -70,121 +65,45 @@ func getTestClient(front string, t *testing.T) *http.Client {
7065
}
7166
}
7267

73-
// singleTimeManager makes sure that a backend only serves one request.
74-
type singleTimeManager struct {
75-
mu sync.Mutex
76-
backends map[string]server.Backend
77-
used map[string]struct{}
78-
}
79-
80-
func (s *singleTimeManager) AddBackend(agentID string, _ header.IdentifierType, backend server.Backend) {
81-
s.mu.Lock()
82-
defer s.mu.Unlock()
83-
s.backends[agentID] = backend
84-
}
85-
86-
func (s *singleTimeManager) RemoveBackend(agentID string, _ header.IdentifierType, backend server.Backend) {
87-
s.mu.Lock()
88-
defer s.mu.Unlock()
89-
v, ok := s.backends[agentID]
90-
if !ok {
91-
panic(fmt.Errorf("no backends found for %s", agentID))
92-
}
93-
if v != backend {
94-
panic(fmt.Errorf("recorded backend %v does not match %v", &v, backend))
95-
}
96-
delete(s.backends, agentID)
97-
}
98-
99-
func (s *singleTimeManager) Backend(_ context.Context) (server.Backend, error) {
100-
s.mu.Lock()
101-
defer s.mu.Unlock()
102-
for k, v := range s.backends {
103-
if _, ok := s.used[k]; !ok {
104-
s.used[k] = struct{}{}
105-
return v, nil
106-
}
107-
}
108-
return nil, fmt.Errorf("cannot find backend to a new agent")
109-
}
110-
111-
func (s *singleTimeManager) GetBackend(agentID string) server.Backend {
112-
return nil
113-
}
114-
115-
func (s *singleTimeManager) NumBackends() int {
116-
return 0
117-
}
118-
119-
func newSingleTimeGetter(m *server.DefaultBackendManager) *singleTimeManager {
120-
return &singleTimeManager{
121-
used: make(map[string]struct{}),
122-
backends: make(map[string]server.Backend),
123-
}
124-
}
125-
126-
var _ server.BackendManager = &singleTimeManager{}
127-
128-
func (s *singleTimeManager) Ready() (bool, string) {
129-
return true, ""
130-
}
131-
13268
func TestConcurrentClientRequest(t *testing.T) {
133-
s := httptest.NewServer(&simpleServer{receivedSecondReq: make(chan struct{})})
69+
const numConcurrentRequests = 100
70+
s := httptest.NewServer(&simpleServer{})
13471
defer s.Close()
13572

136-
proxy, ps, cleanup, err := runGRPCProxyServerWithServerCount(1)
137-
if err != nil {
138-
t.Fatal(err)
139-
}
140-
defer cleanup()
141-
ps.BackendManagers = []server.BackendManager{newSingleTimeGetter(server.NewDefaultBackendManager())}
73+
ps := runGRPCProxyServerWithServerCount(t, 1)
74+
defer ps.Stop()
14275

143-
stopCh := make(chan struct{})
144-
defer close(stopCh)
14576
// Run two agents
146-
cs1 := runAgentWithID("a", proxy.agent, stopCh)
147-
cs2 := runAgentWithID("b", proxy.agent, stopCh)
148-
waitForConnectedServerCount(t, 1, cs1)
149-
waitForConnectedServerCount(t, 1, cs2)
77+
a1 := runAgent(t, ps.AgentAddr())
78+
a2 := runAgent(t, ps.AgentAddr())
79+
defer a1.Stop()
80+
defer a2.Stop()
81+
waitForConnectedServerCount(t, 1, a1)
82+
waitForConnectedServerCount(t, 1, a2)
15083

151-
client1 := getTestClient(proxy.front, t)
152-
client2 := getTestClient(proxy.front, t)
15384
var wg sync.WaitGroup
154-
wg.Add(2)
155-
go func() {
156-
defer wg.Done()
157-
r, err := client1.Post(s.URL, "text/plain", bytes.NewBufferString("1"))
158-
if err != nil {
159-
t.Error(err)
160-
return
161-
}
162-
data, err := io.ReadAll(r.Body)
163-
if err != nil {
164-
t.Error(err)
165-
}
166-
r.Body.Close()
167-
168-
if string(data) != "1" {
169-
t.Errorf("expect %v; got %v", "1", string(data))
170-
}
171-
}()
172-
go func() {
173-
defer wg.Done()
174-
r, err := client2.Post(s.URL, "text/plain", bytes.NewBufferString("2"))
175-
if err != nil {
176-
t.Error(err)
177-
return
178-
}
179-
data, err := io.ReadAll(r.Body)
180-
if err != nil {
181-
t.Error(err)
182-
}
183-
r.Body.Close()
184-
185-
if string(data) != "2" {
186-
t.Errorf("expect %v; got %v", "2", string(data))
187-
}
188-
}()
85+
wg.Add(numConcurrentRequests)
86+
for i := 0; i < numConcurrentRequests; i++ {
87+
id := i
88+
go func() {
89+
defer wg.Done()
90+
client1 := getTestClient(ps.FrontAddr(), t)
91+
92+
r, err := client1.Post(s.URL, "text/plain", bytes.NewBufferString(strconv.Itoa(id)))
93+
if err != nil {
94+
t.Error(err)
95+
return
96+
}
97+
data, err := io.ReadAll(r.Body)
98+
if err != nil {
99+
t.Error(err)
100+
}
101+
r.Body.Close()
102+
103+
if string(data) != strconv.Itoa(id) {
104+
t.Errorf("expect %d; got %s", id, string(data))
105+
}
106+
}()
107+
}
189108
wg.Wait()
190109
}

0 commit comments

Comments
 (0)