Skip to content

Commit 28341d1

Browse files
author
Michal Witkowski
committed
move out forward logic to method, allowing for use as grpc.Server not found handler.
1 parent 89e28b4 commit 28341d1

File tree

1 file changed

+48
-43
lines changed

1 file changed

+48
-43
lines changed

proxy.go

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,44 +13,44 @@
1313
// limitations under the License.
1414

1515
package proxy
16+
1617
import (
17-
"net"
1818
"fmt"
1919
"io"
20+
"net"
2021
"strings"
2122
"sync"
2223

24+
"golang.org/x/net/context"
2325
"google.golang.org/grpc"
24-
"google.golang.org/grpc/transport"
26+
"google.golang.org/grpc/codes"
2527
"google.golang.org/grpc/credentials"
2628
"google.golang.org/grpc/grpclog"
27-
"google.golang.org/grpc/codes"
28-
"golang.org/x/net/context"
29+
"google.golang.org/grpc/transport"
2930
)
3031

31-
3232
// transportWriter is a common interface between gRPC transport.ServerTransport and transport.ClientTransport.
3333
type transportWriter interface {
3434
Write(s *transport.Stream, data []byte, opts *transport.Options) error
3535
}
3636

3737
type Proxy struct {
38-
mu sync.Mutex
39-
lis map[net.Listener]bool
40-
conns map[transport.ServerTransport]bool
41-
logger grpclog.Logger
38+
mu sync.Mutex
39+
lis map[net.Listener]bool
40+
conns map[transport.ServerTransport]bool
41+
logger grpclog.Logger
4242
director StreamDirector
43-
opts *options
43+
opts *options
4444
}
4545

4646
// NewServer creates a gRPC proxy which will use the `StreamDirector` for making routing decisions.
4747
func NewServer(director StreamDirector, opt ...ProxyOption) *Proxy {
4848
s := &Proxy{
49-
lis: make(map[net.Listener]bool),
50-
conns: make(map[transport.ServerTransport]bool),
51-
opts: &options{},
49+
lis: make(map[net.Listener]bool),
50+
conns: make(map[transport.ServerTransport]bool),
51+
opts: &options{},
5252
director: director,
53-
logger: &defaultLogger{},
53+
logger: &defaultLogger{},
5454
}
5555
for _, o := range opt {
5656
o(s.opts)
@@ -138,31 +138,53 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream *
138138
}
139139
return
140140
}
141+
ProxyStream(s.director, s.logger, frontTrans, frontStream)
142+
143+
}
141144

142-
backendTrans, backendStream, err := s.backendTransportStream(frontStream.Context())
145+
// Stop stops the gRPC server. Once Stop returns, the server stops accepting
146+
// connection requests and closes all the connected connections.
147+
func (s *Proxy) Stop() {
148+
s.mu.Lock()
149+
listeners := s.lis
150+
s.lis = nil
151+
cs := s.conns
152+
s.conns = nil
153+
s.mu.Unlock()
154+
for lis := range listeners {
155+
lis.Close()
156+
}
157+
for c := range cs {
158+
c.Close()
159+
}
160+
}
161+
162+
// ProxyStream performs a forward of a gRPC frontend stream to a backend.
163+
func ProxyStream(director StreamDirector, logger grpclog.Logger, frontTrans transport.ServerTransport, frontStream *transport.Stream) {
164+
backendTrans, backendStream, err := backendTransportStream(director, frontStream.Context())
143165
if err != nil {
144166
frontTrans.WriteStatus(frontStream, grpc.Code(err), grpc.ErrorDesc(err))
145-
s.logger.Printf("proxy: Proxy.handleStream %v failed to allocate backend: %v", frontStream.Method(), err)
167+
logger.Printf("proxy: Proxy.handleStream %v failed to allocate backend: %v", frontStream.Method(), err)
146168
return
147169
}
148170
defer backendTrans.CloseStream(backendStream, nil)
149171

150172
// data coming from client call to backend
151-
ingressPathChan := s.forwardDataFrames(frontStream, backendStream, backendTrans)
173+
ingressPathChan := forwardDataFrames(frontStream, backendStream, backendTrans)
152174

153175
// custom header handling *must* be after some data is processed by the backend, otherwise there's a deadlock
154176
headerMd, err := backendStream.Header()
155177
if err == nil && len(headerMd) > 0 {
156178
frontTrans.WriteHeader(frontStream, headerMd)
157179
}
158180
// data coming from backend back to client call
159-
egressPathChan := s.forwardDataFrames(backendStream, frontStream, frontTrans)
181+
egressPathChan := forwardDataFrames(backendStream, frontStream, frontTrans)
160182

161183
// wait for both data streams to complete.
162-
egressErr := <- egressPathChan
163-
ingressErr := <- ingressPathChan
184+
egressErr := <-egressPathChan
185+
ingressErr := <-ingressPathChan
164186
if egressErr != nil || ingressErr != nil {
165-
s.logger.Printf("proxy: Proxy.handleStream %v failure during transfer ingres: %v egress: %v", frontStream.Method(), ingressErr, egressErr)
187+
logger.Printf("proxy: Proxy.handleStream %v failure during transfer ingres: %v egress: %v", frontStream.Method(), ingressErr, egressErr)
166188
frontTrans.WriteStatus(frontStream, codes.Unavailable, fmt.Sprintf("problem in transfer ingress: %v egress: %v", ingressErr, egressErr))
167189
return
168190
}
@@ -175,8 +197,8 @@ func (s *Proxy) handleStream(frontTrans transport.ServerTransport, frontStream *
175197
}
176198

177199
// backendTransportStream picks and establishes a Stream to the backend.
178-
func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTransport, *transport.Stream, error) {
179-
grpcConn, err := s.director(ctx)
200+
func backendTransportStream(director StreamDirector, ctx context.Context) (transport.ClientTransport, *transport.Stream, error) {
201+
grpcConn, err := director(ctx)
180202
if err != nil {
181203
if grpc.Code(err) != codes.Unknown { // rpcError check
182204
return nil, nil, err
@@ -189,7 +211,7 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
189211
frontendStream, _ := transport.StreamFromContext(ctx)
190212
callHdr := &transport.CallHdr{
191213
Method: frontendStream.Method(),
192-
Host: "TODOFIXTLS", // TODO(michal): This can fail if the backend server is using TLS Hostname verification. Use conn.authority, once it's public?
214+
Host: "TODOFIXTLS", // TODO(michal): This can fail if the backend server is using TLS Hostname verification. Use conn.authority, once it's public?
193215
}
194216
backendStream, err := backendTrans.NewStream(ctx, callHdr)
195217
if err != nil {
@@ -200,10 +222,10 @@ func (s *Proxy) backendTransportStream(ctx context.Context) (transport.ClientTra
200222

201223
// forwardDataFrames moves data from one gRPC transport `Stream` to another in async fashion.
202224
// It returns an error channel. `nil` on it signifies everything was fine, anything else is a serious problem.
203-
func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error {
225+
func forwardDataFrames(srcStream *transport.Stream, dstStream *transport.Stream, dstTransport transportWriter) chan error {
204226
ret := make(chan error)
205227

206-
go func () {
228+
go func() {
207229
data := make([]byte, 4096)
208230
opt := &transport.Options{}
209231
for {
@@ -227,22 +249,5 @@ func (s *Proxy) forwardDataFrames(srcStream *transport.Stream, dstStream *transp
227249
close(ret)
228250
}()
229251
return ret
230-
231252
}
232253

233-
// Stop stops the gRPC server. Once Stop returns, the server stops accepting
234-
// connection requests and closes all the connected connections.
235-
func (s *Proxy) Stop() {
236-
s.mu.Lock()
237-
listeners := s.lis
238-
s.lis = nil
239-
cs := s.conns
240-
s.conns = nil
241-
s.mu.Unlock()
242-
for lis := range listeners {
243-
lis.Close()
244-
}
245-
for c := range cs {
246-
c.Close()
247-
}
248-
}

0 commit comments

Comments
 (0)