@@ -19,39 +19,34 @@ package tests
19
19
import (
20
20
"bytes"
21
21
"context"
22
- "fmt"
23
22
"io"
24
23
"net/http"
25
24
"net/http/httptest"
25
+ "strconv"
26
26
"sync"
27
27
"testing"
28
+ "time"
28
29
29
30
"google.golang.org/grpc"
30
31
"k8s.io/apimachinery/pkg/util/wait"
31
32
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client"
32
- "sigs.k8s.io/apiserver-network-proxy/pkg/server"
33
- "sigs.k8s.io/apiserver-network-proxy/proto/header"
34
33
)
35
34
36
35
type simpleServer struct {
37
- receivedSecondReq chan struct {}
36
+ mu sync. Mutex
38
37
}
39
38
40
- // ServeHTTP blocks the response to the request whose body is "1" until a
41
- // request whose body is "2" is handled.
42
39
func (s * simpleServer ) ServeHTTP (w http.ResponseWriter , req * http.Request ) {
40
+ s .mu .Lock ()
41
+ defer s .mu .Unlock ()
42
+
43
+ time .Sleep (time .Millisecond )
44
+
43
45
bytes , err := io .ReadAll (req .Body )
44
46
if err != nil {
45
47
w .Write ([]byte (err .Error ()))
46
48
}
47
- if string (bytes ) == "2" {
48
- close (s .receivedSecondReq )
49
- w .Write ([]byte ("2" ))
50
- }
51
- if string (bytes ) == "1" {
52
- <- s .receivedSecondReq
53
- w .Write ([]byte ("1" ))
54
- }
49
+ w .Write (bytes )
55
50
}
56
51
57
52
// TODO: test http-connect as well.
@@ -70,119 +65,45 @@ func getTestClient(front string, t *testing.T) *http.Client {
70
65
}
71
66
}
72
67
73
- // singleTimeManager makes sure that a backend only serves one request.
74
- type singleTimeManager struct {
75
- mu sync.Mutex
76
- backends map [string ]server.Backend
77
- used map [string ]struct {}
78
- }
79
-
80
- func (s * singleTimeManager ) AddBackend (agentID string , _ header.IdentifierType , backend server.Backend ) {
81
- s .mu .Lock ()
82
- defer s .mu .Unlock ()
83
- s .backends [agentID ] = backend
84
- }
85
-
86
- func (s * singleTimeManager ) RemoveBackend (agentID string , _ header.IdentifierType , backend server.Backend ) {
87
- s .mu .Lock ()
88
- defer s .mu .Unlock ()
89
- v , ok := s .backends [agentID ]
90
- if ! ok {
91
- panic (fmt .Errorf ("no backends found for %s" , agentID ))
92
- }
93
- if v != backend {
94
- panic (fmt .Errorf ("recorded backend %v does not match %v" , & v , backend ))
95
- }
96
- delete (s .backends , agentID )
97
- }
98
-
99
- func (s * singleTimeManager ) Backend (_ context.Context ) (server.Backend , error ) {
100
- s .mu .Lock ()
101
- defer s .mu .Unlock ()
102
- for k , v := range s .backends {
103
- if _ , ok := s .used [k ]; ! ok {
104
- s .used [k ] = struct {}{}
105
- return v , nil
106
- }
107
- }
108
- return nil , fmt .Errorf ("cannot find backend to a new agent" )
109
- }
110
-
111
- func (s * singleTimeManager ) GetBackend (agentID string ) server.Backend {
112
- return nil
113
- }
114
-
115
- func (s * singleTimeManager ) NumBackends () int {
116
- return 0
117
- }
118
-
119
- func newSingleTimeGetter (m * server.DefaultBackendManager ) * singleTimeManager {
120
- return & singleTimeManager {
121
- used : make (map [string ]struct {}),
122
- backends : make (map [string ]server.Backend ),
123
- }
124
- }
125
-
126
- var _ server.BackendManager = & singleTimeManager {}
127
-
128
- func (s * singleTimeManager ) Ready () (bool , string ) {
129
- return true , ""
130
- }
131
-
132
68
func TestConcurrentClientRequest (t * testing.T ) {
133
- t . Skip () // FIXME: figure out how to run this without overriding the BackendManagers
134
- s := httptest .NewServer (& simpleServer {receivedSecondReq : make ( chan struct {}) })
69
+ const numConcurrentRequests = 100
70
+ s := httptest .NewServer (& simpleServer {})
135
71
defer s .Close ()
136
72
137
73
ps := runGRPCProxyServerWithServerCount (t , 1 )
138
74
defer ps .Stop ()
139
- // ps.BackendManagers = []server.BackendManager{newSingleTimeGetter(server.NewDefaultBackendManager())} FIXME
140
75
141
76
// Run two agents
142
- ai1 := runAgent (t , ps .AgentAddr ())
143
- ai2 := runAgent (t , ps .AgentAddr ())
144
- defer ai1 .Stop ()
145
- defer ai2 .Stop ()
146
- waitForConnectedServerCount (t , 1 , ai1 )
147
- waitForConnectedServerCount (t , 1 , ai2 )
148
-
149
- client1 := getTestClient (ps .FrontAddr (), t )
150
- client2 := getTestClient (ps .FrontAddr (), t )
77
+ a1 := runAgent (t , ps .AgentAddr ())
78
+ a2 := runAgent (t , ps .AgentAddr ())
79
+ defer a1 .Stop ()
80
+ defer a2 .Stop ()
81
+ waitForConnectedServerCount (t , 1 , a1 )
82
+ waitForConnectedServerCount (t , 1 , a2 )
83
+
151
84
var wg sync.WaitGroup
152
- wg .Add (2 )
153
- go func () {
154
- defer wg .Done ()
155
- r , err := client1 .Post (s .URL , "text/plain" , bytes .NewBufferString ("1" ))
156
- if err != nil {
157
- t .Error (err )
158
- return
159
- }
160
- data , err := io .ReadAll (r .Body )
161
- if err != nil {
162
- t .Error (err )
163
- }
164
- r .Body .Close ()
165
-
166
- if string (data ) != "1" {
167
- t .Errorf ("expect %v; got %v" , "1" , string (data ))
168
- }
169
- }()
170
- go func () {
171
- defer wg .Done ()
172
- r , err := client2 .Post (s .URL , "text/plain" , bytes .NewBufferString ("2" ))
173
- if err != nil {
174
- t .Error (err )
175
- return
176
- }
177
- data , err := io .ReadAll (r .Body )
178
- if err != nil {
179
- t .Error (err )
180
- }
181
- r .Body .Close ()
182
-
183
- if string (data ) != "2" {
184
- t .Errorf ("expect %v; got %v" , "2" , string (data ))
185
- }
186
- }()
85
+ wg .Add (numConcurrentRequests )
86
+ for i := 0 ; i < numConcurrentRequests ; i ++ {
87
+ id := i
88
+ go func () {
89
+ defer wg .Done ()
90
+ client1 := getTestClient (ps .FrontAddr (), t )
91
+
92
+ r , err := client1 .Post (s .URL , "text/plain" , bytes .NewBufferString (strconv .Itoa (id )))
93
+ if err != nil {
94
+ t .Error (err )
95
+ return
96
+ }
97
+ data , err := io .ReadAll (r .Body )
98
+ if err != nil {
99
+ t .Error (err )
100
+ }
101
+ r .Body .Close ()
102
+
103
+ if string (data ) != strconv .Itoa (id ) {
104
+ t .Errorf ("expect %d; got %s" , id , string (data ))
105
+ }
106
+ }()
107
+ }
187
108
wg .Wait ()
188
109
}
0 commit comments