Skip to content

Commit ed6a2c4

Browse files
authored
Merge pull request #6757 from The-K-R-O-K/UlyanaAndrukhiv/6639-ws-ping-pong
[Access] Implement keepalive routine with ping-ponging to ws connection in ws controller
2 parents 61a0b0e + cfad79f commit ed6a2c4

File tree

8 files changed

+634
-50
lines changed

8 files changed

+634
-50
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ generate-mocks: install-mock-generators
205205
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
206206
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
207207
mockery --name 'DataProviderFactory' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
208+
mockery --name 'WebsocketConnection' --dir="./engine/access/rest/websockets" --case=underscore --output="./engine/access/rest/websockets/mock" --outpkg="mock"
208209
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
209210
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
210211
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"

engine/access/rest/websockets/config.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,33 @@ import (
44
"time"
55
)
66

7+
const (
8+
// PingPeriod defines the interval at which ping messages are sent to the client.
9+
// This value must be less than pongWait, cause it that case the server ensures it sends a ping well before the PongWait
10+
// timeout elapses. Each new pong message resets the server's read deadline, keeping the connection alive as long as
11+
// the client is responsive.
12+
//
13+
// Example:
14+
// At t=9, the server sends a ping, initial read deadline is t=10 (for the first message)
15+
// At t=10, the client responds with a pong. The server resets its read deadline to t=20.
16+
// At t=18, the server sends another ping. If the client responds with a pong at t=19, the read deadline is extended to t=29.
17+
//
18+
// In case of failure:
19+
// If the client stops responding, the server will send a ping at t=9 but won't receive a pong by t=10. The server then closes the connection.
20+
PingPeriod = (PongWait * 9) / 10
21+
22+
// PongWait specifies the maximum time to wait for a pong response message from the peer
23+
// after sending a ping
24+
PongWait = 10 * time.Second
25+
26+
// WriteWait specifies a timeout for the write operation. If the write
27+
// isn't completed within this duration, it fails with a timeout error.
28+
// SetWriteDeadline ensures the write operation does not block indefinitely
29+
// if the client is slow or unresponsive. This prevents resource exhaustion
30+
// and allows the server to gracefully handle timeouts for delayed writes.
31+
WriteWait = 10 * time.Second
32+
)
33+
734
type Config struct {
835
MaxSubscriptionsPerConnection uint64
936
MaxResponsesPerSecond uint64
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package websockets
2+
3+
import (
4+
"time"
5+
6+
"github.com/gorilla/websocket"
7+
)
8+
9+
type WebsocketConnection interface {
10+
ReadJSON(v interface{}) error
11+
WriteJSON(v interface{}) error
12+
WriteControl(messageType int, deadline time.Time) error
13+
Close() error
14+
SetReadDeadline(deadline time.Time) error
15+
SetWriteDeadline(deadline time.Time) error
16+
SetPongHandler(h func(string) error)
17+
}
18+
19+
type WebsocketConnectionImpl struct {
20+
conn *websocket.Conn
21+
}
22+
23+
func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnectionImpl {
24+
return &WebsocketConnectionImpl{
25+
conn: conn,
26+
}
27+
}
28+
29+
var _ WebsocketConnection = (*WebsocketConnectionImpl)(nil)
30+
31+
func (c *WebsocketConnectionImpl) ReadJSON(v interface{}) error {
32+
return c.conn.ReadJSON(v)
33+
}
34+
35+
func (c *WebsocketConnectionImpl) WriteJSON(v interface{}) error {
36+
return c.conn.WriteJSON(v)
37+
}
38+
39+
func (c *WebsocketConnectionImpl) WriteControl(messageType int, deadline time.Time) error {
40+
return c.conn.WriteControl(messageType, nil, deadline)
41+
}
42+
43+
func (c *WebsocketConnectionImpl) Close() error {
44+
return c.conn.Close()
45+
}
46+
47+
func (c *WebsocketConnectionImpl) SetReadDeadline(deadline time.Time) error {
48+
return c.conn.SetReadDeadline(deadline)
49+
}
50+
51+
func (c *WebsocketConnectionImpl) SetWriteDeadline(deadline time.Time) error {
52+
return c.conn.SetWriteDeadline(deadline)
53+
}
54+
55+
func (c *WebsocketConnectionImpl) SetPongHandler(h func(string) error) {
56+
c.conn.SetPongHandler(h)
57+
}

engine/access/rest/websockets/controller.go

Lines changed: 125 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,33 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"time"
78

89
"github.com/google/uuid"
910
"github.com/gorilla/websocket"
1011
"github.com/rs/zerolog"
12+
"golang.org/x/sync/errgroup"
1113

1214
dp "github.com/onflow/flow-go/engine/access/rest/websockets/data_providers"
1315
"github.com/onflow/flow-go/engine/access/rest/websockets/models"
1416
"github.com/onflow/flow-go/utils/concurrentmap"
1517
)
1618

1719
type Controller struct {
18-
logger zerolog.Logger
19-
config Config
20-
conn *websocket.Conn
21-
communicationChannel chan interface{}
22-
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
23-
dataProviderFactory dp.DataProviderFactory
20+
logger zerolog.Logger
21+
config Config
22+
conn WebsocketConnection
23+
24+
communicationChannel chan interface{} // Channel for sending messages to the client.
25+
26+
dataProviders *concurrentmap.Map[uuid.UUID, dp.DataProvider]
27+
dataProviderFactory dp.DataProviderFactory
2428
}
2529

2630
func NewWebSocketController(
2731
logger zerolog.Logger,
2832
config Config,
29-
conn *websocket.Conn,
33+
conn WebsocketConnection,
3034
dataProviderFactory dp.DataProviderFactory,
3135
) *Controller {
3236
return &Controller{
@@ -39,62 +43,131 @@ func NewWebSocketController(
3943
}
4044
}
4145

42-
// HandleConnection manages the WebSocket connection, adding context and error handling.
46+
// HandleConnection manages the lifecycle of a WebSocket connection,
47+
// including setup, message processing, and graceful shutdown.
48+
//
49+
// Parameters:
50+
// - ctx: The context for controlling cancellation and timeouts.
4351
func (c *Controller) HandleConnection(ctx context.Context) {
44-
//TODO: configure the connection with ping-pong and deadlines
52+
defer c.shutdownConnection()
53+
54+
// configuring the connection with appropriate read/write deadlines and handlers.
55+
err := c.configureKeepalive()
56+
if err != nil {
57+
// TODO: add error handling here
58+
c.logger.Error().Err(err).Msg("error configuring keepalive connection")
59+
60+
return
61+
}
62+
4563
//TODO: spin up a response limit tracker routine
46-
go c.readMessagesFromClient(ctx)
47-
c.writeMessagesToClient(ctx)
64+
65+
// for track all goroutines and error handling
66+
g, gCtx := errgroup.WithContext(ctx)
67+
68+
g.Go(func() error {
69+
return c.readMessagesFromClient(gCtx)
70+
})
71+
72+
g.Go(func() error {
73+
return c.keepalive(gCtx)
74+
})
75+
76+
g.Go(func() error {
77+
return c.writeMessagesToClient(gCtx)
78+
})
79+
80+
if err = g.Wait(); err != nil {
81+
//TODO: add error handling here
82+
c.logger.Error().Err(err).Msg("error detected in one of the goroutines")
83+
}
84+
}
85+
86+
// configureKeepalive sets up the WebSocket connection with a read deadline
87+
// and a handler for receiving pong messages from the client.
88+
//
89+
// The function does the following:
90+
// 1. Sets an initial read deadline to ensure the server doesn't wait indefinitely
91+
// for a pong message from the client. If no message is received within the
92+
// specified `pongWait` duration, the connection will be closed.
93+
// 2. Establishes a Pong handler that resets the read deadline every time a pong
94+
// message is received from the client, allowing the server to continue waiting
95+
// for further pong messages within the new deadline.
96+
//
97+
// No errors are expected during normal operation.
98+
func (c *Controller) configureKeepalive() error {
99+
// Set the initial read deadline for the first pong message
100+
// The Pong handler itself only resets the read deadline after receiving a Pong.
101+
// It doesn't set an initial deadline. The initial read deadline is crucial to prevent the server from waiting
102+
// forever if the client doesn't send Pongs.
103+
if err := c.conn.SetReadDeadline(time.Now().Add(PongWait)); err != nil {
104+
return fmt.Errorf("failed to set the initial read deadline: %w", err)
105+
}
106+
// Establish a Pong handler which sets the handler for pong messages received from the peer.
107+
c.conn.SetPongHandler(func(string) error {
108+
return c.conn.SetReadDeadline(time.Now().Add(PongWait))
109+
})
110+
111+
return nil
48112
}
49113

50114
// writeMessagesToClient reads a messages from communication channel and passes them on to a client WebSocket connection.
51115
// The communication channel is filled by data providers. Besides, the response limit tracker is involved in
52116
// write message regulation
53-
func (c *Controller) writeMessagesToClient(ctx context.Context) {
54-
//TODO: can it run forever? maybe we should cancel the ctx in the reader routine
117+
//
118+
// No errors are expected during normal operation. All errors are considered benign.
119+
func (c *Controller) writeMessagesToClient(ctx context.Context) error {
55120
for {
56121
select {
57122
case <-ctx.Done():
58-
return
59-
case msg := <-c.communicationChannel:
123+
return nil
124+
case msg, ok := <-c.communicationChannel:
125+
if !ok {
126+
return fmt.Errorf("communication channel closed, no error occurred")
127+
}
60128
// TODO: handle 'response per second' limits
61129

130+
// Specifies a timeout for the write operation. If the write
131+
// isn't completed within this duration, it fails with a timeout error.
132+
// SetWriteDeadline ensures the write operation does not block indefinitely
133+
// if the client is slow or unresponsive. This prevents resource exhaustion
134+
// and allows the server to gracefully handle timeouts for delayed writes.
135+
if err := c.conn.SetWriteDeadline(time.Now().Add(WriteWait)); err != nil {
136+
return fmt.Errorf("failed to set the write deadline: %w", err)
137+
}
62138
err := c.conn.WriteJSON(msg)
63139
if err != nil {
64-
c.logger.Error().Err(err).Msg("error writing to connection")
140+
return fmt.Errorf("failed to write message to connection: %w", err)
65141
}
66142
}
67143
}
68144
}
69145

70146
// readMessagesFromClient continuously reads messages from a client WebSocket connection,
71147
// processes each message, and handles actions based on the message type.
72-
func (c *Controller) readMessagesFromClient(ctx context.Context) {
73-
defer c.shutdownConnection()
74-
148+
//
149+
// No errors are expected during normal operation. All errors are considered benign.
150+
func (c *Controller) readMessagesFromClient(ctx context.Context) error {
75151
for {
76152
select {
77153
case <-ctx.Done():
78-
c.logger.Info().Msg("context canceled, stopping read message loop")
79-
return
154+
return nil
80155
default:
81156
msg, err := c.readMessage()
82157
if err != nil {
83158
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) {
84-
return
159+
return nil
85160
}
86-
c.logger.Warn().Err(err).Msg("error reading message from client")
87-
return
161+
return fmt.Errorf("failed to read message from client: %w", err)
88162
}
89163

90-
baseMsg, validatedMsg, err := c.parseAndValidateMessage(msg)
164+
_, validatedMsg, err := c.parseAndValidateMessage(msg)
91165
if err != nil {
92-
c.logger.Debug().Err(err).Msg("error parsing and validating client message")
93-
return
166+
return fmt.Errorf("failed to parse and validate client message: %w", err)
94167
}
95168

96169
if err := c.handleAction(ctx, validatedMsg); err != nil {
97-
c.logger.Warn().Err(err).Str("action", baseMsg.Action).Msg("error handling action")
170+
return fmt.Errorf("failed to handle message action: %w", err)
98171
}
99172
}
100173
}
@@ -139,7 +212,6 @@ func (c *Controller) parseAndValidateMessage(message json.RawMessage) (models.Ba
139212
validatedMsg = listMsg
140213

141214
default:
142-
c.logger.Debug().Str("action", baseMsg.Action).Msg("unknown action type")
143215
return baseMsg, nil, fmt.Errorf("unknown action type: %s", baseMsg.Action)
144216
}
145217

@@ -202,12 +274,12 @@ func (c *Controller) handleListSubscriptions(ctx context.Context, msg models.Lis
202274
}
203275

204276
func (c *Controller) shutdownConnection() {
205-
defer close(c.communicationChannel)
206-
defer func(conn *websocket.Conn) {
277+
defer func() {
207278
if err := c.conn.Close(); err != nil {
208279
c.logger.Error().Err(err).Msg("error closing connection")
209280
}
210-
}(c.conn)
281+
// TODO: safe closing communicationChannel will be included as a part of PR #6642
282+
}()
211283

212284
err := c.dataProviders.ForEach(func(_ uuid.UUID, dp dp.DataProvider) error {
213285
dp.Close()
@@ -219,3 +291,24 @@ func (c *Controller) shutdownConnection() {
219291

220292
c.dataProviders.Clear()
221293
}
294+
295+
// keepalive sends a ping message periodically to keep the WebSocket connection alive
296+
// and avoid timeouts.
297+
//
298+
// No errors are expected during normal operation. All errors are considered benign.
299+
func (c *Controller) keepalive(ctx context.Context) error {
300+
pingTicker := time.NewTicker(PingPeriod)
301+
defer pingTicker.Stop()
302+
303+
for {
304+
select {
305+
case <-ctx.Done():
306+
return nil
307+
case <-pingTicker.C:
308+
err := c.conn.WriteControl(websocket.PingMessage, time.Now().Add(WriteWait))
309+
if err != nil {
310+
return fmt.Errorf("failed to write ping message: %w", err)
311+
}
312+
}
313+
}
314+
}

0 commit comments

Comments
 (0)