Skip to content

Commit 00dd588

Browse files
author
Michal Witkowski
committed
merge upstream grpc.Server changes changing the dispatch logic
1 parent 77edc97 commit 00dd588

File tree

2 files changed

+19
-14
lines changed

2 files changed

+19
-14
lines changed

proxy.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,18 @@ func (s *Proxy) Serve(lis net.Listener) error {
111111
s.conns[st] = true
112112
s.mu.Unlock()
113113

114-
go func() {
115-
st.HandleStreams(func(stream *transport.Stream) {
114+
var wg sync.WaitGroup
115+
st.HandleStreams(func(stream *transport.Stream) {
116+
wg.Add(1)
117+
go func() {
116118
s.handleStream(st, stream)
117-
})
118-
s.mu.Lock()
119-
delete(s.conns, st)
120-
s.mu.Unlock()
121-
}()
119+
wg.Done()
120+
}()
121+
})
122+
wg.Wait()
123+
s.mu.Lock()
124+
delete(s.conns, st)
125+
s.mu.Unlock()
122126
}
123127
}
124128

@@ -154,6 +158,7 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream *
154158
// data coming from backend back to client call
155159
egressPathChan := s.forwardDataFrames(backendStream, frontStream, frontTrans)
156160

161+
// wait for both data streams to complete.
157162
egressErr := <- egressPathChan
158163
ingressErr := <- ingressPathChan
159164
if egressErr != nil || ingressErr != nil {
@@ -179,8 +184,8 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
179184
return nil, nil, grpc.Errorf(codes.Aborted, "cant dial to backend: %v", err)
180185
}
181186
}
182-
// TODO(michal): PickTransport IS NOT IN UPSTREAM GRPC!
183-
_, backendTrans, err := grpcConn.PickTransport(ctx)
187+
// TODO(michal): ClientConn.Picker() IS NOT IN UPSTREAM GRPC! https://github.com/grpc/grpc-go/pull/397
188+
backendTrans, err := grpcConn.Picker().Pick(ctx)
184189
frontendStream, _ := transport.StreamFromContext(ctx)
185190
callHdr := &transport.CallHdr{
186191
Method: frontendStream.Method(),
@@ -197,11 +202,13 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
197202
// It returns an error channel. `nil` on it signifies everything was fine, anything else is a serious problem.
198203
func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error {
199204
ret := make(chan error)
205+
200206
go func () {
201207
data := make([]byte, 4096)
202208
opt := &transport.Options{}
203209
for {
204210
n, err := srcStream.Read(data)
211+
205212
if err == io.EOF {
206213
ret <- nil
207214
break

proxy_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@ import (
1717
"time"
1818
"net"
1919
"testing"
20+
"io"
2021

21-
"github.com/mwitkow-io/grpc-reverseproxy"
22-
pb "github.com/mwitkow-io/grpc-reverseproxy/testservice"
2322

23+
"github.com/mwitkow-io/grpc-proxy"
24+
pb "github.com/mwitkow-io/grpc-proxy/testservice"
2425

2526
"github.com/stretchr/testify/suite"
2627
"github.com/stretchr/testify/require"
@@ -30,8 +31,6 @@ import (
3031
"google.golang.org/grpc/metadata"
3132
"golang.org/x/net/context"
3233
"github.com/stretchr/testify/assert"
33-
"io"
34-
"google.golang.org/grpc/transport"
3534
)
3635

3736

@@ -171,7 +170,6 @@ func (s *ProxyHappySuite) SetupSuite() {
171170
proxyClientConn, err := grpc.Dial(s.serverListener.Addr().String(), grpc.WithInsecure())
172171
require.NoError(s.T(), err, "must not error on deferred client Dial")
173172
proxyServer := proxy.NewServer(func(ctx context.Context) (*grpc.ClientConn, error) {
174-
transport.StreamFromContext()
175173
md, ok := metadata.FromContext(ctx)
176174
if ok {
177175
if _, exists := md[rejectingMdKey]; exists {

0 commit comments

Comments
 (0)