Skip to content

Commit 3a9efe9

Browse files
authored
fix: ingress issues for grpc and http dual connection (#741)
* revert: revert changes for adding cmux * fix: increase write timeout due to grpc errors
1 parent c4cf585 commit 3a9efe9

File tree

4 files changed

+29
-45
lines changed

4 files changed

+29
-45
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ require (
3838
github.com/schollz/progressbar/v3 v3.8.5
3939
github.com/sirupsen/logrus v1.8.1
4040
github.com/slack-go/slack v0.9.1
41-
github.com/soheilhy/cmux v0.1.5
4241
github.com/spf13/afero v1.8.2
4342
github.com/spf13/cobra v1.2.1
4443
github.com/spf13/viper v1.8.1

go.sum

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1373,7 +1373,6 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
13731373
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
13741374
github.com/snowflakedb/gosnowflake v1.6.3/go.mod h1:6hLajn6yxuJ4xUHZegMekpq9rnQbGJ7TMwXjgTmA6lg=
13751375
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
1376-
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
13771376
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
13781377
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
13791378
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=

server/optimus.go

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"context"
66
"fmt"
77
"io"
8-
"net"
98
"net/http"
109
"os"
1110

@@ -16,7 +15,6 @@ import (
1615
"github.com/odpf/salt/log"
1716
"github.com/prometheus/client_golang/prometheus"
1817
slackapi "github.com/slack-go/slack"
19-
"github.com/soheilhy/cmux"
2018
"google.golang.org/grpc"
2119

2220
"github.com/odpf/optimus/config"
@@ -60,7 +58,6 @@ type OptimusServer struct {
6058
key *[keyLength]byte
6159

6260
serverAddr string
63-
mux cmux.CMux
6461
grpcServer *grpc.Server
6562
httpServer *http.Server
6663

@@ -85,7 +82,6 @@ func New(conf config.ServerConfig) (*OptimusServer, error) {
8582
server.setupTelemetry,
8683
server.setupAppKey,
8784
server.setupDB,
88-
server.setupMux,
8985
server.setupGRPCServer,
9086
server.setupHandlers,
9187
server.setupMonitoring,
@@ -172,16 +168,6 @@ func (s *OptimusServer) setupDB() error {
172168
return nil
173169
}
174170

175-
func (s *OptimusServer) setupMux() error {
176-
ln, err := net.Listen("tcp", s.serverAddr)
177-
if err != nil {
178-
return err
179-
}
180-
181-
s.mux = cmux.New(ln)
182-
return nil
183-
}
184-
185171
func (s *OptimusServer) setupGRPCServer() error {
186172
var err error
187173
s.grpcServer, err = setupGRPCServer(s.logger)
@@ -195,40 +181,20 @@ func (s *OptimusServer) setupMonitoring() error {
195181
}
196182

197183
func (s *OptimusServer) setupHTTPProxy() error {
198-
srv, cleanup, err := prepareHTTPProxy(s.serverAddr)
184+
srv, cleanup, err := prepareHTTPProxy(s.serverAddr, s.grpcServer)
199185
s.httpServer = srv
200186
s.cleanupFn = append(s.cleanupFn, cleanup)
201187
return err
202188
}
203189

204190
func (s *OptimusServer) startListening() {
205191
// run our server in a goroutine so that it doesn't block to wait for termination requests
206-
// We first match a request first for grpc
207-
grpcLn := s.mux.MatchWithWriters(
208-
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
209-
// if it does not match grpc, we send the request to http
210-
httpL := s.mux.Match(cmux.Any())
211-
212-
go func() {
213-
// Start listening on grpc server
214-
if err := s.grpcServer.Serve(grpcLn); err != nil {
215-
s.logger.Info("shutting down grpc server")
216-
}
217-
}()
218-
219-
go func() {
220-
// Start listening on http server
221-
err := s.httpServer.Serve(httpL)
222-
if err != nil {
223-
s.logger.Info("Shutting http proxy")
224-
}
225-
}()
226-
227192
go func() {
228-
// Start mux server
229193
s.logger.Info("Listening at", "address", s.serverAddr)
230-
if err := s.mux.Serve(); err != nil {
231-
s.logger.Info("server error", "error", err)
194+
if err := s.httpServer.ListenAndServe(); err != nil {
195+
if err != http.ErrServerClosed {
196+
s.logger.Fatal("server error", "error", err)
197+
}
232198
}
233199
}()
234200
}

server/server.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/http"
88
"net/url"
9+
"strings"
910
"time"
1011

1112
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -18,6 +19,8 @@ import (
1819
"github.com/sirupsen/logrus"
1920
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
2021
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
22+
"golang.org/x/net/http2"
23+
"golang.org/x/net/http2/h2c"
2124
"google.golang.org/grpc"
2225
"google.golang.org/grpc/codes"
2326
"google.golang.org/grpc/health"
@@ -100,7 +103,7 @@ func setupGRPCServer(l log.Logger) (*grpc.Server, error) {
100103
return grpcServer, nil
101104
}
102105

103-
func prepareHTTPProxy(grpcAddr string) (*http.Server, func(), error) {
106+
func prepareHTTPProxy(grpcAddr string, grpcServer *grpc.Server) (*http.Server, func(), error) {
104107
timeoutGrpcDialCtx, grpcDialCancel := context.WithTimeout(context.Background(), DialTimeout)
105108
defer grpcDialCancel()
106109

@@ -166,12 +169,29 @@ func prepareHTTPProxy(grpcAddr string) (*http.Server, func(), error) {
166169

167170
//nolint: gomnd
168171
srv := &http.Server{
169-
Handler: baseMux,
172+
Handler: grpcHandlerFunc(grpcServer, baseMux),
170173
Addr: grpcAddr,
171174
ReadTimeout: 5 * time.Second,
172-
WriteTimeout: 60 * time.Second,
173-
IdleTimeout: 120 * time.Second,
175+
WriteTimeout: 30 * time.Minute, // FIXME: Creating issues for grpc connection
176+
IdleTimeout: 5 * time.Minute,
174177
}
175178

176179
return srv, cleanup, nil
177180
}
181+
182+
// grpcHandlerFunc routes http1 calls to baseMux and http2 with grpc header to grpcServer.
183+
// Using a single port for proxying both http1 & 2 protocols will degrade http performance
184+
// but for our use-case the convenience per performance tradeoff is better suited
185+
// if in the future, this does become a bottleneck(which I highly doubt), we can break the service
186+
// into two ports, default port for grpc and default+1 for grpc-gateway proxy.
187+
// We can also use something like a connection multiplexer
188+
// https://github.com/soheilhy/cmux to achieve the same.
189+
func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler {
190+
return h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
191+
if r.ProtoMajor == 2 && strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
192+
grpcServer.ServeHTTP(w, r)
193+
} else {
194+
otherHandler.ServeHTTP(w, r)
195+
}
196+
}), &http2.Server{})
197+
}

0 commit comments

Comments
 (0)