Skip to content

Commit 33b9978

Browse files
authored
Merge pull request #93 from Jefftree/grpc-close-conn
Close UDS socket connection on completion in GRPC mode
2 parents 5ab9684 + 3551828 commit 33b9978

File tree

7 files changed

+19
-15
lines changed

7 files changed

+19
-15
lines changed

cmd/client/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (c *Client) getUDSDialer(o *GrpcProxyClientOptions) (func(ctx context.Conte
264264
}
265265
return c, err
266266
})
267-
tunnel, err := client.CreateGrpcTunnel(o.proxyUdsName, dialOption, grpc.WithInsecure(), grpc.WithUserAgent(o.userAgent))
267+
tunnel, err := client.CreateSingleUseGrpcTunnel(o.proxyUdsName, dialOption, grpc.WithInsecure(), grpc.WithUserAgent(o.userAgent))
268268
if err != nil {
269269
return nil, fmt.Errorf("failed to create tunnel %s, got %v", o.proxyUdsName, err)
270270
}
@@ -334,7 +334,7 @@ func (c *Client) getMTLSDialer(o *GrpcProxyClientOptions) (func(ctx context.Cont
334334
transportCreds := credentials.NewTLS(tlsConfig)
335335
dialOption := grpc.WithTransportCredentials(transportCreds)
336336
serverAddress := fmt.Sprintf("%s:%d", o.proxyHost, o.proxyPort)
337-
tunnel, err := client.CreateGrpcTunnel(serverAddress, dialOption)
337+
tunnel, err := client.CreateSingleUseGrpcTunnel(serverAddress, dialOption)
338338
if err != nil {
339339
return nil, fmt.Errorf("failed to create tunnel %s, got %v", serverAddress, err)
340340
}

konnectivity-client/pkg/client/client.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ type grpcTunnel struct {
5151
connsLock sync.RWMutex
5252
}
5353

54-
// CreateGrpcTunnel creates a Tunnel to dial to a remote server through a
54+
// CreateSingleUseGrpcTunnel creates a Tunnel to dial to a remote server through a
5555
// gRPC based proxy service.
56-
func CreateGrpcTunnel(address string, opts ...grpc.DialOption) (Tunnel, error) {
56+
// Currently, a single tunnel supports a single connection, and the tunnel is closed when the connection is terminated
57+
// The Dial() method of the returned tunnel should only be called once
58+
func CreateSingleUseGrpcTunnel(address string, opts ...grpc.DialOption) (Tunnel, error) {
5759
c, err := grpc.Dial(address, opts...)
5860
if err != nil {
5961
return nil, err
@@ -72,12 +74,14 @@ func CreateGrpcTunnel(address string, opts ...grpc.DialOption) (Tunnel, error) {
7274
conns: make(map[int64]*conn),
7375
}
7476

75-
go tunnel.serve()
77+
go tunnel.serve(c)
7678

7779
return tunnel, nil
7880
}
7981

80-
func (t *grpcTunnel) serve() {
82+
func (t *grpcTunnel) serve(c *grpc.ClientConn) {
83+
defer c.Close()
84+
8185
for {
8286
pkt, err := t.stream.Recv()
8387
if err == io.EOF {
@@ -130,9 +134,9 @@ func (t *grpcTunnel) serve() {
130134
t.connsLock.Lock()
131135
delete(t.conns, resp.ConnectID)
132136
t.connsLock.Unlock()
133-
} else {
134-
klog.Warningf("connection id %d not recognized", resp.ConnectID)
137+
return
135138
}
139+
klog.Warningf("connection id %d not recognized", resp.ConnectID)
136140
}
137141
}
138142
}

tests/concurrent_client_request_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (s *simpleServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
3939

4040
// TODO: test http-connect as well.
4141
func getTestClient(front string, t *testing.T) *http.Client {
42-
tunnel, err := client.CreateGrpcTunnel(front, grpc.WithInsecure())
42+
tunnel, err := client.CreateSingleUseGrpcTunnel(front, grpc.WithInsecure())
4343
if err != nil {
4444
t.Fatal(err)
4545
}

tests/concurrent_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func TestProxy_Concurrency(t *testing.T) {
3333
time.Sleep(time.Second)
3434

3535
// run test client
36-
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
36+
tunnel, err := client.CreateSingleUseGrpcTunnel(proxy.front, grpc.WithInsecure())
3737
if err != nil {
3838
t.Fatal(err)
3939
}

tests/ha_proxy_server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ func TestBasicHAProxyServer_GRPC(t *testing.T) {
183183
}
184184

185185
func testProxyServer(t *testing.T, front string, target string) {
186-
tunnel, err := client.CreateGrpcTunnel(front, grpc.WithInsecure())
186+
tunnel, err := client.CreateSingleUseGrpcTunnel(front, grpc.WithInsecure())
187187
if err != nil {
188188
t.Fatal(err)
189189
}

tests/proxy_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestBasicProxy_GRPC(t *testing.T) {
6767
time.Sleep(time.Second)
6868

6969
// run test client
70-
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
70+
tunnel, err := client.CreateSingleUseGrpcTunnel(proxy.front, grpc.WithInsecure())
7171
if err != nil {
7272
t.Fatal(err)
7373
}
@@ -111,7 +111,7 @@ func TestProxyHandleDialError_GRPC(t *testing.T) {
111111
time.Sleep(time.Second)
112112

113113
// run test client
114-
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
114+
tunnel, err := client.CreateSingleUseGrpcTunnel(proxy.front, grpc.WithInsecure())
115115
if err != nil {
116116
t.Fatal(err)
117117
}
@@ -152,7 +152,7 @@ func TestProxy_LargeResponse(t *testing.T) {
152152
time.Sleep(time.Second)
153153

154154
// run test client
155-
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
155+
tunnel, err := client.CreateSingleUseGrpcTunnel(proxy.front, grpc.WithInsecure())
156156
if err != nil {
157157
t.Fatal(err)
158158
}

tests/tcp_server_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestEchoServer(t *testing.T) {
6060
time.Sleep(time.Second)
6161

6262
// run test client
63-
tunnel, err := client.CreateGrpcTunnel(proxy.front, grpc.WithInsecure())
63+
tunnel, err := client.CreateSingleUseGrpcTunnel(proxy.front, grpc.WithInsecure())
6464
if err != nil {
6565
t.Fatal(err)
6666
}

0 commit comments

Comments
 (0)