Skip to content

Commit 84242c4

Browse files
author
Michal Witkowski
committed
fix the "i don't know who finished" case
1 parent 9b22f41 commit 84242c4

File tree

2 files changed

+35
-20
lines changed

2 files changed

+35
-20
lines changed

proxy/handler.go

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ package proxy
66
import (
77
"io"
88

9+
"golang.org/x/net/context"
910
"google.golang.org/grpc"
1011
"google.golang.org/grpc/codes"
1112
"google.golang.org/grpc/transport"
12-
"golang.org/x/net/context"
1313
)
1414

1515
var (
@@ -75,24 +75,39 @@ func (s *handler) handler(srv interface{}, serverStream grpc.ServerStream) error
7575
if err != nil {
7676
return err
7777
}
78-
7978
s2cErrChan := s.forwardServerToClient(serverStream, clientStream)
79+
defer close(s2cErrChan)
8080
c2sErrChan := s.forwardClientToServer(clientStream, serverStream)
81-
s2cErr := <-s2cErrChan
82-
if s2cErr != io.EOF {
83-
clientCancel()
84-
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
85-
} else {
86-
clientStream.CloseSend()
87-
}
88-
c2sErr := <-c2sErrChan
89-
90-
serverStream.SetTrailer(clientStream.Trailer())
91-
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
92-
if c2sErr != io.EOF {
93-
return c2sErr
81+
defer close(c2sErrChan)
82+
// We don't know which side is going to stop sending first, so we need a select between the two.
83+
for i := 0; i < 2; i++ {
84+
select {
85+
case s2cErr := <-s2cErrChan:
86+
if s2cErr == io.EOF {
87+
// this is the happy case where the sender has encountered io.EOF, and won't be sending anymore./
88+
// the clientStream>serverStream may continue pumping though.
89+
clientStream.CloseSend()
90+
break
91+
} else {
92+
// however, we may have gotten a receive error (stream disconnected, a read error etc) in which case we need
93+
// to cancel the clientStream to the backend, let all of its goroutines be freed up by the CancelFunc and
94+
// exit with an error to the stack
95+
clientCancel()
96+
return grpc.Errorf(codes.Internal, "failed proxying s2c: %v", s2cErr)
97+
}
98+
case c2sErr := <-c2sErrChan:
99+
// This happens when the clientStream has nothing else to offer (io.EOF), returned a gRPC error. In those two
100+
// cases we may have received Trailers as part of the call. In case of other errors (stream closed) the trailers
101+
// will be nil.
102+
serverStream.SetTrailer(clientStream.Trailer())
103+
// c2sErr will contain RPC error from client code. If not io.EOF return the RPC error as server stream error.
104+
if c2sErr != io.EOF {
105+
return c2sErr
106+
}
107+
return nil
108+
}
94109
}
95-
return nil
110+
return grpc.Errorf(codes.Internal, "gRPC proxying should never reach this stage.")
96111
}
97112

98113
func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerStream) chan error {
@@ -123,7 +138,6 @@ func (s *handler) forwardClientToServer(src grpc.ClientStream, dst grpc.ServerSt
123138
break
124139
}
125140
}
126-
close(ret)
127141
}()
128142
return ret
129143
}
@@ -134,15 +148,16 @@ func (s *handler) forwardServerToClient(src grpc.ServerStream, dst grpc.ClientSt
134148
f := &frame{}
135149
for i := 0; ; i++ {
136150
if err := src.RecvMsg(f); err != nil {
151+
//grpclog.Printf("s2c err: %v", err)
137152
ret <- err // this can be io.EOF which is happy case
138153
break
139154
}
140155
if err := dst.SendMsg(f); err != nil {
156+
//grpclog.Printf("s2c err: %v", err)
141157
ret <- err
142158
break
143159
}
144160
}
145-
close(ret)
146161
}()
147162
return ret
148163
}

proxy/handler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,12 @@ func (s *ProxyHappySuite) SetupSuite() {
210210
"Ping")
211211

212212
// Start the serving loops.
213+
s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String())
213214
go func() {
214-
s.T().Logf("starting grpc.Server at: %v", s.serverListener.Addr().String())
215215
s.server.Serve(s.serverListener)
216216
}()
217+
s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String())
217218
go func() {
218-
s.T().Logf("starting grpc.Proxy at: %v", s.proxyListener.Addr().String())
219219
s.proxy.Serve(s.proxyListener)
220220
}()
221221

0 commit comments

Comments
 (0)