Skip to content

Commit 8100da1

Browse files
authored
Merge pull request #35 from anfernee/concurrent-test
Add concurrency test
2 parents 9028280 + f3ac3a8 commit 8100da1

File tree

4 files changed

+303
-9
lines changed

4 files changed

+303
-9
lines changed

pkg/agent/client/client.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"math/rand"
2424
"net"
25+
"sync"
2526
"time"
2627

2728
"google.golang.org/grpc"
@@ -43,9 +44,11 @@ type dialResult struct {
4344

4445
// grpcTunnel implements Tunnel
4546
type grpcTunnel struct {
46-
stream agent.ProxyService_ProxyClient
47-
pendingDial map[int64]chan<- dialResult
48-
conns map[int64]*conn
47+
stream agent.ProxyService_ProxyClient
48+
pendingDial map[int64]chan<- dialResult
49+
conns map[int64]*conn
50+
pendingDialLock sync.RWMutex
51+
connsLock sync.RWMutex
4952
}
5053

5154
// CreateGrpcTunnel creates a Tunnel to dial to a remote server through a
@@ -90,7 +93,11 @@ func (t *grpcTunnel) serve() {
9093
switch pkt.Type {
9194
case agent.PacketType_DIAL_RSP:
9295
resp := pkt.GetDialResponse()
93-
if ch, ok := t.pendingDial[resp.Random]; !ok {
96+
t.pendingDialLock.RLock()
97+
ch, ok := t.pendingDial[resp.Random]
98+
t.pendingDialLock.RUnlock()
99+
100+
if !ok {
94101
klog.Warning("DialResp not recognized; dropped")
95102
} else {
96103
ch <- dialResult{
@@ -101,18 +108,28 @@ func (t *grpcTunnel) serve() {
101108
case agent.PacketType_DATA:
102109
resp := pkt.GetData()
103110
// TODO: flow control
104-
if conn, ok := t.conns[resp.ConnectID]; ok {
111+
t.connsLock.RLock()
112+
conn, ok := t.conns[resp.ConnectID]
113+
t.connsLock.RUnlock()
114+
115+
if ok {
105116
conn.readCh <- resp.Data
106117
} else {
107118
klog.Warningf("connection id %d not recognized", resp.ConnectID)
108119
}
109120
case agent.PacketType_CLOSE_RSP:
110121
resp := pkt.GetCloseResponse()
111-
if conn, ok := t.conns[resp.ConnectID]; ok {
122+
t.connsLock.RLock()
123+
conn, ok := t.conns[resp.ConnectID]
124+
t.connsLock.RUnlock()
125+
126+
if ok {
112127
close(conn.readCh)
113128
conn.closeCh <- resp.Error
114129
close(conn.closeCh)
130+
t.connsLock.Lock()
115131
delete(t.conns, resp.ConnectID)
132+
t.connsLock.Unlock()
116133
} else {
117134
klog.Warningf("connection id %d not recognized", resp.ConnectID)
118135
}
@@ -129,9 +146,13 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
129146

130147
random := rand.Int63()
131148
resCh := make(chan dialResult)
149+
t.pendingDialLock.Lock()
132150
t.pendingDial[random] = resCh
151+
t.pendingDialLock.Unlock()
133152
defer func() {
153+
t.pendingDialLock.Lock()
134154
delete(t.pendingDial, random)
155+
t.pendingDialLock.Unlock()
135156
}()
136157

137158
req := &agent.Packet{
@@ -163,7 +184,9 @@ func (t *grpcTunnel) Dial(protocol, address string) (net.Conn, error) {
163184
c.connID = res.connid
164185
c.readCh = make(chan []byte, 10)
165186
c.closeCh = make(chan string)
187+
t.connsLock.Lock()
166188
t.conns[res.connid] = c
189+
t.connsLock.Unlock()
167190
case <-time.After(30 * time.Second):
168191
return nil, errors.New("dial timeout")
169192
}

tests/concurrent_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package tests
2+
3+
import (
4+
"io/ioutil"
5+
"net/http"
6+
"net/http/httptest"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"google.golang.org/grpc"
12+
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/client"
13+
)
14+
15+
func TestProxy_Concurrency(t *testing.T) {
16+
length := 1 << 20
17+
chunks := 10
18+
server := httptest.NewServer(newSizedServer(length, chunks))
19+
defer server.Close()
20+
21+
stopCh := make(chan struct{})
22+
defer close(stopCh)
23+
24+
proxy, cleanup, err := runGRPCProxyServer()
25+
if err != nil {
26+
t.Fatal(err)
27+
}
28+
defer cleanup()
29+
30+
if err := runAgent(proxy.agent, stopCh); err != nil {
31+
t.Fatal(err)
32+
}
33+
34+
// Wait for agent to register on proxy server
35+
time.Sleep(time.Second)
36+
37+
// run test client
38+
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
43+
var wg sync.WaitGroup
44+
verify := func() {
45+
defer wg.Done()
46+
47+
c := &http.Client{
48+
Transport: &http.Transport{
49+
Dial: tunnel.Dial,
50+
},
51+
}
52+
53+
r, err := c.Get(server.URL)
54+
if err != nil {
55+
t.Error(err)
56+
}
57+
58+
data, err := ioutil.ReadAll(r.Body)
59+
if err != nil {
60+
t.Error(err)
61+
}
62+
63+
if len(data) != length*chunks {
64+
t.Errorf("expect data length %d; got %d", length*chunks, len(data))
65+
}
66+
}
67+
68+
concurrency := 10
69+
wg.Add(concurrency)
70+
for i := 0; i < concurrency; i++ {
71+
go verify()
72+
}
73+
wg.Wait()
74+
}

tests/proxy_test.go

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,32 @@ import (
2121

2222
// test remote server
2323
type testServer struct {
24+
echo []byte
25+
chunks int
26+
}
27+
28+
func newEchoServer(echo string) *testServer {
29+
return &testServer{
30+
echo: []byte(echo),
31+
chunks: 1,
32+
}
33+
}
34+
35+
func newSizedServer(length, chunks int) *testServer {
36+
return &testServer{
37+
echo: make([]byte, length),
38+
chunks: chunks,
39+
}
2440
}
2541

2642
func (s *testServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
27-
w.Write([]byte("hello"))
43+
for i := 0; i < s.chunks; i++ {
44+
w.Write(s.echo)
45+
}
2846
}
2947

3048
func TestBasicProxy_GRPC(t *testing.T) {
31-
server := httptest.NewServer(&testServer{})
49+
server := httptest.NewServer(newEchoServer("hello"))
3250
defer server.Close()
3351

3452
stopCh := make(chan struct{})
@@ -74,8 +92,57 @@ func TestBasicProxy_GRPC(t *testing.T) {
7492
}
7593
}
7694

95+
func TestProxy_LargeResponse(t *testing.T) {
96+
length := 1 << 20 // 1M
97+
chunks := 10
98+
server := httptest.NewServer(newSizedServer(length, chunks))
99+
defer server.Close()
100+
101+
stopCh := make(chan struct{})
102+
defer close(stopCh)
103+
104+
proxy, cleanup, err := runGRPCProxyServer()
105+
if err != nil {
106+
t.Fatal(err)
107+
}
108+
defer cleanup()
109+
110+
if err := runAgent(proxy.agent, stopCh); err != nil {
111+
t.Fatal(err)
112+
}
113+
114+
// Wait for agent to register on proxy server
115+
time.Sleep(time.Second)
116+
117+
// run test client
118+
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
123+
c := &http.Client{
124+
Transport: &http.Transport{
125+
Dial: tunnel.Dial,
126+
},
127+
}
128+
129+
r, err := c.Get(server.URL)
130+
if err != nil {
131+
t.Error(err)
132+
}
133+
134+
data, err := ioutil.ReadAll(r.Body)
135+
if err != nil {
136+
t.Error(err)
137+
}
138+
139+
if len(data) != length*chunks {
140+
t.Errorf("expect data length %d; got %d", length*chunks, len(data))
141+
}
142+
}
143+
77144
func TestBasicProxy_HTTPCONN(t *testing.T) {
78-
server := httptest.NewServer(&testServer{})
145+
server := httptest.NewServer(newEchoServer("hello"))
79146
defer server.Close()
80147

81148
stopCh := make(chan struct{})

tests/tcp_server_test.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package tests
2+
3+
import (
4+
"net"
5+
"testing"
6+
"time"
7+
8+
"google.golang.org/grpc"
9+
"k8s.io/klog"
10+
"sigs.k8s.io/apiserver-network-proxy/pkg/agent/client"
11+
)
12+
13+
func echo(conn net.Conn) {
14+
var data [256]byte
15+
16+
for {
17+
n, err := conn.Read(data[:])
18+
if err != nil {
19+
klog.Info(err)
20+
return
21+
}
22+
23+
_, err = conn.Write(data[:n])
24+
if err != nil {
25+
klog.Info(err)
26+
return
27+
}
28+
}
29+
}
30+
31+
func TestEchoServer(t *testing.T) {
32+
ln, err := net.Listen("tcp", "")
33+
if err != nil {
34+
t.Error(err)
35+
}
36+
37+
go func() {
38+
for {
39+
conn, err := ln.Accept()
40+
if err != nil {
41+
klog.Info(err)
42+
break
43+
}
44+
go echo(conn)
45+
}
46+
}()
47+
48+
stopCh := make(chan struct{})
49+
defer close(stopCh)
50+
51+
proxy, cleanup, err := runGRPCProxyServer()
52+
if err != nil {
53+
t.Fatal(err)
54+
}
55+
defer cleanup()
56+
57+
if err := runAgent(proxy.agent, stopCh); err != nil {
58+
t.Fatal(err)
59+
}
60+
61+
// Wait for agent to register on proxy server
62+
time.Sleep(time.Second)
63+
64+
// run test client
65+
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
66+
if err != nil {
67+
t.Fatal(err)
68+
}
69+
70+
conn, err := tunnel.Dial("tcp", ln.Addr().String())
71+
if err != nil {
72+
t.Error(err)
73+
}
74+
75+
msg := "1234567890123456789012345"
76+
n, err := conn.Write([]byte(msg))
77+
if err != nil {
78+
t.Error(err)
79+
}
80+
if n != len(msg) {
81+
t.Errorf("expect write %d; got %d", len(msg), n)
82+
}
83+
84+
var data [10]byte
85+
86+
n, err = conn.Read(data[:])
87+
if err != nil {
88+
t.Error(err)
89+
}
90+
if string(data[:n]) != msg[:10] {
91+
t.Errorf("expect %s; got %s", msg[:10], string(data[:n]))
92+
}
93+
94+
n, err = conn.Read(data[:])
95+
if err != nil {
96+
t.Error(err)
97+
}
98+
if string(data[:n]) != msg[10:20] {
99+
t.Errorf("expect %s; got %s", msg[10:20], string(data[:n]))
100+
}
101+
102+
msg2 := "1234567"
103+
n, err = conn.Write([]byte(msg2))
104+
if err != nil {
105+
t.Error(err)
106+
}
107+
if n != len(msg2) {
108+
t.Errorf("expect write %d; got %d", len(msg2), n)
109+
}
110+
111+
n, err = conn.Read(data[:])
112+
if err != nil {
113+
t.Error(err)
114+
}
115+
if string(data[:n]) != msg[20:] {
116+
t.Errorf("expect %s; got %s", msg[20:], string(data[:n]))
117+
}
118+
119+
n, err = conn.Read(data[:])
120+
if err != nil {
121+
t.Error(err)
122+
}
123+
if string(data[:n]) != msg2 {
124+
t.Errorf("expect %s; got %s", msg, string(data[:n]))
125+
}
126+
127+
if err := conn.Close(); err != nil {
128+
t.Error(err)
129+
}
130+
}

0 commit comments

Comments
 (0)