diff --git a/pkg/coordinator/tasks/tx_pool_clean/task.go b/pkg/coordinator/tasks/tx_pool_clean/task.go index bc6b2560..d533a680 100644 --- a/pkg/coordinator/tasks/tx_pool_clean/task.go +++ b/pkg/coordinator/tasks/tx_pool_clean/task.go @@ -69,14 +69,17 @@ func (t *Task) LoadConfig() error { return nil } -func (t *Task) Execute(ctx context.Context) error { +func (t *Task) Execute(_ context.Context) error { clientPool := t.ctx.Scheduler.GetServices().ClientPool() executionClients := clientPool.GetExecutionPool().GetReadyEndpoints(true) t.logger.Infof("Found %d execution clients", len(executionClients)) for _, client := range executionClients { - t.cleanRecursive(client) + err := t.cleanRecursive(client) + if err != nil { + return err + } } t.ctx.SetResult(types.TaskResultSuccess) diff --git a/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md b/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md index 52783061..2f33a9ab 100644 --- a/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md +++ b/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md @@ -12,8 +12,8 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing - **`tps`**: The total number of transactions to send in one second. -- **`duration_s`**: - The test duration (the number of transactions to send is calculated as `tps * duration_s`). +- **`durationS`**: + The test duration (the number of transactions to send is calculated as `tps * durationS`). - **`logInterval`**: The interval at which the script logs progress (e.g., every 100 transactions). @@ -38,10 +38,8 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing - name: tx_pool_latency_analysis config: tps: 1000 - duration_s: 10 + durationS: 10 logInterval: 1000 configVars: privateKey: "walletPrivkey" ``` - - diff --git a/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go b/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go index 3d2924f6..432e52ae 100644 --- a/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go +++ b/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go @@ -1,10 +1,10 @@ -package txpool_latency_analysis +package txpoollatencyanalysis type Config struct { PrivateKey string `yaml:"privateKey" json:"privateKey"` TPS int `yaml:"tps" json:"tps"` - Duration_s int `yaml:"duration_s" json:"duration_s"` + DurationS int `yaml:"durationS" json:"durationS"` LogInterval int `yaml:"logInterval" json:"logInterval"` SecondsBeforeRunning int64 `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"` } @@ -12,7 +12,7 @@ type Config struct { func DefaultConfig() Config { return Config{ TPS: 100, - Duration_s: 60, + DurationS: 60, LogInterval: 100, SecondsBeforeRunning: 0, } diff --git a/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go b/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go index 3d0c85e6..1cb72434 100644 --- a/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go +++ b/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go @@ -1,19 +1,19 @@ -package txpool_latency_analysis +package txpoollatencyanalysis import ( "context" + "crypto/rand" "encoding/json" "fmt" "math" - "math/rand" + "math/big" "os" "time" - "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool" - "github.com/ethereum/go-ethereum/crypto" "github.com/noku-team/assertoor/pkg/coordinator/types" "github.com/noku-team/assertoor/pkg/coordinator/utils/hdr" + txloadtool "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool" "github.com/noku-team/assertoor/pkg/coordinator/wallet" "github.com/sirupsen/logrus" ) @@ -66,8 +66,8 @@ func (t *Task) LoadConfig() error { return err } - if err := config.Validate(); err != nil { - return err + if validationErr := config.Validate(); validationErr != nil { + return validationErr } privKey, _ := crypto.HexToECDSA(config.PrivateKey) @@ -97,11 +97,16 @@ func (t *Task) Execute(ctx context.Context) error { return nil } - client := executionClients[rand.Intn(len(executionClients))] + n, err := rand.Int(rand.Reader, big.NewInt(int64(len(executionClients)))) + if err != nil { + return fmt.Errorf("failed to generate random number: %w", err) + } + + client := executionClients[n.Int64()] t.logger.Infof("Measuring TxPool transaction propagation *latency*") t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds", - client.GetName(), t.config.TPS, t.config.Duration_s) + client.GetName(), t.config.TPS, t.config.DurationS) // Wait for the specified seconds before starting the task if t.config.SecondsBeforeRunning > 0 { @@ -116,10 +121,10 @@ func (t *Task) Execute(ctx context.Context) error { } // Prepare to collect transaction latencies - var testDeadline = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second) + var testDeadline = time.Now().Add(time.Duration(t.config.DurationS+60*30) * time.Second) - load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client) - load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval) + loadTarget := txloadtool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client) + load := txloadtool.NewLoad(loadTarget, t.config.TPS, t.config.DurationS, testDeadline, t.config.LogInterval) // Generate and sending transactions, waiting for their propagation err = load.Execute() @@ -145,24 +150,30 @@ func (t *Task) Execute(ctx context.Context) error { } // Send txes to other clients, for speeding up tx mining - t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs)) + // todo: fix sending to other clients + // t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs)) - for _, tx := range result.Txs { - for _, otherClient := range executionClients { - if otherClient.GetName() == client.GetName() { - continue - } + // for _, tx := range result.Txs { + // for _, otherClient := range executionClients { + // if otherClient.GetName() == client.GetName() { + // continue + // } - otherClient.GetRPCClient().SendTransaction(ctx, tx) - } - } + // if sendErr := otherClient.GetRPCClient().SendTransaction(ctx, tx); sendErr != nil { + // t.logger.Errorf("Failed to send transaction to other client: %v", sendErr) + // t.ctx.SetResult(types.TaskResultFailure) + + // return sendErr + // } + // } + // } t.logger.Infof("Total transactions sent: %d", result.TotalTxs) // Calculate statistics - var maxLatency int64 = 0 + var maxLatency int64 - var minLatency int64 = math.MaxInt64 + minLatency := int64(math.MaxInt64) for _, lat := range result.LatenciesMus { if lat > maxLatency { @@ -178,9 +189,9 @@ func (t *Task) Execute(ctx context.Context) error { maxLatency, maxLatency/1000, minLatency, minLatency/1000) // Generate HDR plot - plot, err := hdr.HdrPlot(result.LatenciesMus) - if err != nil { - t.logger.Errorf("Failed to generate HDR plot: %v", err) + plot, plotErr := hdr.Plot(result.LatenciesMus) + if plotErr != nil { + t.logger.Errorf("Failed to generate HDR plot: %v", plotErr) t.ctx.SetResult(types.TaskResultFailure) return nil @@ -190,7 +201,7 @@ func (t *Task) Execute(ctx context.Context) error { plotFilePath := "tx_pool_latency_hdr_plot.csv" - err = os.WriteFile(plotFilePath, []byte(plot), 0644) + err = os.WriteFile(plotFilePath, []byte(plot), 0o600) if err != nil { t.logger.Errorf("Failed to write HDR plot to file: %v", err) t.ctx.SetResult(types.TaskResultFailure) diff --git a/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md b/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md index fb10e921..f8153096 100644 --- a/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md +++ b/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md @@ -12,8 +12,8 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p - **`tps`**: The total number of transactions to send in one second. -- **`duration_s`**: - The test duration (the number of transactions to send is calculated as `tps * duration_s`). +- **`durationS`**: + The test duration (the number of transactions to send is calculated as `tps * durationS`). - **`logInterval`**: The interval at which the script logs progress (e.g., every 100 transactions). @@ -32,7 +32,7 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p - name: tx_pool_throughput_analysis config: tps: 1000 - duration_s: 10 + durationS: 10 logInterval: 1000 configVars: privateKey: "walletPrivkey" diff --git a/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go b/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go index 80e516e8..3d9c6521 100644 --- a/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go +++ b/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go @@ -1,10 +1,10 @@ -package tx_pool_throughput_analysis +package txpoolthroughputanalysis type Config struct { PrivateKey string `yaml:"privateKey" json:"privateKey"` TPS int `yaml:"tps" json:"tps"` - Duration_s int `yaml:"duration_s" json:"duration_s"` + DurationS int `yaml:"durationS" json:"durationS"` LogInterval int `yaml:"logInterval" json:"logInterval"` SecondsBeforeRunning int `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"` } @@ -12,7 +12,7 @@ type Config struct { func DefaultConfig() Config { return Config{ TPS: 100, - Duration_s: 60, + DurationS: 60, LogInterval: 100, SecondsBeforeRunning: 0, } diff --git a/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go b/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go index 2884d10b..82651342 100644 --- a/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go +++ b/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go @@ -1,16 +1,16 @@ -package tx_pool_throughput_analysis +package txpoolthroughputanalysis import ( "context" + "crypto/rand" "encoding/json" "fmt" - "math/rand" + "math/big" "time" - "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool" - "github.com/ethereum/go-ethereum/crypto" "github.com/noku-team/assertoor/pkg/coordinator/types" + txloadtool "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool" "github.com/noku-team/assertoor/pkg/coordinator/wallet" "github.com/sirupsen/logrus" ) @@ -63,8 +63,8 @@ func (t *Task) LoadConfig() error { return err } - if err := config.Validate(); err != nil { - return err + if validationErr := config.Validate(); validationErr != nil { + return validationErr } privKey, _ := crypto.HexToECDSA(config.PrivateKey) @@ -94,11 +94,16 @@ func (t *Task) Execute(ctx context.Context) error { return nil } - client := executionClients[rand.Intn(len(executionClients))] + n, randErr := rand.Int(rand.Reader, big.NewInt(int64(len(executionClients)))) + if randErr != nil { + return fmt.Errorf("failed to generate random number: %w", randErr) + } + + client := executionClients[n.Int64()] t.logger.Infof("Measuring TxPool transaction propagation *throughput*") t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds", - client.GetName(), t.config.TPS, t.config.Duration_s) + client.GetName(), t.config.TPS, t.config.DurationS) // Wait for the specified seconds before starting the task if t.config.SecondsBeforeRunning > 0 { @@ -113,27 +118,27 @@ func (t *Task) Execute(ctx context.Context) error { } // Prepare to collect transaction latencies - var testDeadline = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second) + testDeadline := time.Now().Add(time.Duration(t.config.DurationS+60*30) * time.Second) - load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client) - load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval) + loadTarget := txloadtool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client) + load := txloadtool.NewLoad(loadTarget, t.config.TPS, t.config.DurationS, testDeadline, t.config.LogInterval) // Generate and sending transactions, waiting for their propagation - err = load.Execute() - if err != nil { - t.logger.Errorf("Error during transaction load execution: %v", err) + execErr := load.Execute() + if execErr != nil { + t.logger.Errorf("Error during transaction load execution: %v", execErr) t.ctx.SetResult(types.TaskResultFailure) - return err + return execErr } // Collect the transactions and their latencies - result, err := load.MeasurePropagationLatencies() - if err != nil { - t.logger.Errorf("Error measuring transaction propagation latencies: %v", err) + result, measureErr := load.MeasurePropagationLatencies() + if measureErr != nil { + t.logger.Errorf("Error measuring transaction propagation latencies: %v", measureErr) t.ctx.SetResult(types.TaskResultFailure) - return err + return measureErr } // Check if the context was cancelled or other errors occurred @@ -142,30 +147,35 @@ func (t *Task) Execute(ctx context.Context) error { } // Send txes to other clients, for speeding up tx mining - t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs)) + // t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs)) - for _, tx := range result.Txs { - for _, otherClient := range executionClients { - if otherClient.GetName() == client.GetName() { - continue - } + // for _, tx := range result.Txs { + // for _, otherClient := range executionClients { + // if otherClient.GetName() == client.GetName() { + // continue + // } - otherClient.GetRPCClient().SendTransaction(ctx, tx) - } - } + // if sendErr := otherClient.GetRPCClient().SendTransaction(ctx, tx); sendErr != nil { + // t.logger.Errorf("Failed to send transaction to other client: %v", sendErr) + // t.ctx.SetResult(types.TaskResultFailure) + + // return sendErr + // } + // } + // } t.logger.Infof("Total transactions sent: %d", result.TotalTxs) // Calculate statistics t.logger.Infof("Last measure delay since start time: %s", result.LastMeasureDelay) - processed_tx_per_second := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds() + processedTxPerSecond := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds() t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s", - result.TotalTxs, result.LastMeasureDelay.Seconds(), processed_tx_per_second) + result.TotalTxs, result.LastMeasureDelay.Seconds(), processedTxPerSecond) t.logger.Infof("Sent %d transactions in %.2fs", result.TotalTxs, result.LastMeasureDelay.Seconds()) - t.ctx.Outputs.SetVar("mean_tps_throughput", processed_tx_per_second) + t.ctx.Outputs.SetVar("mean_tps_throughput", processedTxPerSecond) t.ctx.Outputs.SetVar("tx_count", result.TotalTxs) t.ctx.Outputs.SetVar("duplicated_p2p_event_count", result.DuplicatedP2PEventCount) t.ctx.Outputs.SetVar("missed_p2p_event_count", result.NotReceivedP2PEventCount) @@ -175,7 +185,7 @@ func (t *Task) Execute(ctx context.Context) error { outputs := map[string]interface{}{ "tx_count": result.TotalTxs, - "mean_tps_throughput": processed_tx_per_second, + "mean_tps_throughput": processedTxPerSecond, "duplicated_p2p_event_count": result.DuplicatedP2PEventCount, "coordinated_omission_events_count": result.CoordinatedOmissionEventCount, "missed_p2p_event_count": result.NotReceivedP2PEventCount, diff --git a/pkg/coordinator/utils/hdr/plot.go b/pkg/coordinator/utils/hdr/plot.go index 5357b348..d4201cff 100644 --- a/pkg/coordinator/utils/hdr/plot.go +++ b/pkg/coordinator/utils/hdr/plot.go @@ -6,16 +6,23 @@ import ( "github.com/HdrHistogram/hdrhistogram-go" ) -// HdrPlot creates a percentile distribution plot from a slice of int64 values. +// Plot creates a percentile distribution plot from a slice of int64 values. // It returns the percentile distribution as a formatted string. -func HdrPlot(data []int64) (string, error) { +func Plot(data []int64) (string, error) { // Create a histogram with a resolution of 1 microsecond // The maximum value can be set according to your needs, here it's set to 30 million microseconds (30 seconds) histogram := hdrhistogram.New(1, 30*1000000, 5) // Add the data to the histogram for _, value := range data { - histogram.RecordValue(value) + if value < 0 { + continue + } + + err := histogram.RecordValue(value) + if err != nil { + return "", err + } } // Create a buffer to capture the output of the PercentilesPrint function diff --git a/pkg/coordinator/utils/sentry/chain.go b/pkg/coordinator/utils/sentry/chain.go index 9b8cbb1e..b7f47e1a 100644 --- a/pkg/coordinator/utils/sentry/chain.go +++ b/pkg/coordinator/utils/sentry/chain.go @@ -25,6 +25,7 @@ import ( "fmt" "io" "maps" + "math" "math/big" "os" "path/filepath" @@ -56,19 +57,17 @@ type Chain struct { // NewChain takes the given chain.rlp file, and decodes and returns // the blocks from the file. func NewChain(dir string) (*Chain, error) { - gen, err := loadGenesis(filepath.Join(dir, "genesis.json")) + gblock, gen, err := loadGenesis(filepath.Join(dir, "genesis.json")) if err != nil { return nil, err } - gblock := gen.ToBlock() - blocks, err := blocksFromFile(filepath.Join(dir, "chain.rlp"), gblock) if err != nil { return nil, err } - state, err := readState(filepath.Join(dir, "headstate.json")) + headState, err := readState(filepath.Join(dir, "headstate.json")) if err != nil { return nil, err } @@ -81,7 +80,7 @@ func NewChain(dir string) (*Chain, error) { return &Chain{ genesis: gen, blocks: blocks, - state: state, + state: headState, senders: accounts, config: gen.Config, }, nil @@ -104,7 +103,8 @@ func (c *Chain) AccountsInHashOrder() []state.DumpAccount { list := make([]state.DumpAccount, len(c.state)) i := 0 - for addr, acc := range c.state { + for addr := range c.state { + acc := c.state[addr] list[i] = acc list[i].Address = &addr @@ -124,13 +124,16 @@ func (c *Chain) AccountsInHashOrder() []state.DumpAccount { // CodeHashes returns all bytecode hashes contained in the head state. func (c *Chain) CodeHashes() []common.Hash { - var hashes []common.Hash + hashes := make([]common.Hash, 0, len(c.state)) seen := make(map[common.Hash]struct{}) seen[types.EmptyCodeHash] = struct{}{} - for _, acc := range c.state { + for addr := range c.state { + acc := c.state[addr] + h := common.BytesToHash(acc.CodeHash) + if _, ok := seen[h]; ok { continue } @@ -139,7 +142,7 @@ func (c *Chain) CodeHashes() []common.Hash { seen[h] = struct{}{} } - slices.SortFunc(hashes, (common.Hash).Cmp) + slices.SortFunc(hashes, common.Hash.Cmp) return hashes } @@ -151,7 +154,12 @@ func (c *Chain) Len() int { // ForkID gets the fork id of the chain. func (c *Chain) ForkID() forkid.ID { - return forkid.NewID(c.config, c.blocks[0], uint64(c.Len()), c.blocks[c.Len()-1].Time()) + chainLen := c.Len() + if chainLen < 0 { + panic("negative chain length") + } + + return forkid.NewID(c.config, c.blocks[0], uint64(chainLen), c.blocks[c.Len()-1].Time()) } // TD calculates the total difficulty of the chain at the @@ -176,10 +184,10 @@ func (c *Chain) RootAt(height int) common.Hash { // GetSender returns the address associated with account at the index in the // pre-funded accounts list. -func (c *Chain) GetSender(idx int) (common.Address, uint64) { +func (c *Chain) GetSender(idx int) (addr common.Address, nonce uint64) { accounts := slices.SortedFunc(maps.Keys(c.senders), common.Address.Cmp) - addr := accounts[idx] + addr = accounts[idx] return addr, c.senders[addr].Nonce } @@ -222,6 +230,10 @@ func (c *Chain) GetHeaders(req *eth.GetBlockHeadersPacket) ([]*types.Header, err return nil, errors.New("no block headers requested") } + if req.Amount > math.MaxInt { + return nil, errors.New("requested amount too large") + } + var ( headers = make([]*types.Header, req.Amount) blockNumber uint64 @@ -239,7 +251,7 @@ func (c *Chain) GetHeaders(req *eth.GetBlockHeadersPacket) ([]*types.Header, err } if req.Reverse { - for i := 1; i < int(req.Amount); i++ { + for i := uint64(1); i < req.Amount; i++ { blockNumber -= (1 - req.Skip) headers[i] = c.blocks[blockNumber].Header() } @@ -247,7 +259,7 @@ func (c *Chain) GetHeaders(req *eth.GetBlockHeadersPacket) ([]*types.Header, err return headers, nil } - for i := 1; i < int(req.Amount); i++ { + for i := uint64(1); i < req.Amount; i++ { blockNumber += (1 + req.Skip) headers[i] = c.blocks[blockNumber].Header() } @@ -268,18 +280,20 @@ func (c *Chain) Shorten(height int) *Chain { } } -func loadGenesis(genesisFile string) (core.Genesis, error) { +func loadGenesis(genesisFile string) (*types.Block, core.Genesis, error) { chainConfig, err := os.ReadFile(genesisFile) if err != nil { - return core.Genesis{}, err + return nil, core.Genesis{}, err } var gen core.Genesis if err := json.Unmarshal(chainConfig, &gen); err != nil { - return core.Genesis{}, err + return nil, core.Genesis{}, err } - return gen, nil + gblock := gen.ToBlock() + + return gblock, gen, nil } func blocksFromFile(chainfile string, gblock *types.Block) ([]*types.Block, error) { @@ -311,7 +325,12 @@ func blocksFromFile(chainfile string, gblock *types.Block) ([]*types.Block, erro return nil, fmt.Errorf("at block index %d: %v", i, err) } - if b.NumberU64() != uint64(i+1) { + expectedBlockNum := i + 1 + if expectedBlockNum < 0 { + return nil, fmt.Errorf("block index out of range: %d", i) + } + + if b.NumberU64() != uint64(expectedBlockNum) { return nil, fmt.Errorf("block at index %d has wrong number %d", i, b.NumberU64()) } @@ -322,28 +341,34 @@ func blocksFromFile(chainfile string, gblock *types.Block) ([]*types.Block, erro } func readState(file string) (map[common.Address]state.DumpAccount, error) { - f, err := os.ReadFile(file) + var dump state.Dump + + f, err := os.Open(file) if err != nil { - return nil, fmt.Errorf("unable to read state: %v", err) + return nil, err } + defer f.Close() - var dump state.Dump - if err := json.Unmarshal(f, &dump); err != nil { - return nil, fmt.Errorf("unable to unmarshal state: %v", err) + if err := json.NewDecoder(f).Decode(&dump); err != nil { + return nil, err } - state := make(map[common.Address]state.DumpAccount) + stateMap := make(map[common.Address]state.DumpAccount) + + for key := range dump.Accounts { + acct := dump.Accounts[key] - for key, acct := range dump.Accounts { var addr common.Address + if err := addr.UnmarshalText([]byte(key)); err != nil { - return nil, fmt.Errorf("invalid address %q", key) + panic(err) } - state[addr] = acct + acct.Address = &addr + stateMap[addr] = acct } - return state, nil + return stateMap, nil } func readAccounts(file string) (map[common.Address]*senderInfo, error) { diff --git a/pkg/coordinator/utils/sentry/conn.go b/pkg/coordinator/utils/sentry/conn.go index bcfd420e..d381dd70 100644 --- a/pkg/coordinator/utils/sentry/conn.go +++ b/pkg/coordinator/utils/sentry/conn.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "math/big" "net" "net/http" @@ -47,10 +48,13 @@ type Conn struct { } // Read reads a packet from the connection. -func (c *Conn) Read() (uint64, []byte, error) { - c.SetReadDeadline(time.Now().Add(timeout)) +func (c *Conn) Read() (code uint64, data []byte, err error) { + err = c.SetReadDeadline(time.Now().Add(timeout)) + if err != nil { + return 0, nil, err + } - code, data, _, err := c.Conn.Read() + code, data, _, err = c.Conn.Read() if err != nil { return 0, nil, err } @@ -60,7 +64,10 @@ func (c *Conn) Read() (uint64, []byte, error) { // ReadMsg attempts to read a devp2p message with a specific code. func (c *Conn) ReadMsg(proto Proto, code uint64, msg any) error { - c.SetReadDeadline(time.Now().Add(timeout)) + err := c.SetReadDeadline(time.Now().Add(timeout)) + if err != nil { + return err + } for { got, data, err := c.Read() @@ -76,7 +83,10 @@ func (c *Conn) ReadMsg(proto Proto, code uint64, msg any) error { // Write writes a eth packet to the connection. func (c *Conn) Write(proto Proto, code uint64, msg any) error { - c.SetWriteDeadline(time.Now().Add(timeout)) + err := c.SetWriteDeadline(time.Now().Add(timeout)) + if err != nil { + return err + } payload, err := rlp.EncodeToBytes(msg) if err != nil { @@ -92,7 +102,10 @@ var errDisc error = fmt.Errorf("disconnect") // ReadEth reads an Eth sub-protocol wire message. func (c *Conn) ReadEth() (any, error) { - c.SetReadDeadline(time.Now().Add(timeout)) + err := c.SetReadDeadline(time.Now().Add(timeout)) + if err != nil { + return nil, err + } for { code, data, _, err := c.Conn.Read() @@ -105,7 +118,10 @@ func (c *Conn) ReadEth() (any, error) { } if code == pingMsg { - c.Write(baseProto, pongMsg, []byte{}) + if err := c.Write(baseProto, pongMsg, []byte{}); err != nil { + return nil, fmt.Errorf("failed to write pong: %v", err) + } + continue } @@ -118,6 +134,10 @@ func (c *Conn) ReadEth() (any, error) { var msg any + if code > math.MaxInt { + return nil, fmt.Errorf("message code too large: %d", code) + } + switch int(code) { case eth.StatusMsg: msg = new(eth.StatusPacket) @@ -155,12 +175,12 @@ func (c *Conn) ReadEth() (any, error) { // peer performs both the protocol handshake and the status message // exchange with the node in order to peer with it. -func (c *Conn) Peer(chainId *big.Int, genesisHash common.Hash, headHash common.Hash, forkId forkid.ID, status *eth.StatusPacket) error { +func (c *Conn) Peer(chainID *big.Int, genesisHash, headHash common.Hash, forkID forkid.ID, status *eth.StatusPacket) error { if err := c.handshake(); err != nil { return fmt.Errorf("handshake failed: %v", err) } - if err := c.statusExchange(chainId, genesisHash, headHash, forkId, status); err != nil { + if err := c.statusExchange(chainID, genesisHash, headHash, forkID, status); err != nil { return fmt.Errorf("status exchange failed: %v", err) } @@ -240,7 +260,7 @@ func (c *Conn) negotiateEthProtocol(caps []p2p.Cap) { } // statusExchange performs a `Status` message exchange with the given node. -func (c *Conn) statusExchange(chainId *big.Int, genesisHash common.Hash, headHash common.Hash, forkId forkid.ID, status *eth.StatusPacket) error { +func (c *Conn) statusExchange(chainID *big.Int, genesisHash, headHash common.Hash, forkID forkid.ID, status *eth.StatusPacket) error { loop: for { code, data, err := c.Read() @@ -253,21 +273,29 @@ loop: if err := rlp.DecodeBytes(data, &msg); err != nil { return fmt.Errorf("error decoding status packet: %w", err) } - if have, want := msg.ProtocolVersion, c.ourHighestProtoVersion; have != uint32(want) { + if c.ourHighestProtoVersion > math.MaxUint32 { + return fmt.Errorf("protocol version too large: %d", c.ourHighestProtoVersion) + } + if have, want := msg.ProtocolVersion, uint32(c.ourHighestProtoVersion); have != want { return fmt.Errorf("wrong protocol version: have %v, want %v", have, want) } fmt.Println("status msg", msg) break loop case discMsg: var msg []p2p.DiscReason - if rlp.DecodeBytes(data, &msg); len(msg) == 0 { + if err := rlp.DecodeBytes(data, &msg); err != nil { + return fmt.Errorf("failed to decode disconnect message: %v", err) + } + if len(msg) == 0 { return errors.New("invalid disconnect message") } return fmt.Errorf("disconnect received: %v", pretty.Sdump(msg)) case pingMsg: // TODO (renaynay): in the future, this should be an error // (PINGs should not be a response upon fresh connection) - c.Write(baseProto, pongMsg, nil) + if err := c.Write(baseProto, pongMsg, nil); err != nil { + return fmt.Errorf("failed to write pong: %v", err) + } default: return fmt.Errorf("bad status message: code %d", code) } @@ -278,14 +306,17 @@ loop: } if status == nil { + if c.negotiatedProtoVersion > math.MaxUint32 { + return fmt.Errorf("negotiated protocol version too large: %d", c.negotiatedProtoVersion) + } // default status message status = ð.StatusPacket{ ProtocolVersion: uint32(c.negotiatedProtoVersion), - NetworkID: chainId.Uint64(), + NetworkID: chainID.Uint64(), TD: new(big.Int).SetUint64(0), Head: headHash, Genesis: genesisHash, - ForkID: forkId, + ForkID: forkID, } } @@ -299,7 +330,7 @@ loop: // readUntil reads eth protocol messages until a message of the target type is // received. It returns an error if there is a disconnect, timeout expires, // or if the context is cancelled before a message of the desired type can be read. -func readUntil[T any](conn *Conn, ctx context.Context) (*T, error) { +func readUntil[T any](ctx context.Context, conn *Conn) (*T, error) { resultCh := make(chan *T, 1) errCh := make(chan error, 1) @@ -318,8 +349,7 @@ func readUntil[T any](conn *Conn, ctx context.Context) (*T, error) { continue } - switch res := received.(type) { - case *T: + if res, ok := received.(*T); ok { resultCh <- res return } @@ -338,7 +368,7 @@ func readUntil[T any](conn *Conn, ctx context.Context) (*T, error) { // readTransactionMessages reads transaction messages from the connection. // The timeout parameter is optional - if provided and > 0, the function will timeout after the specified duration. -func (conn *Conn) ReadTransactionMessages(timeout ...time.Duration) (*eth.TransactionsPacket, error) { +func (c *Conn) ReadTransactionMessages(timeout ...time.Duration) (*eth.TransactionsPacket, error) { ctx := context.Background() if len(timeout) > 0 && timeout[0] > 0 { @@ -347,7 +377,7 @@ func (conn *Conn) ReadTransactionMessages(timeout ...time.Duration) (*eth.Transa defer cancel() } - return readUntil[eth.TransactionsPacket](conn, ctx) + return readUntil[eth.TransactionsPacket](ctx, c) } // dialAs attempts to dial a given node and perform a handshake using the generated @@ -383,9 +413,9 @@ func dialAs(remoteAddress string) (*Conn, error) { return &conn, nil } -// GetTcpConn dials the TCP wire eth connection to the given client retrieving the +// GetTCPConn dials the TCP wire eth connection to the given client retrieving the // node information using the `admin_nodeInfo` method. -func GetTcpConn(client *execution.Client) (*Conn, error) { +func GetTCPConn(client *execution.Client) (*Conn, error) { r, err := http.Post(client.GetEndpointConfig().URL, "application/json", strings.NewReader( `{"jsonrpc":"2.0","method":"admin_nodeInfo","params":[],"id":1}`, )) diff --git a/pkg/coordinator/utils/sentry/udp.go b/pkg/coordinator/utils/sentry/udp.go index 1bf6345c..e46a0c2a 100644 --- a/pkg/coordinator/utils/sentry/udp.go +++ b/pkg/coordinator/utils/sentry/udp.go @@ -5,6 +5,7 @@ import ( "crypto/ecdsa" "errors" "fmt" + "math" "net" "time" @@ -15,7 +16,7 @@ import ( ) type ( - MessageId int32 + MessageID int32 sentryenv struct { endpoint net.PacketConn @@ -34,15 +35,21 @@ const ( waitTime = 300 * time.Millisecond ) -func BasicPing(remoteAddress string, rpcAdmin string, logger logrus.FieldLogger) { - te := ConnectToP2p(remoteAddress, rpcAdmin, logger) +func BasicPing(remoteAddress, rpcAdmin string, logger logrus.FieldLogger) { + te := connectToP2p(remoteAddress, rpcAdmin, logger) defer te.close() + expiration := time.Now().Add(20 * time.Second).Unix() + if expiration < 0 { + logger.Errorf("Invalid expiration time: %d", expiration) + return + } + pingHash := te.send(&v4wire.Ping{ Version: 4, From: te.localEndpoint(), To: te.remoteEndpoint(), - Expiration: uint64(time.Now().Add(20 * time.Second).Unix()), + Expiration: uint64(expiration), }) if err := te.checkPingPong(pingHash); err != nil { logger.Errorf("PingPong failed: %v", err) @@ -50,7 +57,14 @@ func BasicPing(remoteAddress string, rpcAdmin string, logger logrus.FieldLogger) } func (te *sentryenv) localEndpoint() v4wire.Endpoint { - addr := te.endpoint.LocalAddr().(*net.UDPAddr) + addr, ok := te.endpoint.LocalAddr().(*net.UDPAddr) + if !ok { + panic("expected UDP address") + } + + if addr.Port < 0 || addr.Port > math.MaxUint16 { + panic(fmt.Sprintf("port out of range: %d", addr.Port)) + } return v4wire.Endpoint{ IP: addr.IP.To4(), @@ -63,8 +77,8 @@ func (te *sentryenv) remoteEndpoint() v4wire.Endpoint { return v4wire.NewEndpoint(te.remoteAddr.AddrPort(), 0) } -func (env *sentryenv) close() { - env.endpoint.Close() +func (te *sentryenv) close() { + te.endpoint.Close() } func (te *sentryenv) send(req v4wire.Packet) []byte { @@ -144,7 +158,11 @@ func (te *sentryenv) checkPong(reply v4wire.Packet, pingHash []byte) error { return fmt.Errorf("expected PONG reply, got %v %v", reply.Name(), reply) } - pong := reply.(*v4wire.Pong) + pong, ok := reply.(*v4wire.Pong) + if !ok { + return errors.New("failed to cast reply to *v4wire.Pong") + } + if !bytes.Equal(pong.ReplyTok, pingHash) { return fmt.Errorf("PONG reply token mismatch: got %x, want %x", pong.ReplyTok, pingHash) } @@ -214,7 +232,7 @@ func (te *sentryenv) checkPong(reply v4wire.Packet, pingHash []byte) error { // return ready, nil // } -func ConnectToP2p(remoteAddress string, rpcAdmin string, logger logrus.FieldLogger) *sentryenv { +func connectToP2p(remoteAddress, rpcAdmin string, logger logrus.FieldLogger) *sentryenv { endpoint, err := net.ListenPacket("udp", "0.0.0.0:0") if err != nil { logger.Errorf("Failed to listen: %v", err) diff --git a/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go b/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go index 81c997e3..26fa4706 100644 --- a/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go +++ b/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go @@ -1,4 +1,4 @@ -package tx_load_tool +package txloadtool import ( "context" @@ -7,10 +7,10 @@ import ( "math/big" "time" - "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/core/forkid" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/noku-team/assertoor/pkg/coordinator/clients/execution" "github.com/noku-team/assertoor/pkg/coordinator/helper" @@ -21,22 +21,22 @@ import ( ) type LoadTarget struct { - ctx context.Context - task_ctx *types.TaskContext - wallet *wallet.Wallet - logger logrus.FieldLogger - client *execution.Client + ctx context.Context + taskCtx *types.TaskContext + wallet *wallet.Wallet + logger logrus.FieldLogger + client *execution.Client } // NewLoadTarget creates a new LoadTarget instance -func NewLoadTarget(ctx context.Context, task_ctx *types.TaskContext, logger logrus.FieldLogger, - wallet *wallet.Wallet, client *execution.Client) *LoadTarget { +func NewLoadTarget(ctx context.Context, taskCtx *types.TaskContext, logger logrus.FieldLogger, + w *wallet.Wallet, client *execution.Client) *LoadTarget { return &LoadTarget{ - ctx: ctx, - task_ctx: task_ctx, - wallet: wallet, - logger: logger, - client: client, + ctx: ctx, + taskCtx: taskCtx, + wallet: w, + logger: logger, + client: client, } } @@ -74,20 +74,20 @@ type Load struct { target *LoadTarget testDeadline time.Time TPS int - Duration_s int + DurationS int LogInterval int Result *LoadResult } // NewLoad creates a new Load instance -func NewLoad(target *LoadTarget, TPS int, duration_s int, testDeadline time.Time, logInterval int) *Load { +func NewLoad(target *LoadTarget, tps, durationS int, testDeadline time.Time, logInterval int) *Load { return &Load{ target: target, - TPS: TPS, - Duration_s: duration_s, + TPS: tps, + DurationS: durationS, testDeadline: testDeadline, LogInterval: logInterval, - Result: NewLoadResult(TPS * duration_s), + Result: NewLoadResult(tps * durationS), } } @@ -105,7 +105,7 @@ func (l *Load) Execute() error { time.Sleep(100 * time.Millisecond) l.Result.StartTime = time.Now() - endTime := l.Result.StartTime.Add(time.Second * time.Duration(l.Duration_s)) + endTime := l.Result.StartTime.Add(time.Second * time.Duration(l.DurationS)) l.target.logger.Infof("Starting transaction generation at %s", l.Result.StartTime) // Generate and send transactions @@ -121,7 +121,7 @@ func (l *Load) Execute() error { tx, err := l.target.generateTransaction(i) if err != nil { l.target.logger.Errorf("Failed to create transaction: %v", err) - l.target.task_ctx.SetResult(types.TaskResultFailure) + l.target.taskCtx.SetResult(types.TaskResultFailure) l.Result.Failed = true return @@ -133,7 +133,7 @@ func (l *Load) Execute() error { if err != nil { if !l.Result.Failed { l.target.logger.WithField("client", l.target.client.GetName()).Errorf("Failed to send transaction: %v", err) - l.target.task_ctx.SetResult(types.TaskResultFailure) + l.target.taskCtx.SetResult(types.TaskResultFailure) l.Result.Failed = true } @@ -186,10 +186,10 @@ func (l *Load) Execute() error { // MeasurePropagationLatencies reads P2P events and calculates propagation latencies for each transaction func (l *Load) MeasurePropagationLatencies() (*LoadResult, error) { // Get a P2P connection to read events - conn, err := l.target.getTcpConn() + conn, err := l.target.getTCPConn() if err != nil { l.target.logger.Errorf("Failed to get P2P connection: %v", err) - l.target.task_ctx.SetResult(types.TaskResultFailure) + l.target.taskCtx.SetResult(types.TaskResultFailure) return l.Result, fmt.Errorf("measurement stopped: failed to get P2P connection") } @@ -208,7 +208,7 @@ func (l *Load) MeasurePropagationLatencies() (*LoadResult, error) { } l.target.logger.Errorf("Failed reading p2p events: %v", err) - l.target.task_ctx.SetResult(types.TaskResultFailure) + l.target.taskCtx.SetResult(types.TaskResultFailure) l.Result.Failed = true return l.Result, fmt.Errorf("measurement stopped: failed reading p2p events") @@ -220,33 +220,33 @@ func (l *Load) MeasurePropagationLatencies() (*LoadResult, error) { } for i, tx := range *txes { - tx_data := tx.Data() - // read tx_data that is in the format "tx_index:" - var tx_index int + txData := tx.Data() + // read txData that is in the format "txIndex:" + var txIndex int - _, err := fmt.Sscanf(string(tx_data), "tx_index:%d", &tx_index) + _, err := fmt.Sscanf(string(txData), "txIndex:%d", &txIndex) if err != nil { l.target.logger.Errorf("Failed to parse transaction data: %v", err) - l.target.task_ctx.SetResult(types.TaskResultFailure) + l.target.taskCtx.SetResult(types.TaskResultFailure) l.Result.Failed = true return l.Result, fmt.Errorf("measurement stopped: failed to parse transaction data at event %d", i) } - if tx_index < 0 || tx_index >= l.Result.TotalTxs { - l.target.logger.Errorf("Transaction index out of range: %d", tx_index) - l.target.task_ctx.SetResult(types.TaskResultFailure) + if txIndex < 0 || txIndex >= l.Result.TotalTxs { + l.target.logger.Errorf("Transaction index out of range: %d", txIndex) + l.target.taskCtx.SetResult(types.TaskResultFailure) l.Result.Failed = true return l.Result, fmt.Errorf("measurement stopped: transaction index out of range at event %d", i) } // log the duplicated p2p events, and count duplicated p2p events - if l.Result.LatenciesMus[tx_index] != 0 { + if l.Result.LatenciesMus[txIndex] != 0 { l.Result.DuplicatedP2PEventCount++ } else { l.Result.LastMeasureDelay = time.Since(l.Result.StartTime) - l.Result.LatenciesMus[tx_index] = time.Since(l.Result.TxStartTime[tx_index]).Microseconds() + l.Result.LatenciesMus[txIndex] = time.Since(l.Result.TxStartTime[txIndex]).Microseconds() receivedEvents++ } @@ -316,12 +316,12 @@ func (l *Load) MeasurePropagationLatencies() (*LoadResult, error) { return l.Result, nil } -func (t *LoadTarget) getTcpConn() (*sentry.Conn, error) { +func (t *LoadTarget) getTCPConn() (*sentry.Conn, error) { chainConfig := params.AllDevChainProtocolChanges head, err := t.client.GetRPCClient().GetLatestBlock(t.ctx) if err != nil { - t.task_ctx.SetResult(types.TaskResultFailure) + t.taskCtx.SetResult(types.TaskResultFailure) return nil, err } @@ -335,23 +335,23 @@ func (t *LoadTarget) getTcpConn() (*sentry.Conn, error) { genesis, err := t.client.GetRPCClient().GetEthClient().BlockByNumber(t.ctx, new(big.Int).SetUint64(0)) if err != nil { t.logger.Errorf("Failed to fetch genesis block: %v", err) - t.task_ctx.SetResult(types.TaskResultFailure) + t.taskCtx.SetResult(types.TaskResultFailure) return nil, err } - conn, err := sentry.GetTcpConn(t.client) + conn, err := sentry.GetTCPConn(t.client) if err != nil { t.logger.Errorf("Failed to get TCP connection: %v", err) - t.task_ctx.SetResult(types.TaskResultFailure) + t.taskCtx.SetResult(types.TaskResultFailure) return nil, err } - forkId := forkid.NewID(chainConfig, genesis, head.NumberU64(), head.Time()) + forkID := forkid.NewID(chainConfig, genesis, head.NumberU64(), head.Time()) // handshake - err = conn.Peer(chainConfig.ChainID, genesis.Hash(), head.Hash(), forkId, nil) + err = conn.Peer(chainConfig.ChainID, genesis.Hash(), head.Hash(), forkID, nil) if err != nil { return nil, err } @@ -372,14 +372,14 @@ func (t *LoadTarget) generateTransaction(i int) (*ethtypes.Transaction, error) { tipCap := &helper.BigInt{Value: *big.NewInt(1000000000)} // 1 Gwei txObj := ðtypes.DynamicFeeTx{ - ChainID: t.task_ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), + ChainID: t.taskCtx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), Nonce: nonce, GasTipCap: &tipCap.Value, GasFeeCap: &feeCap.Value, Gas: 50000, To: toAddr, Value: txAmount, - Data: []byte(fmt.Sprintf("tx_index:%d", i)), + Data: []byte(fmt.Sprintf("txIndex:%d", i)), } return ethtypes.NewTx(txObj), nil diff --git a/playbooks/dev/tx-pool-check-short.yaml b/playbooks/dev/tx-pool-check-short.yaml index 57d1f59c..4dee7a50 100644 --- a/playbooks/dev/tx-pool-check-short.yaml +++ b/playbooks/dev/tx-pool-check-short.yaml @@ -10,7 +10,7 @@ tasks: timeout: 30m config: tps: 2000 - duration_s: 5 + durationS: 5 logInterval: 1000 configVars: privateKey: "walletPrivkey" @@ -24,7 +24,7 @@ tasks: timeout: 30m config: tps: 2000 - duration_s: 5 + durationS: 5 logInterval: 1000 configVars: privateKey: "walletPrivkey" \ No newline at end of file diff --git a/playbooks/dev/tx-pool-check.yaml b/playbooks/dev/tx-pool-check.yaml index d3a1e905..25fc3e42 100644 --- a/playbooks/dev/tx-pool-check.yaml +++ b/playbooks/dev/tx-pool-check.yaml @@ -10,7 +10,7 @@ tasks: timeout: 5m config: tps: 1000 - duration_s: 10 + durationS: 10 logInterval: 1000 configVars: privateKey: "walletPrivkey" @@ -24,7 +24,7 @@ tasks: title: "Check transaction pool throughput with 1.000 transactions in one second" config: tps: 1000 - duration_s: 10 + durationS: 10 logInterval: 1000 configVars: privateKey: "walletPrivkey" @@ -38,7 +38,7 @@ tasks: timeout: 5m config: tps: 5000 - duration_s: 5 + durationS: 5 logInterval: 2000 configVars: privateKey: "walletPrivkey" @@ -52,7 +52,7 @@ tasks: title: "Check transaction pool throughput with 5.000 transactions in one second" config: tps: 5000 - duration_s: 5 + durationS: 5 logInterval: 2000 configVars: privateKey: "walletPrivkey"