Skip to content

Commit 7b6bf22

Browse files
Use flow-go Components for composing
1 parent a746c82 commit 7b6bf22

File tree

14 files changed

+2556
-1425
lines changed

14 files changed

+2556
-1425
lines changed

api/profiler.go

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,54 +7,94 @@ import (
77
"net/http"
88
_ "net/http/pprof"
99
"strconv"
10+
"time"
11+
12+
"github.com/onflow/flow-go/module/component"
13+
"github.com/onflow/flow-go/module/irrecoverable"
1014

1115
"github.com/rs/zerolog"
1216
)
1317

1418
type ProfileServer struct {
15-
logger zerolog.Logger
19+
log zerolog.Logger
1620
server *http.Server
1721
endpoint string
22+
23+
startupCompleted chan struct{}
1824
}
1925

26+
var _ component.Component = (*ProfileServer)(nil)
27+
2028
func NewProfileServer(
2129
logger zerolog.Logger,
2230
host string,
2331
port int,
2432
) *ProfileServer {
2533
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
2634
return &ProfileServer{
27-
logger: logger,
28-
server: &http.Server{Addr: endpoint},
29-
endpoint: endpoint,
35+
log: logger,
36+
server: &http.Server{Addr: endpoint},
37+
endpoint: endpoint,
38+
startupCompleted: make(chan struct{}),
3039
}
3140
}
3241

33-
func (s *ProfileServer) ListenAddr() string {
34-
return s.endpoint
35-
}
42+
func (s *ProfileServer) Start(ctx irrecoverable.SignalerContext) {
43+
defer close(s.startupCompleted)
44+
45+
s.server.BaseContext = func(_ net.Listener) context.Context {
46+
return ctx
47+
}
3648

37-
func (s *ProfileServer) Start() {
3849
go func() {
39-
err := s.server.ListenAndServe()
40-
if err != nil {
50+
s.log.Info().Msgf("Profiler server started: %s", s.endpoint)
51+
52+
if err := s.server.ListenAndServe(); err != nil {
53+
// http.ErrServerClosed is returned when Close or Shutdown is called
54+
// we don't consider this an error, so print this with debug level instead
4155
if errors.Is(err, http.ErrServerClosed) {
42-
s.logger.Warn().Msg("Profiler server shutdown")
43-
return
56+
s.log.Debug().Err(err).Msg("Profiler server shutdown")
57+
} else {
58+
s.log.Err(err).Msg("error running profiler server")
4459
}
45-
s.logger.Err(err).Msg("failed to start Profiler server")
46-
panic(err)
4760
}
4861
}()
4962
}
5063

51-
func (s *ProfileServer) Stop() error {
52-
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
53-
defer cancel()
64+
func (s *ProfileServer) Ready() <-chan struct{} {
65+
ready := make(chan struct{})
66+
67+
go func() {
68+
<-s.startupCompleted
69+
close(ready)
70+
}()
5471

55-
return s.server.Shutdown(ctx)
72+
return ready
5673
}
5774

58-
func (s *ProfileServer) Close() error {
59-
return s.server.Close()
75+
func (s *ProfileServer) Done() <-chan struct{} {
76+
done := make(chan struct{})
77+
go func() {
78+
<-s.startupCompleted
79+
defer close(done)
80+
81+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
82+
defer cancel()
83+
84+
err := s.server.Shutdown(ctx)
85+
if err == nil {
86+
s.log.Info().Msg("Profiler server graceful shutdown completed")
87+
}
88+
89+
if errors.Is(err, ctx.Err()) {
90+
s.log.Warn().Msg("Profiler server graceful shutdown timed out")
91+
err := s.server.Close()
92+
if err != nil {
93+
s.log.Err(err).Msg("error closing profiler server")
94+
}
95+
} else {
96+
s.log.Err(err).Msg("error shutting down profiler server")
97+
}
98+
}()
99+
return done
60100
}

api/server.go

Lines changed: 72 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
"strings"
1919
"time"
2020

21-
"github.com/onflow/go-ethereum/core"
21+
"github.com/onflow/flow-go/module/component"
22+
"github.com/onflow/flow-go/module/irrecoverable"
23+
2224
gethVM "github.com/onflow/go-ethereum/core/vm"
2325
gethLog "github.com/onflow/go-ethereum/log"
2426
"github.com/onflow/go-ethereum/rpc"
@@ -57,8 +59,12 @@ type Server struct {
5759

5860
config config.Config
5961
collector metrics.Collector
62+
63+
startupCompleted chan struct{}
6064
}
6165

66+
var _ component.Component = (*Server)(nil)
67+
6268
const (
6369
shutdownTimeout = 5 * time.Second
6470
batchRequestLimit = 50
@@ -79,10 +85,11 @@ func NewServer(
7985
gethLog.SetDefault(gethLog.NewLogger(zeroSlog))
8086

8187
return &Server{
82-
logger: logger,
83-
timeouts: rpc.DefaultHTTPTimeouts,
84-
config: cfg,
85-
collector: collector,
88+
logger: logger,
89+
timeouts: rpc.DefaultHTTPTimeouts,
90+
config: cfg,
91+
collector: collector,
92+
startupCompleted: make(chan struct{}),
8693
}
8794
}
8895

@@ -179,9 +186,10 @@ func (h *Server) disableWS() bool {
179186
}
180187

181188
// Start starts the HTTP server if it is enabled and not already running.
182-
func (h *Server) Start() error {
189+
func (h *Server) Start(ctx irrecoverable.SignalerContext) {
190+
defer close(h.startupCompleted)
183191
if h.endpoint == "" || h.listener != nil {
184-
return nil // already running or not configured
192+
return // already running or not configured
185193
}
186194

187195
// Initialize the server.
@@ -192,16 +200,21 @@ func (h *Server) Start() error {
192200
h.server.ReadHeaderTimeout = h.timeouts.ReadHeaderTimeout
193201
h.server.WriteTimeout = h.timeouts.WriteTimeout
194202
h.server.IdleTimeout = h.timeouts.IdleTimeout
203+
h.server.BaseContext = func(_ net.Listener) context.Context {
204+
return ctx
205+
}
195206
}
196207

208+
listenConfig := net.ListenConfig{}
197209
// Start the server.
198-
listener, err := net.Listen("tcp", h.endpoint)
210+
listener, err := listenConfig.Listen(ctx, "tcp", h.endpoint)
199211
if err != nil {
200212
// If the server fails to start, we need to clear out the RPC and WS
201213
// configurations so they can be configured another time.
202214
h.disableRPC()
203215
h.disableWS()
204-
return err
216+
ctx.Throw(err)
217+
return
205218
}
206219

207220
h.listener = listener
@@ -213,7 +226,7 @@ func (h *Server) Start() error {
213226
return
214227
}
215228
h.logger.Err(err).Msg("failed to start API server")
216-
panic(err)
229+
ctx.Throw(err)
217230
}
218231
}()
219232

@@ -225,8 +238,17 @@ func (h *Server) Start() error {
225238
url := fmt.Sprintf("ws://%v", listener.Addr())
226239
h.logger.Info().Msgf("JSON-RPC over WebSocket enabled: %s", url)
227240
}
241+
}
228242

229-
return nil
243+
func (h *Server) Ready() <-chan struct{} {
244+
ready := make(chan struct{})
245+
246+
go func() {
247+
<-h.startupCompleted
248+
close(ready)
249+
}()
250+
251+
return ready
230252
}
231253

232254
// disableRPC stops the JSON-RPC over HTTP handler.
@@ -296,41 +318,50 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
296318
w.WriteHeader(http.StatusNotFound)
297319
}
298320

299-
// Stop shuts down the HTTP server.
300-
func (h *Server) Stop() {
301-
if h.listener == nil {
302-
return // not running
303-
}
321+
// Done shuts down the HTTP server.
322+
func (h *Server) Done() <-chan struct{} {
323+
done := make(chan struct{})
304324

305-
// Shut down the server.
306-
httpHandler := h.httpHandler
307-
if httpHandler != nil {
308-
httpHandler.server.Stop()
309-
h.httpHandler = nil
310-
}
325+
go func() {
326+
defer close(done)
311327

312-
wsHandler := h.wsHandler
313-
if wsHandler != nil {
314-
wsHandler.server.Stop()
315-
h.wsHandler = nil
316-
}
328+
if h.listener == nil {
329+
return // not running
330+
}
317331

318-
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
319-
defer cancel()
320-
err := h.server.Shutdown(ctx)
321-
if err != nil && err == ctx.Err() {
322-
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
323-
h.server.Close()
324-
}
332+
// Shut down the server.
333+
httpHandler := h.httpHandler
334+
if httpHandler != nil {
335+
httpHandler.server.Stop()
336+
h.httpHandler = nil
337+
}
338+
339+
wsHandler := h.wsHandler
340+
if wsHandler != nil {
341+
wsHandler.server.Stop()
342+
h.wsHandler = nil
343+
}
344+
345+
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
346+
defer cancel()
347+
err := h.server.Shutdown(ctx)
348+
if err != nil && err == ctx.Err() {
349+
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
350+
h.server.Close()
351+
}
325352

326-
h.listener.Close()
327-
h.logger.Info().Msgf(
328-
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
329-
)
353+
h.listener.Close()
354+
h.logger.Info().Msgf(
355+
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
356+
)
357+
358+
// Clear out everything to allow re-configuring it later.
359+
h.host, h.port, h.endpoint = "", 0, ""
360+
h.server, h.listener = nil, nil
361+
362+
}()
330363

331-
// Clear out everything to allow re-configuring it later.
332-
h.host, h.port, h.endpoint = "", 0, ""
333-
h.server, h.listener = nil, nil
364+
return done
334365
}
335366

336367
// CheckTimeouts ensures that timeout values are meaningful

api/stream.go

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func newSubscription[T any](
178178

179179
rpcSub := notifier.CreateSubscription()
180180

181-
subs := models.NewSubscription(logger, callback(notifier, rpcSub))
181+
subs := models.NewSubscription(callback(notifier, rpcSub))
182182

183183
l := logger.With().
184184
Str("gateway-subscription-id", fmt.Sprintf("%p", subs)).
@@ -190,16 +190,8 @@ func newSubscription[T any](
190190
go func() {
191191
defer publisher.Unsubscribe(subs)
192192

193-
for {
194-
select {
195-
case err := <-subs.Error():
196-
l.Debug().Err(err).Msg("subscription returned error")
197-
return
198-
case err := <-rpcSub.Err():
199-
l.Debug().Err(err).Msg("client unsubscribed")
200-
return
201-
}
202-
}
193+
err := <-rpcSub.Err()
194+
l.Debug().Err(err).Msg("client unsubscribed")
203195
}()
204196

205197
l.Info().Msg("new heads subscription created")

0 commit comments

Comments
 (0)