Skip to content

Commit 66155b9

Browse files
committed
Add concurrency test
Use 10 concurrent clients to send http request to remote server and download 10M bytes content from the server. We could test protocols other than http as well. Fix Issue 26
1 parent 3b44a3d commit 66155b9

File tree

3 files changed

+167
-9
lines changed

3 files changed

+167
-9
lines changed

pkg/agent/client/client.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"math/rand"
2424
"net"
25+
"sync"
2526
"time"
2627

2728
"google.golang.org/grpc"
@@ -43,9 +44,11 @@ type dialResult struct {
4344

4445
// grpcTunnel implements Tunnel
4546
type grpcTunnel struct {
46-
stream agent.ProxyService_ProxyClient
47-
pendingDial map[int64]chan<- dialResult
48-
conns map[int64]*conn
47+
stream agent.ProxyService_ProxyClient
48+
pendingDial map[int64]chan<- dialResult
49+
conns map[int64]*conn
50+
pendingDialLock sync.RWMutex
51+
connsLock sync.RWMutex
4952
}
5053

5154
// CreateGrpcTunnel creates a Tunnel to dial to a remote server through a
@@ -90,7 +93,11 @@ func (t *grpcTunnel) serve() {
9093
switch pkt.Type {
9194
case agent.PacketType_DIAL_RSP:
9295
resp := pkt.GetDialResponse()
93-
if ch, ok := t.pendingDial[resp.Random]; !ok {
96+
t.pendingDialLock.RLock()
97+
ch, ok := t.pendingDial[resp.Random]
98+
t.pendingDialLock.RUnlock()
99+
100+
if !ok {
94101
klog.Warning("DialResp not recognized; dropped")
95102
} else {
96103
ch <- dialResult{
@@ -101,18 +108,28 @@ func (t *grpcTunnel) serve() {
101108
case agent.PacketType_DATA:
102109
resp := pkt.GetData()
103110
// TODO: flow control
104-
if conn, ok := t.conns[resp.ConnectID]; ok {
111+
t.connsLock.RLock()
112+
conn, ok := t.conns[resp.ConnectID]
113+
t.connsLock.RUnlock()
114+
115+
if ok {
105116
conn.readCh <- resp.Data
106117
} else {
107118
klog.Warningf("connection id %d not recognized", resp.ConnectID)
108119
}
109120
case agent.PacketType_CLOSE_RSP:
110121
resp := pkt.GetCloseResponse()
111-
if conn, ok := t.conns[resp.ConnectID]; ok {
122+
t.connsLock.RLock()
123+
conn, ok := t.conns[resp.ConnectID]
124+
t.connsLock.RUnlock()
125+
126+
if ok {
112127
close(conn.readCh)
113128
conn.closeCh <- resp.Error
114129
close(conn.closeCh)
130+
t.connsLock.Lock()
115131
delete(t.conns, resp.ConnectID)
132+
t.connsLock.Unlock()
116133
} else {
117134
klog.Warningf("connection id %d not recognized", resp.ConnectID)
118135
}
@@ -129,9 +146,13 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
129146

130147
random := rand.Int63()
131148
resCh := make(chan dialResult)
149+
t.pendingDialLock.Lock()
132150
t.pendingDial[random] = resCh
151+
t.pendingDialLock.Unlock()
133152
defer func() {
153+
t.pendingDialLock.Lock()
134154
delete(t.pendingDial, random)
155+
t.pendingDialLock.Unlock()
135156
}()
136157

137158
req := &agent.Packet{
@@ -163,7 +184,9 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
163184
c.connID = res.connid
164185
c.readCh = make(chan []byte, 10)
165186
c.closeCh = make(chan string)
187+
t.connsLock.Lock()
166188
t.conns[res.connid] = c
189+
t.connsLock.Unlock()
167190
case <-time.After(30 * time.Second):
168191
return nil, errors.New("dial timeout")
169192
}

tests/concurrent_test.go

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package tests
2+
3+
import (
4+
"io/ioutil"
5+
"net/http"
6+
"net/http/httptest"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"google.golang.org/grpc"
12+
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/client"
13+
)
14+
15+
func TestProxy_Concurrency(t *testing.T) {
16+
length := 1 << 20
17+
chunks := 10
18+
server := httptest.NewServer(newSizedServer(length, chunks))
19+
defer server.Close()
20+
21+
stopCh := make(chan struct{})
22+
defer close(stopCh)
23+
24+
proxy, cleanup, err := runGRPCProxyServer()
25+
defer cleanup()
26+
27+
if err := runAgent(proxy.agent, stopCh); err != nil {
28+
t.Fatal(err)
29+
}
30+
31+
// Wait for agent to register on proxy server
32+
time.Sleep(time.Second)
33+
34+
// run test client
35+
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
36+
if err != nil {
37+
t.Fatal(err)
38+
}
39+
40+
var wg sync.WaitGroup
41+
verify := func() {
42+
defer wg.Done()
43+
44+
c := &http.Client{
45+
Transport: &http.Transport{
46+
Dial: tunnel.Dial,
47+
},
48+
}
49+
50+
r, err := c.Get(server.URL)
51+
if err != nil {
52+
t.Error(err)
53+
}
54+
55+
data, err := ioutil.ReadAll(r.Body)
56+
if err != nil {
57+
t.Error(err)
58+
}
59+
60+
if len(data) != length*chunks {
61+
t.Errorf("expect data length %d; got %d", length*chunks, len(data))
62+
}
63+
}
64+
65+
concurrency := 10
66+
wg.Add(concurrency)
67+
for i := 0; i < concurrency; i++ {
68+
go verify()
69+
}
70+
wg.Wait()
71+
}

tests/proxy_test.go

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,32 @@ import (
2121

2222
// test remote server
2323
type testServer struct {
24+
echo []byte
25+
chunks int
26+
}
27+
28+
func newEchoServer(echo string) *testServer {
29+
return &testServer{
30+
echo: []byte(echo),
31+
chunks: 1,
32+
}
33+
}
34+
35+
func newSizedServer(length, chunks int) *testServer {
36+
return &testServer{
37+
echo: make([]byte, length),
38+
chunks: chunks,
39+
}
2440
}
2541

2642
func (s *testServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
27-
w.Write([]byte("hello"))
43+
for i := 0; i < s.chunks; i++ {
44+
w.Write(s.echo)
45+
}
2846
}
2947

3048
func TestBasicProxy_GRPC(t *testing.T) {
31-
server := httptest.NewServer(&testServer{})
49+
server := httptest.NewServer(newEchoServer("hello"))
3250
defer server.Close()
3351

3452
stopCh := make(chan struct{})
@@ -74,8 +92,54 @@ func TestBasicProxy_GRPC(t *testing.T) {
7492
}
7593
}
7694

95+
func TestProxy_LargeResponse(t *testing.T) {
96+
length := 1 << 20 // 1M
97+
chunks := 10
98+
server := httptest.NewServer(newSizedServer(length, chunks))
99+
defer server.Close()
100+
101+
stopCh := make(chan struct{})
102+
defer close(stopCh)
103+
104+
proxy, cleanup, err := runGRPCProxyServer()
105+
defer cleanup()
106+
107+
if err := runAgent(proxy.agent, stopCh); err != nil {
108+
t.Fatal(err)
109+
}
110+
111+
// Wait for agent to register on proxy server
112+
time.Sleep(time.Second)
113+
114+
// run test client
115+
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
116+
if err != nil {
117+
t.Fatal(err)
118+
}
119+
120+
c := &http.Client{
121+
Transport: &http.Transport{
122+
Dial: tunnel.Dial,
123+
},
124+
}
125+
126+
r, err := c.Get(server.URL)
127+
if err != nil {
128+
t.Error(err)
129+
}
130+
131+
data, err := ioutil.ReadAll(r.Body)
132+
if err != nil {
133+
t.Error(err)
134+
}
135+
136+
if len(data) != length*chunks {
137+
t.Errorf("expect data length %d; got %d", length*chunks, len(data))
138+
}
139+
}
140+
77141
func TestBasicProxy_HTTPCONN(t *testing.T) {
78-
server := httptest.NewServer(&testServer{})
142+
server := httptest.NewServer(newEchoServer("hello"))
79143
defer server.Close()
80144

81145
stopCh := make(chan struct{})

0 commit comments

Comments
 (0)