Skip to content

Commit e6fb7ad

Browse files
Merge branch 'master' into janez/expose-node-component-management
2 parents 85fd812 + aedb8dc commit e6fb7ad

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3917
-154
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ generate-mocks: install-mock-generators
205205
mockery --name 'BlockTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
206206
mockery --name 'DataProvider' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
207207
mockery --name 'DataProviderFactory' --dir="./engine/access/rest/websockets/data_providers" --case=underscore --output="./engine/access/rest/websockets/data_providers/mock" --outpkg="mock"
208+
mockery --name 'WebsocketConnection' --dir="./engine/access/rest/websockets" --case=underscore --output="./engine/access/rest/websockets/mock" --outpkg="mock"
208209
mockery --name 'ExecutionDataTracker' --dir="./engine/access/subscription" --case=underscore --output="./engine/access/subscription/mock" --outpkg="mock"
209210
mockery --name 'ConnectionFactory' --dir="./engine/access/rpc/connection" --case=underscore --output="./engine/access/rpc/connection/mock" --outpkg="mock"
210211
mockery --name 'Communicator' --dir="./engine/access/rpc/backend" --case=underscore --output="./engine/access/rpc/backend/mock" --outpkg="mock"
@@ -215,6 +216,7 @@ generate-mocks: install-mock-generators
215216
mockery --name 'Storage' --dir=module/executiondatasync/tracker --case=underscore --output="module/executiondatasync/tracker/mock" --outpkg="mocktracker"
216217
mockery --name 'ScriptExecutor' --dir=module/execution --case=underscore --output="module/execution/mock" --outpkg="mock"
217218
mockery --name 'StorageSnapshot' --dir=fvm/storage/snapshot --case=underscore --output="fvm/storage/snapshot/mock" --outpkg="mock"
219+
mockery --name 'WebsocketConnection' --dir=engine/access/rest/websockets --case=underscore --output="engine/access/rest/websockets/mock" --outpkg="mock"
218220

219221
#temporarily make insecure/ a non-module to allow mockery to create mocks
220222
mv insecure/go.mod insecure/go2.mod

cmd/bootstrap/utils/md5.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package utils
22

33
// The google storage API only provides md5 and crc32 hence overriding the linter flag for md5
44
import (
5-
"crypto/md5" //nolint:gosec
5+
// #nosec
6+
"crypto/md5"
67
"io"
78
"os"
89
)

cmd/util/cmd/verify_execution_result/cmd.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ var (
1818
flagChunkDataPackDir string
1919
flagChain string
2020
flagFromTo string
21+
flagWorkerCount uint // number of workers to verify the blocks concurrently
2122
)
2223

2324
// # verify the last 100 sealed blocks
@@ -47,33 +48,47 @@ func init() {
4748

4849
Cmd.Flags().StringVar(&flagFromTo, "from_to", "",
4950
"the height range to verify blocks (inclusive), i.e, 1-1000, 1000-2000, 2000-3000, etc.")
51+
52+
Cmd.Flags().UintVar(&flagWorkerCount, "worker_count", 1,
53+
"number of workers to use for verification, default is 1")
54+
5055
}
5156

5257
func run(*cobra.Command, []string) {
5358
chainID := flow.ChainID(flagChain)
5459
_ = chainID.Chain()
5560

61+
if flagWorkerCount < 1 {
62+
log.Fatal().Msgf("worker count must be at least 1, but got %v", flagWorkerCount)
63+
}
64+
65+
lg := log.With().
66+
Str("chain", string(chainID)).
67+
Str("datadir", flagDatadir).
68+
Str("chunk_data_pack_dir", flagChunkDataPackDir).
69+
Logger()
70+
5671
if flagFromTo != "" {
5772
from, to, err := parseFromTo(flagFromTo)
5873
if err != nil {
59-
log.Fatal().Err(err).Msg("could not parse from_to")
74+
lg.Fatal().Err(err).Msg("could not parse from_to")
6075
}
6176

62-
log.Info().Msgf("verifying range from %d to %d", from, to)
63-
err = verifier.VerifyRange(from, to, chainID, flagDatadir, flagChunkDataPackDir)
77+
lg.Info().Msgf("verifying range from %d to %d", from, to)
78+
err = verifier.VerifyRange(from, to, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount)
6479
if err != nil {
65-
log.Fatal().Err(err).Msgf("could not verify range from %d to %d", from, to)
80+
lg.Fatal().Err(err).Msgf("could not verify range from %d to %d", from, to)
6681
}
67-
log.Info().Msgf("successfully verified range from %d to %d", from, to)
82+
lg.Info().Msgf("successfully verified range from %d to %d", from, to)
6883

6984
} else {
70-
log.Info().Msgf("verifying last %d sealed blocks", flagLastK)
71-
err := verifier.VerifyLastKHeight(flagLastK, chainID, flagDatadir, flagChunkDataPackDir)
85+
lg.Info().Msgf("verifying last %d sealed blocks", flagLastK)
86+
err := verifier.VerifyLastKHeight(flagLastK, chainID, flagDatadir, flagChunkDataPackDir, flagWorkerCount)
7287
if err != nil {
73-
log.Fatal().Err(err).Msg("could not verify last k height")
88+
lg.Fatal().Err(err).Msg("could not verify last k height")
7489
}
7590

76-
log.Info().Msgf("successfully verified last %d sealed blocks", flagLastK)
91+
lg.Info().Msgf("successfully verified last %d sealed blocks", flagLastK)
7792
}
7893
}
7994

engine/access/rest/websockets/config.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,33 @@ import (
44
"time"
55
)
66

7+
const (
8+
// PingPeriod defines the interval at which ping messages are sent to the client.
9+
// This value must be less than pongWait, cause it that case the server ensures it sends a ping well before the PongWait
10+
// timeout elapses. Each new pong message resets the server's read deadline, keeping the connection alive as long as
11+
// the client is responsive.
12+
//
13+
// Example:
14+
// At t=9, the server sends a ping, initial read deadline is t=10 (for the first message)
15+
// At t=10, the client responds with a pong. The server resets its read deadline to t=20.
16+
// At t=18, the server sends another ping. If the client responds with a pong at t=19, the read deadline is extended to t=29.
17+
//
18+
// In case of failure:
19+
// If the client stops responding, the server will send a ping at t=9 but won't receive a pong by t=10. The server then closes the connection.
20+
PingPeriod = (PongWait * 9) / 10
21+
22+
// PongWait specifies the maximum time to wait for a pong response message from the peer
23+
// after sending a ping
24+
PongWait = 10 * time.Second
25+
26+
// WriteWait specifies a timeout for the write operation. If the write
27+
// isn't completed within this duration, it fails with a timeout error.
28+
// SetWriteDeadline ensures the write operation does not block indefinitely
29+
// if the client is slow or unresponsive. This prevents resource exhaustion
30+
// and allows the server to gracefully handle timeouts for delayed writes.
31+
WriteWait = 10 * time.Second
32+
)
33+
734
type Config struct {
835
MaxSubscriptionsPerConnection uint64
936
MaxResponsesPerSecond uint64
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package websockets
2+
3+
import (
4+
"time"
5+
6+
"github.com/gorilla/websocket"
7+
)
8+
9+
type WebsocketConnection interface {
10+
ReadJSON(v interface{}) error
11+
WriteJSON(v interface{}) error
12+
WriteControl(messageType int, deadline time.Time) error
13+
Close() error
14+
SetReadDeadline(deadline time.Time) error
15+
SetWriteDeadline(deadline time.Time) error
16+
SetPongHandler(h func(string) error)
17+
}
18+
19+
type WebsocketConnectionImpl struct {
20+
conn *websocket.Conn
21+
}
22+
23+
func NewWebsocketConnection(conn *websocket.Conn) *WebsocketConnectionImpl {
24+
return &WebsocketConnectionImpl{
25+
conn: conn,
26+
}
27+
}
28+
29+
var _ WebsocketConnection = (*WebsocketConnectionImpl)(nil)
30+
31+
func (c *WebsocketConnectionImpl) ReadJSON(v interface{}) error {
32+
return c.conn.ReadJSON(v)
33+
}
34+
35+
func (c *WebsocketConnectionImpl) WriteJSON(v interface{}) error {
36+
return c.conn.WriteJSON(v)
37+
}
38+
39+
func (c *WebsocketConnectionImpl) WriteControl(messageType int, deadline time.Time) error {
40+
return c.conn.WriteControl(messageType, nil, deadline)
41+
}
42+
43+
func (c *WebsocketConnectionImpl) Close() error {
44+
return c.conn.Close()
45+
}
46+
47+
func (c *WebsocketConnectionImpl) SetReadDeadline(deadline time.Time) error {
48+
return c.conn.SetReadDeadline(deadline)
49+
}
50+
51+
func (c *WebsocketConnectionImpl) SetWriteDeadline(deadline time.Time) error {
52+
return c.conn.SetWriteDeadline(deadline)
53+
}
54+
55+
func (c *WebsocketConnectionImpl) SetPongHandler(h func(string) error) {
56+
c.conn.SetPongHandler(h)
57+
}

0 commit comments

Comments
 (0)