diff --git a/pkg/coordinator/tasks/tasks.go b/pkg/coordinator/tasks/tasks.go index 95f60ed5..98126636 100644 --- a/pkg/coordinator/tasks/tasks.go +++ b/pkg/coordinator/tasks/tasks.go @@ -38,10 +38,10 @@ import ( runtaskoptions "github.com/noku-team/assertoor/pkg/coordinator/tasks/run_task_options" runtasks "github.com/noku-team/assertoor/pkg/coordinator/tasks/run_tasks" runtasksconcurrent "github.com/noku-team/assertoor/pkg/coordinator/tasks/run_tasks_concurrent" - txpoollatencyanalysis "github.com/noku-team/assertoor/pkg/coordinator/tasks/tx_pool_latency_analysis" - txpoolthroughputanalysis "github.com/noku-team/assertoor/pkg/coordinator/tasks/tx_pool_throughput_analysis" - txpoolclean "github.com/noku-team/assertoor/pkg/coordinator/tasks/tx_pool_clean" sleep "github.com/noku-team/assertoor/pkg/coordinator/tasks/sleep" + txpoolclean "github.com/noku-team/assertoor/pkg/coordinator/tasks/tx_pool_clean" + txpoolthroughputanalysis "github.com/noku-team/assertoor/pkg/coordinator/tasks/tx_pool_throughput_analysis" + txpoollatencyanalysis "github.com/noku-team/assertoor/pkg/coordinator/tasks/tx_pool_latency_analysis" ) var AvailableTaskDescriptors = []*types.TaskDescriptor{ diff --git a/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md b/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md index 45513029..52783061 100644 --- a/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md +++ b/pkg/coordinator/tasks/tx_pool_latency_analysis/README.md @@ -15,7 +15,7 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing - **`duration_s`**: The test duration (the number of transactions to send is calculated as `tps * duration_s`). -- **`measureInterval`**: +- **`logInterval`**: The interval at which the script logs progress (e.g., every 100 transactions). ### Outputs @@ -37,9 +37,9 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing ```yaml - name: tx_pool_latency_analysis config: - tps: 100 + tps: 1000 duration_s: 10 - measureInterval: 1000 + logInterval: 1000 configVars: privateKey: "walletPrivkey" ``` diff --git a/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go b/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go index 5bfed789..3d2924f6 100644 --- a/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go +++ b/pkg/coordinator/tasks/tx_pool_latency_analysis/config.go @@ -1,11 +1,11 @@ -package txpoollatencyanalysis +package txpool_latency_analysis type Config struct { PrivateKey string `yaml:"privateKey" json:"privateKey"` TPS int `yaml:"tps" json:"tps"` Duration_s int `yaml:"duration_s" json:"duration_s"` - MeasureInterval int `yaml:"measureInterval" json:"measureInterval"` + LogInterval int `yaml:"logInterval" json:"logInterval"` SecondsBeforeRunning int64 `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"` } @@ -13,7 +13,7 @@ func DefaultConfig() Config { return Config{ TPS: 100, Duration_s: 60, - MeasureInterval: 100, + LogInterval: 100, SecondsBeforeRunning: 0, } } diff --git a/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go b/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go index 31100ae6..79f5e6ad 100644 --- a/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go +++ b/pkg/coordinator/tasks/tx_pool_latency_analysis/task.go @@ -1,24 +1,16 @@ -package txpoollatencyanalysis +package txpool_latency_analysis import ( "context" - crand "crypto/rand" "encoding/json" "fmt" - "math/big" + "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool" "math/rand" "time" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/core/forkid" - ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/params" - "github.com/noku-team/assertoor/pkg/coordinator/clients/execution" - "github.com/noku-team/assertoor/pkg/coordinator/helper" "github.com/noku-team/assertoor/pkg/coordinator/types" "github.com/noku-team/assertoor/pkg/coordinator/utils/hdr" - "github.com/noku-team/assertoor/pkg/coordinator/utils/sentry" "github.com/noku-team/assertoor/pkg/coordinator/wallet" "github.com/sirupsen/logrus" ) @@ -105,15 +97,6 @@ func (t *Task) Execute(ctx context.Context) error { t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds", client.GetName(), t.config.TPS, t.config.Duration_s) - conn, err := t.getTcpConn(ctx, client) - if err != nil { - t.logger.Errorf("Failed to get wire eth TCP connection: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - return nil - } - - defer conn.Close() - // Wait for the specified seconds before starting the task if t.config.SecondsBeforeRunning > 0 { t.logger.Infof("Waiting for %d seconds before starting the task...", t.config.SecondsBeforeRunning) @@ -127,164 +110,35 @@ func (t *Task) Execute(ctx context.Context) error { } // Prepare to collect transaction latencies - var totNumberOfTxes int = t.config.TPS * t.config.Duration_s - var txs []*ethtypes.Transaction = make([]*ethtypes.Transaction, totNumberOfTxes) - var txStartTime []time.Time = make([]time.Time, totNumberOfTxes) var testDeadline time.Time = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second) - var latenciesMus = make([]int64, totNumberOfTxes) - - startTime := time.Now() - isFailed := false - sentTxCount := 0 - duplicatedP2PEventCount := 0 - coordinatedOmissionEventCount := 0 - - // Start generating and sending transactions - go func() { - startExecTime := time.Now() - endTime := startExecTime.Add(time.Second * time.Duration(t.config.Duration_s)) - - // Generate and send transactions - for i := 0; i < totNumberOfTxes; i++ { - // Calculate how much time we have left - remainingTime := time.Until(endTime) - - // Calculate sleep time to distribute remaining transactions evenly - sleepTime := remainingTime / time.Duration(totNumberOfTxes-i) - - // generate and send tx - go func(i int) { - - tx, err := t.generateTransaction(ctx, i) - if err != nil { - t.logger.Errorf("Failed to create transaction: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - txStartTime[i] = time.Now() - err = client.GetRPCClient().SendTransaction(ctx, tx) - if err != nil { - t.logger.WithField("client", client.GetName()).Errorf("Failed to send transaction: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - txs[i] = tx - sentTxCount++ - - // log transaction sending - if sentTxCount%t.config.MeasureInterval == 0 { - elapsed := time.Since(startTime) - t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds()) - } - - }(i) - - // Sleep to control the TPS - if i < totNumberOfTxes-1 { - if sleepTime > 0 { - time.Sleep(sleepTime) - } else { - coordinatedOmissionEventCount++ - } - } - select { - case <-ctx.Done(): - t.logger.Warnf("Task cancelled, stopping transaction generation.") - return - default: - // if testDeadline reached, stop sending txes - if isFailed { - return - } - if time.Now().After(testDeadline) { - t.logger.Infof("Reached duration limit, stopping transaction generation.") - return - } - } - } - }() - - // Wait P2P event messages - func() { - var receivedEvents int = 0 - for { - txes, err := conn.ReadTransactionMessages() - if err != nil { - t.logger.Errorf("Failed reading p2p events: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - for _, tx := range *txes { - tx_data := tx.Data() - // read tx_data that is in the format "tx_index:" - var tx_index int - _, err := fmt.Sscanf(string(tx_data), "tx_index:%d", &tx_index) - if err != nil { - t.logger.Errorf("Failed to parse transaction data: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - if tx_index < 0 || tx_index >= totNumberOfTxes { - t.logger.Errorf("Transaction index out of range: %d", tx_index) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - // log the duplicated p2p events, and count duplicated p2p events - // todo: add a timeout of N seconds that activates if duplicatedP2PEventCount + receivedEvents >= totNumberOfTxes, if exceeded, exit the function - if latenciesMus[tx_index] != 0 { - duplicatedP2PEventCount++ - } - - latenciesMus[tx_index] = time.Since(txStartTime[tx_index]).Microseconds() - receivedEvents++ - - if receivedEvents%t.config.MeasureInterval == 0 { - t.logger.Infof("Received %d p2p events", receivedEvents) - } - } - - if receivedEvents >= totNumberOfTxes { - t.logger.Infof("Reading of p2p events finished") - return - } - - select { - case <-ctx.Done(): - t.logger.Warnf("Task cancelled, stopping reading p2p events.") - return - default: - // check test deadline - if time.Now().After(testDeadline) { - t.logger.Warnf("Reached duration limit, stopping reading p2p events.") - return - } - } - } - }() + load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client) + load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval) - lastMeasureDelay := time.Since(startTime) - t.logger.Infof("Last measure delay since start time: %s", lastMeasureDelay) + // Generate and sending transactions, waiting for their propagation + err = load.Execute() + if err != nil { + t.logger.Errorf("Error during transaction load execution: %v", err) + t.ctx.SetResult(types.TaskResultFailure) + return err + } - if coordinatedOmissionEventCount > 0 { - t.logger.Warnf("Coordinated omission events: %d", coordinatedOmissionEventCount) + // Collect the transactions and their latencies + result, err := load.MeasurePropagationLatencies() + if err != nil { + t.logger.Errorf("Error measuring transaction propagation latencies: %v", err) + t.ctx.SetResult(types.TaskResultFailure) + return err } - if duplicatedP2PEventCount > 0 { - t.logger.Warnf("Duplicated p2p events: %d", duplicatedP2PEventCount) + // Check if the context was cancelled or other errors occurred + if result.Failed { + return fmt.Errorf("Error measuring transaction propagation latencies: load failed") } // Send txes to other clients, for speeding up tx mining - for _, tx := range txs { + t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs)) + for _, tx := range result.Txs { for _, otherClient := range executionClients { if otherClient.GetName() == client.GetName() { continue @@ -293,29 +147,12 @@ func (t *Task) Execute(ctx context.Context) error { otherClient.GetRPCClient().SendTransaction(ctx, tx) } } - - // Check if the context was cancelled or other errors occurred - if ctx.Err() != nil && !isFailed { - return nil - } - - // Check if we received all transactions p2p events - notReceivedP2PEventCount := 0 - for i := 0; i < totNumberOfTxes; i++ { - if latenciesMus[i] == 0 { - notReceivedP2PEventCount++ - // Assign a default value for missing P2P events - latenciesMus[i] = (time.Duration(t.config.Duration_s) * time.Second).Microseconds() - } - } - if notReceivedP2PEventCount > 0 { - t.logger.Warnf("Missed p2p events: %d (assigned latency=duration)", notReceivedP2PEventCount) - } + t.logger.Infof("Total transactions sent: %d", result.TotalTxs) // Calculate statistics var maxLatency int64 = 0 var minLatency int64 = 0 - for _, lat := range latenciesMus { + for _, lat := range result.LatenciesMus { if lat > maxLatency { maxLatency = lat } @@ -326,29 +163,30 @@ func (t *Task) Execute(ctx context.Context) error { t.logger.Infof("Max latency: %d mus, Min latency: %d mus", maxLatency, minLatency) // Generate HDR plot - plot, err := hdr.HdrPlot(latenciesMus) + plot, err := hdr.HdrPlot(result.LatenciesMus) if err != nil { t.logger.Errorf("Failed to generate HDR plot: %v", err) t.ctx.SetResult(types.TaskResultFailure) return nil } - t.ctx.Outputs.SetVar("tx_count", totNumberOfTxes) + t.ctx.Outputs.SetVar("tx_count", result.TotalTxs) t.ctx.Outputs.SetVar("min_latency_mus", minLatency) t.ctx.Outputs.SetVar("max_latency_mus", maxLatency) - t.ctx.Outputs.SetVar("duplicated_p2p_event_count", duplicatedP2PEventCount) - t.ctx.Outputs.SetVar("missed_p2p_event_count", notReceivedP2PEventCount) - t.ctx.Outputs.SetVar("coordinated_omission_event_count", coordinatedOmissionEventCount) + t.ctx.Outputs.SetVar("duplicated_p2p_event_count", result.DuplicatedP2PEventCount) + t.ctx.Outputs.SetVar("missed_p2p_event_count", result.NotReceivedP2PEventCount) + t.ctx.Outputs.SetVar("coordinated_omission_event_count", result.CoordinatedOmissionEventCount) t.ctx.SetResult(types.TaskResultSuccess) outputs := map[string]interface{}{ - "tx_count": totNumberOfTxes, + "tx_count": result.TotalTxs, "min_latency_mus": minLatency, "max_latency_mus": maxLatency, "tx_pool_latency_hdr_plot": plot, - "duplicated_p2p_event_count": duplicatedP2PEventCount, - "coordinated_omission_events_count": coordinatedOmissionEventCount, + "duplicated_p2p_event_count": result.DuplicatedP2PEventCount, + "coordinated_omission_events_count": result.CoordinatedOmissionEventCount, + "missed_p2p_event_count": result.NotReceivedP2PEventCount, } outputsJSON, _ := json.Marshal(outputs) @@ -356,77 +194,3 @@ func (t *Task) Execute(ctx context.Context) error { return nil } - -func (t *Task) getTcpConn(ctx context.Context, client *execution.Client) (*sentry.Conn, error) { - chainConfig := params.AllDevChainProtocolChanges - - head, err := client.GetRPCClient().GetLatestBlock(ctx) - if err != nil { - t.ctx.SetResult(types.TaskResultFailure) - return nil, err - } - - chainID, err := client.GetRPCClient().GetEthClient().ChainID(ctx) - if err != nil { - return nil, err - } - - chainConfig.ChainID = chainID - - genesis, err := client.GetRPCClient().GetEthClient().BlockByNumber(ctx, new(big.Int).SetUint64(0)) - if err != nil { - t.logger.Errorf("Failed to fetch genesis block: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - return nil, err - } - - conn, err := sentry.GetTcpConn(client) - if err != nil { - t.logger.Errorf("Failed to get TCP connection: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - return nil, err - } - - forkId := forkid.NewID(chainConfig, genesis, head.NumberU64(), head.Time()) - - // handshake - err = conn.Peer(chainConfig.ChainID, genesis.Hash(), head.Hash(), forkId, nil) - if err != nil { - return nil, err - } - - t.logger.Infof("Connected to %s", client.GetName()) - - return conn, nil -} - -func (t *Task) generateTransaction(ctx context.Context, i int) (*ethtypes.Transaction, error) { - tx, err := t.wallet.BuildTransaction(ctx, func(_ context.Context, nonce uint64, _ bind.SignerFn) (*ethtypes.Transaction, error) { - addr := t.wallet.GetAddress() - toAddr := &addr - - txAmount, _ := crand.Int(crand.Reader, big.NewInt(0).SetUint64(10*1e18)) - - feeCap := &helper.BigInt{Value: *big.NewInt(100000000000)} // 100 Gwei - tipCap := &helper.BigInt{Value: *big.NewInt(1000000000)} // 1 Gwei - - txObj := ðtypes.DynamicFeeTx{ - ChainID: t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), - Nonce: nonce, - GasTipCap: &tipCap.Value, - GasFeeCap: &feeCap.Value, - Gas: 50000, - To: toAddr, - Value: txAmount, - Data: []byte(fmt.Sprintf("tx_index:%d", i)), - } - - return ethtypes.NewTx(txObj), nil - }) - - if err != nil { - return nil, err - } - - return tx, nil -} diff --git a/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md b/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md index 7c1fc94c..fb10e921 100644 --- a/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md +++ b/pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md @@ -15,7 +15,7 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p - **`duration_s`**: The test duration (the number of transactions to send is calculated as `tps * duration_s`). -- **`measureInterval`**: +- **`logInterval`**: The interval at which the script logs progress (e.g., every 100 transactions). ### Outputs @@ -31,9 +31,9 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p ```yaml - name: tx_pool_throughput_analysis config: - tps: 100 + tps: 1000 duration_s: 10 - measureInterval: 1000 + logInterval: 1000 configVars: privateKey: "walletPrivkey" ``` diff --git a/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go b/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go index a6940c37..80e516e8 100644 --- a/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go +++ b/pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go @@ -1,11 +1,11 @@ -package txpoolcheck +package tx_pool_throughput_analysis type Config struct { PrivateKey string `yaml:"privateKey" json:"privateKey"` TPS int `yaml:"tps" json:"tps"` Duration_s int `yaml:"duration_s" json:"duration_s"` - MeasureInterval int `yaml:"measureInterval" json:"measureInterval"` + LogInterval int `yaml:"logInterval" json:"logInterval"` SecondsBeforeRunning int `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"` } @@ -13,7 +13,7 @@ func DefaultConfig() Config { return Config{ TPS: 100, Duration_s: 60, - MeasureInterval: 100, + LogInterval: 100, SecondsBeforeRunning: 0, } } diff --git a/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go b/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go index b2a0d86c..67b9a411 100644 --- a/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go +++ b/pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go @@ -1,23 +1,15 @@ -package txpoolcheck +package tx_pool_throughput_analysis import ( "context" - crand "crypto/rand" "encoding/json" "fmt" - "math/big" + "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool" "math/rand" "time" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/core/forkid" - ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/params" - "github.com/noku-team/assertoor/pkg/coordinator/clients/execution" - "github.com/noku-team/assertoor/pkg/coordinator/helper" "github.com/noku-team/assertoor/pkg/coordinator/types" - "github.com/noku-team/assertoor/pkg/coordinator/utils/sentry" "github.com/noku-team/assertoor/pkg/coordinator/wallet" "github.com/sirupsen/logrus" ) @@ -104,15 +96,6 @@ func (t *Task) Execute(ctx context.Context) error { t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds", client.GetName(), t.config.TPS, t.config.Duration_s) - conn, err := t.getTcpConn(ctx, client) - if err != nil { - t.logger.Errorf("Failed to get wire eth TCP connection: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - return nil - } - - defer conn.Close() - // Wait for the specified seconds before starting the task if t.config.SecondsBeforeRunning > 0 { t.logger.Infof("Waiting for %d seconds before starting the task...", t.config.SecondsBeforeRunning) @@ -126,164 +109,35 @@ func (t *Task) Execute(ctx context.Context) error { } // Prepare to collect transaction latencies - var totNumberOfTxes int = t.config.TPS * t.config.Duration_s - var txs []*ethtypes.Transaction = make([]*ethtypes.Transaction, totNumberOfTxes) - var txStartTime []time.Time = make([]time.Time, totNumberOfTxes) var testDeadline time.Time = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second) - var latenciesMus = make([]int64, totNumberOfTxes) - - startTime := time.Now() - isFailed := false - sentTxCount := 0 - duplicatedP2PEventCount := 0 - coordinatedOmissionEventCount := 0 - - // Start generating and sending transactions - go func() { - startExecTime := time.Now() - endTime := startExecTime.Add(time.Second * time.Duration(t.config.Duration_s)) - - // Generate and send transactions - for i := 0; i < totNumberOfTxes; i++ { - // Calculate how much time we have left - remainingTime := time.Until(endTime) - - // Calculate sleep time to distribute remaining transactions evenly - sleepTime := remainingTime / time.Duration(totNumberOfTxes-i) - - // generate and send tx - go func(i int) { - - tx, err := t.generateTransaction(ctx, i) - if err != nil { - t.logger.Errorf("Failed to create transaction: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - txStartTime[i] = time.Now() - err = client.GetRPCClient().SendTransaction(ctx, tx) - if err != nil { - t.logger.WithField("client", client.GetName()).Errorf("Failed to send transaction: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - txs[i] = tx - sentTxCount++ - - // log transaction sending - if sentTxCount%t.config.MeasureInterval == 0 { - elapsed := time.Since(startTime) - t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, elapsed.Seconds()) - } - - }(i) - - // Sleep to control the TPS - if i < totNumberOfTxes-1 { - if sleepTime > 0 { - time.Sleep(sleepTime) - } else { - coordinatedOmissionEventCount++ - } - } - select { - case <-ctx.Done(): - t.logger.Warnf("Task cancelled, stopping transaction generation.") - return - default: - // if testDeadline reached, stop sending txes - if isFailed { - return - } - if time.Now().After(testDeadline) { - t.logger.Infof("Reached duration limit, stopping transaction generation.") - return - } - } - } - }() - - // Wait P2P event messages - func() { - var receivedEvents int = 0 - for { - txes, err := conn.ReadTransactionMessages() - if err != nil { - t.logger.Errorf("Failed reading p2p events: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - for _, tx := range *txes { - tx_data := tx.Data() - // read tx_data that is in the format "tx_index:" - var tx_index int - _, err := fmt.Sscanf(string(tx_data), "tx_index:%d", &tx_index) - if err != nil { - t.logger.Errorf("Failed to parse transaction data: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - if tx_index < 0 || tx_index >= totNumberOfTxes { - t.logger.Errorf("Transaction index out of range: %d", tx_index) - t.ctx.SetResult(types.TaskResultFailure) - isFailed = true - return - } - - // log the duplicated p2p events, and count duplicated p2p events - // todo: add a timeout of N seconds that activates if duplicatedP2PEventCount + receivedEvents >= totNumberOfTxes, if exceeded, exit the function - if latenciesMus[tx_index] != 0 { - duplicatedP2PEventCount++ - } - - latenciesMus[tx_index] = time.Since(txStartTime[tx_index]).Microseconds() - receivedEvents++ - - if receivedEvents%t.config.MeasureInterval == 0 { - t.logger.Infof("Received %d p2p events", receivedEvents) - } - } - - if receivedEvents >= totNumberOfTxes { - t.logger.Infof("Reading of p2p events finished") - return - } - - select { - case <-ctx.Done(): - t.logger.Warnf("Task cancelled, stopping reading p2p events.") - return - default: - // check test deadline - if time.Now().After(testDeadline) { - t.logger.Warnf("Reached duration limit, stopping reading p2p events.") - return - } - } - } - }() + load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client) + load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval) - lastMeasureDelay := time.Since(startTime) - t.logger.Infof("Last measure delay since start time: %s", lastMeasureDelay) + // Generate and sending transactions, waiting for their propagation + err = load.Execute() + if err != nil { + t.logger.Errorf("Error during transaction load execution: %v", err) + t.ctx.SetResult(types.TaskResultFailure) + return err + } - if coordinatedOmissionEventCount > 0 { - t.logger.Warnf("Coordinated omission events: %d", coordinatedOmissionEventCount) + // Collect the transactions and their latencies + result, err := load.MeasurePropagationLatencies() + if err != nil { + t.logger.Errorf("Error measuring transaction propagation latencies: %v", err) + t.ctx.SetResult(types.TaskResultFailure) + return err } - if duplicatedP2PEventCount > 0 { - t.logger.Warnf("Duplicated p2p events: %d", duplicatedP2PEventCount) + // Check if the context was cancelled or other errors occurred + if result.Failed { + return fmt.Errorf("Error measuring transaction propagation latencies: load failed") } // Send txes to other clients, for speeding up tx mining - for _, tx := range txs { + t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs)) + for _, tx := range result.Txs { for _, otherClient := range executionClients { if otherClient.GetName() == client.GetName() { continue @@ -292,39 +146,31 @@ func (t *Task) Execute(ctx context.Context) error { otherClient.GetRPCClient().SendTransaction(ctx, tx) } } + t.logger.Infof("Total transactions sent: %d", result.TotalTxs) - // Check if the context was cancelled or other errors occurred - if ctx.Err() != nil && !isFailed { - return nil - } + // Calculate statistics + t.logger.Infof("Last measure delay since start time: %s", result.LastMeasureDelay) - // Check if we received all transactions p2p events - notReceivedP2PEventCount := 0 - for i := 0; i < totNumberOfTxes; i++ { - if latenciesMus[i] == 0 { - notReceivedP2PEventCount++ - // Assign a default value for missing P2P events - latenciesMus[i] = (time.Duration(t.config.Duration_s) * time.Second).Microseconds() - } - } - if notReceivedP2PEventCount > 0 { - t.logger.Warnf("Missed p2p events: %d (assigned latency=duration)", notReceivedP2PEventCount) - } + processed_tx_per_second := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds() - // Calculate statistics - processed_tx_per_second := float64(sentTxCount) / lastMeasureDelay.Seconds() + t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s", + result.TotalTxs, result.LastMeasureDelay.Seconds(), processed_tx_per_second) + t.logger.Infof("Sent %d transactions in %.2fs", result.TotalTxs, result.LastMeasureDelay.Seconds()) t.ctx.Outputs.SetVar("mean_tps_throughput", processed_tx_per_second) - t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s", sentTxCount, lastMeasureDelay.Seconds(), processed_tx_per_second) - t.ctx.Outputs.SetVar("tx_count", totNumberOfTxes) - t.logger.Infof("Sent %d transactions in %.2fs", sentTxCount, lastMeasureDelay.Seconds()) + t.ctx.Outputs.SetVar("tx_count", result.TotalTxs) + t.ctx.Outputs.SetVar("duplicated_p2p_event_count", result.DuplicatedP2PEventCount) + t.ctx.Outputs.SetVar("missed_p2p_event_count", result.NotReceivedP2PEventCount) + t.ctx.Outputs.SetVar("coordinated_omission_event_count", result.CoordinatedOmissionEventCount) t.ctx.SetResult(types.TaskResultSuccess) outputs := map[string]interface{}{ - "tx_count": totNumberOfTxes, + "tx_count": result.TotalTxs, "mean_tps_throughput": processed_tx_per_second, - "coordinated_omission_events_count": coordinatedOmissionEventCount, + "duplicated_p2p_event_count": result.DuplicatedP2PEventCount, + "coordinated_omission_events_count": result.CoordinatedOmissionEventCount, + "missed_p2p_event_count": result.NotReceivedP2PEventCount, } outputsJSON, _ := json.Marshal(outputs) @@ -332,77 +178,3 @@ func (t *Task) Execute(ctx context.Context) error { return nil } - -func (t *Task) getTcpConn(ctx context.Context, client *execution.Client) (*sentry.Conn, error) { - chainConfig := params.AllDevChainProtocolChanges - - head, err := client.GetRPCClient().GetLatestBlock(ctx) - if err != nil { - t.ctx.SetResult(types.TaskResultFailure) - return nil, err - } - - chainID, err := client.GetRPCClient().GetEthClient().ChainID(ctx) - if err != nil { - return nil, err - } - - chainConfig.ChainID = chainID - - genesis, err := client.GetRPCClient().GetEthClient().BlockByNumber(ctx, new(big.Int).SetUint64(0)) - if err != nil { - t.logger.Errorf("Failed to fetch genesis block: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - return nil, err - } - - conn, err := sentry.GetTcpConn(client) - if err != nil { - t.logger.Errorf("Failed to get TCP connection: %v", err) - t.ctx.SetResult(types.TaskResultFailure) - return nil, err - } - - forkId := forkid.NewID(chainConfig, genesis, head.NumberU64(), head.Time()) - - // handshake - err = conn.Peer(chainConfig.ChainID, genesis.Hash(), head.Hash(), forkId, nil) - if err != nil { - return nil, err - } - - t.logger.Infof("Connected to %s", client.GetName()) - - return conn, nil -} - -func (t *Task) generateTransaction(ctx context.Context, i int) (*ethtypes.Transaction, error) { - tx, err := t.wallet.BuildTransaction(ctx, func(_ context.Context, nonce uint64, _ bind.SignerFn) (*ethtypes.Transaction, error) { - addr := t.wallet.GetAddress() - toAddr := &addr - - txAmount, _ := crand.Int(crand.Reader, big.NewInt(0).SetUint64(10*1e18)) - - feeCap := &helper.BigInt{Value: *big.NewInt(100000000000)} // 100 Gwei - tipCap := &helper.BigInt{Value: *big.NewInt(1000000000)} // 1 Gwei - - txObj := ðtypes.DynamicFeeTx{ - ChainID: t.ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), - Nonce: nonce, - GasTipCap: &tipCap.Value, - GasFeeCap: &feeCap.Value, - Gas: 50000, - To: toAddr, - Value: txAmount, - Data: []byte(fmt.Sprintf("tx_index:%d", i)), - } - - return ethtypes.NewTx(txObj), nil - }) - - if err != nil { - return nil, err - } - - return tx, nil -} diff --git a/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go b/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go new file mode 100644 index 00000000..94cf0ac9 --- /dev/null +++ b/pkg/coordinator/utils/tx_load_tool/tx_load_tool.go @@ -0,0 +1,377 @@ +package tx_load_tool + +import ( + "context" + crand "crypto/rand" + "fmt" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/core/forkid" + "github.com/ethereum/go-ethereum/params" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/noku-team/assertoor/pkg/coordinator/clients/execution" + "github.com/noku-team/assertoor/pkg/coordinator/helper" + "github.com/noku-team/assertoor/pkg/coordinator/types" + "github.com/noku-team/assertoor/pkg/coordinator/utils/sentry" + "github.com/noku-team/assertoor/pkg/coordinator/wallet" + "github.com/sirupsen/logrus" +) + +type LoadTarget struct { + ctx context.Context + task_ctx *types.TaskContext + wallet *wallet.Wallet + logger logrus.FieldLogger + client *execution.Client +} + +// NewLoadTarget creates a new LoadTarget instance +func NewLoadTarget(ctx context.Context, task_ctx *types.TaskContext, logger logrus.FieldLogger, + wallet *wallet.Wallet, client *execution.Client) *LoadTarget { + return &LoadTarget{ + ctx: ctx, + task_ctx: task_ctx, + wallet: wallet, + logger: logger, + client: client, + } +} + +type LoadResult struct { + Failed bool + StartTime time.Time + EndTime time.Time // sent time of the last transaction + // Data collected during the load test + TotalTxs int + Txs []*ethtypes.Transaction + TxStartTime []time.Time + LatenciesMus []int64 + LastMeasureDelay time.Duration + // Statistics + SentTxCount int + DuplicatedP2PEventCount int + CoordinatedOmissionEventCount int + NotReceivedP2PEventCount int +} + +// NewLoadResult creates a new LoadResult instance +func NewLoadResult(totNumberOfTxes int) *LoadResult { + return &LoadResult{ + TotalTxs: totNumberOfTxes, + Txs: make([]*ethtypes.Transaction, totNumberOfTxes), + TxStartTime: make([]time.Time, totNumberOfTxes), + LatenciesMus: make([]int64, totNumberOfTxes), + SentTxCount: 0, + DuplicatedP2PEventCount: 0, + CoordinatedOmissionEventCount: 0, + } +} + +type Load struct { + target *LoadTarget + testDeadline time.Time + TPS int + Duration_s int + LogInterval int + Result *LoadResult +} + +// NewLoad creates a new Load instance +func NewLoad(target *LoadTarget, TPS int, duration_s int, testDeadline time.Time, logInterval int) *Load { + return &Load{ + target: target, + TPS: TPS, + Duration_s: duration_s, + testDeadline: testDeadline, + LogInterval: logInterval, + Result: NewLoadResult(TPS * duration_s), + } +} + +// ExecuteTPSLevel generates and sends transactions at the specified TPS level for the specified duration +func (l *Load) Execute() error { + // Prepare to collect transaction latencies + l.Result.Failed = false + l.Result.SentTxCount = 0 + l.Result.DuplicatedP2PEventCount = 0 + l.Result.CoordinatedOmissionEventCount = 0 + + // Start generating and sending transactions + go func() { + // Sleep to ensure the start time is recorded correctly + time.Sleep(100 * time.Millisecond) + + l.Result.StartTime = time.Now() + endTime := l.Result.StartTime.Add(time.Second * time.Duration(l.Duration_s)) + l.target.logger.Infof("Starting transaction generation at %s", l.Result.StartTime) + + // Generate and send transactions + for i := 0; i < l.Result.TotalTxs; i++ { + // Calculate how much time we have left + remainingTime := time.Until(endTime) + + // Calculate sleep time to distribute remaining transactions evenly + sleepTime := remainingTime / time.Duration(l.Result.TotalTxs-i) + + // generate and send tx + go func(i int) { + + tx, err := l.target.generateTransaction(i) + if err != nil { + l.target.logger.Errorf("Failed to create transaction: %v", err) + l.target.task_ctx.SetResult(types.TaskResultFailure) + l.Result.Failed = true + return + } + + l.Result.TxStartTime[i] = time.Now() + err = l.target.sendTransaction(tx) + if err != nil { + l.target.logger.WithField("client", l.target.client.GetName()).Errorf("Failed to send transaction: %v", err) + l.target.task_ctx.SetResult(types.TaskResultFailure) + l.Result.Failed = true + return + } + + l.Result.Txs[i] = tx + l.Result.SentTxCount++ + + // log transaction sending + if l.Result.SentTxCount%l.LogInterval == 0 { + elapsed := time.Since(l.Result.StartTime) + l.target.logger.Infof("Sent %d transactions in %.2fs", l.Result.SentTxCount, elapsed.Seconds()) + } + + }(i) + + // Sleep to control the TPS + if i < l.Result.TotalTxs-1 { + if sleepTime > 0 { + time.Sleep(sleepTime) + } else { + l.Result.CoordinatedOmissionEventCount++ + } + } + + select { + case <-l.target.ctx.Done(): + l.target.logger.Warnf("Task cancelled, stopping transaction generation.") + return + default: + // if testDeadline reached, stop sending txes + if l.Result.Failed { + return + } + if time.Now().After(l.testDeadline) { + l.target.logger.Infof("Reached duration limit, stopping transaction generation.") + return + } + } + } + + l.Result.EndTime = time.Now() + l.target.logger.Infof("Finished sending transactions at %s", l.Result.EndTime) + }() + + return nil +} + +// MeasurePropagationLatencies reads P2P events and calculates propagation latencies for each transaction +func (l *Load) MeasurePropagationLatencies() (*LoadResult, error) { + + // Get a P2P connection to read events + conn, err := l.target.getTcpConn() + if err != nil { + l.target.logger.Errorf("Failed to get P2P connection: %v", err) + l.target.task_ctx.SetResult(types.TaskResultFailure) + return l.Result, fmt.Errorf("measurement stopped: failed to get P2P connection") + } + + defer conn.Close() + + // Wait P2P event messages + var receivedEvents int = 0 + for { + txes, err := conn.ReadTransactionMessages(time.Duration(60) * time.Second) + if err != nil { + if err.Error() == "timeoutExpired" { + l.target.logger.Warnf("Timeout expired while reading p2p events") + break + } + + l.target.logger.Errorf("Failed reading p2p events: %v", err) + l.target.task_ctx.SetResult(types.TaskResultFailure) + l.Result.Failed = true + return l.Result, fmt.Errorf("measurement stopped: failed reading p2p events") + } + + for i, tx := range *txes { + tx_data := tx.Data() + // read tx_data that is in the format "tx_index:" + var tx_index int + _, err := fmt.Sscanf(string(tx_data), "tx_index:%d", &tx_index) + if err != nil { + l.target.logger.Errorf("Failed to parse transaction data: %v", err) + l.target.task_ctx.SetResult(types.TaskResultFailure) + l.Result.Failed = true + return l.Result, fmt.Errorf("measurement stopped: failed to parse transaction data at event %d", i) + } + if tx_index < 0 || tx_index >= l.Result.TotalTxs { + l.target.logger.Errorf("Transaction index out of range: %d", tx_index) + l.target.task_ctx.SetResult(types.TaskResultFailure) + l.Result.Failed = true + return l.Result, fmt.Errorf("measurement stopped: transaction index out of range at event %d", i) + } + + // log the duplicated p2p events, and count duplicated p2p events + if l.Result.LatenciesMus[tx_index] != 0 { + l.Result.DuplicatedP2PEventCount++ + } else { + l.Result.LatenciesMus[tx_index] = time.Since(l.Result.TxStartTime[tx_index]).Microseconds() + receivedEvents++ + } + + if receivedEvents%l.LogInterval == 0 { + l.target.logger.Infof("Received %d p2p events", receivedEvents) + } + } + + if receivedEvents >= l.Result.TotalTxs { + l.target.logger.Infof("Reading of p2p events finished") + break + } + + select { + case <-l.target.ctx.Done(): + l.target.logger.Warnf("Task cancelled, stopping reading p2p events.") + l.Result.Failed = true + return l.Result, fmt.Errorf("measurement stopped: task cancelled") + default: + // check test deadline + if time.Now().After(l.testDeadline) { + l.target.logger.Warnf("Reached duration limit, stopping reading p2p events.") + l.Result.Failed = true + return l.Result, fmt.Errorf("measurement stopped: reached duration limit") + } + // check if the execution failed + if l.Result.Failed { + l.target.logger.Warnf("Execution failed, stopping reading p2p events.") + return l.Result, fmt.Errorf("measurement stopped: execution failed") + } + } + } + + // check if the execution failed + if l.Result.Failed { + l.target.logger.Warnf("Execution failed, stopping reading p2p events.") + return l.Result, fmt.Errorf("measurement stopped: execution failed") + } + + // Calculate the last measure delay + l.Result.LastMeasureDelay = time.Since(l.Result.StartTime) + l.target.logger.Infof("Last measure delay since start time: %s", l.Result.LastMeasureDelay) + + if l.Result.CoordinatedOmissionEventCount > 0 { + l.target.logger.Warnf("Coordinated omission events: %d", l.Result.CoordinatedOmissionEventCount) + } + + if l.Result.DuplicatedP2PEventCount > 0 { + l.target.logger.Warnf("Duplicated p2p events: %d", l.Result.DuplicatedP2PEventCount) + } + + // Check if we received all transactions p2p events + l.Result.NotReceivedP2PEventCount = 0 + for i := 0; i < l.Result.TotalTxs; i++ { + if l.Result.LatenciesMus[i] == 0 { + l.Result.NotReceivedP2PEventCount++ + // Assign a default value for missing P2P events + l.Result.LatenciesMus[i] = (time.Duration(l.Duration_s) * time.Second).Microseconds() + } + } + if l.Result.NotReceivedP2PEventCount > 0 { + l.target.logger.Warnf("Missed p2p events: %d (assigned latency=duration)", l.Result.NotReceivedP2PEventCount) + } + + return l.Result, nil +} + +func (t *LoadTarget) getTcpConn() (*sentry.Conn, error) { + chainConfig := params.AllDevChainProtocolChanges + + head, err := t.client.GetRPCClient().GetLatestBlock(t.ctx) + if err != nil { + t.task_ctx.SetResult(types.TaskResultFailure) + return nil, err + } + + chainID, err := t.client.GetRPCClient().GetEthClient().ChainID(t.ctx) + if err != nil { + return nil, err + } + + chainConfig.ChainID = chainID + + genesis, err := t.client.GetRPCClient().GetEthClient().BlockByNumber(t.ctx, new(big.Int).SetUint64(0)) + if err != nil { + t.logger.Errorf("Failed to fetch genesis block: %v", err) + t.task_ctx.SetResult(types.TaskResultFailure) + return nil, err + } + + conn, err := sentry.GetTcpConn(t.client) + if err != nil { + t.logger.Errorf("Failed to get TCP connection: %v", err) + t.task_ctx.SetResult(types.TaskResultFailure) + return nil, err + } + + forkId := forkid.NewID(chainConfig, genesis, head.NumberU64(), head.Time()) + + // handshake + err = conn.Peer(chainConfig.ChainID, genesis.Hash(), head.Hash(), forkId, nil) + if err != nil { + return nil, err + } + + t.logger.Infof("Connected to %s", t.client.GetName()) + + return conn, nil +} + +func (t *LoadTarget) generateTransaction(i int) (*ethtypes.Transaction, error) { + tx, err := t.wallet.BuildTransaction(t.ctx, func(_ context.Context, nonce uint64, _ bind.SignerFn) (*ethtypes.Transaction, error) { + addr := t.wallet.GetAddress() + toAddr := &addr + + txAmount, _ := crand.Int(crand.Reader, big.NewInt(0).SetUint64(10*1e18)) + + feeCap := &helper.BigInt{Value: *big.NewInt(100000000000)} // 100 Gwei + tipCap := &helper.BigInt{Value: *big.NewInt(1000000000)} // 1 Gwei + + txObj := ðtypes.DynamicFeeTx{ + ChainID: t.task_ctx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(), + Nonce: nonce, + GasTipCap: &tipCap.Value, + GasFeeCap: &feeCap.Value, + Gas: 50000, + To: toAddr, + Value: txAmount, + Data: []byte(fmt.Sprintf("tx_index:%d", i)), + } + + return ethtypes.NewTx(txObj), nil + }) + + if err != nil { + return nil, err + } + + return tx, nil +} + +func (t *LoadTarget) sendTransaction(tx *ethtypes.Transaction) error { + return t.client.GetRPCClient().SendTransaction(t.ctx, tx) +} diff --git a/playbooks/dev/tx-pool-check-short.yaml b/playbooks/dev/tx-pool-check-short.yaml index 46ff4338..5f9cbcbd 100644 --- a/playbooks/dev/tx-pool-check-short.yaml +++ b/playbooks/dev/tx-pool-check-short.yaml @@ -11,7 +11,7 @@ tasks: config: tps: 1000 duration_s: 10 - measureInterval: 1000 + logInterval: 1000 configVars: privateKey: "walletPrivkey" - name: tx_pool_clean @@ -25,6 +25,6 @@ tasks: config: tps: 1000 duration_s: 10 - measureInterval: 1000 + logInterval: 1000 configVars: privateKey: "walletPrivkey" \ No newline at end of file diff --git a/playbooks/dev/tx-pool-check.yaml b/playbooks/dev/tx-pool-check.yaml index 41e7c36f..f3700e7d 100644 --- a/playbooks/dev/tx-pool-check.yaml +++ b/playbooks/dev/tx-pool-check.yaml @@ -11,7 +11,7 @@ tasks: config: tps: 1000 duration_s: 10 - measureInterval: 1000 + logInterval: 1000 configVars: privateKey: "walletPrivkey" - name: tx_pool_clean @@ -25,7 +25,7 @@ tasks: config: tps: 1000 duration_s: 10 - measureInterval: 1000 + logInterval: 1000 configVars: privateKey: "walletPrivkey" # - name: tx_pool_clean @@ -38,7 +38,7 @@ tasks: # timeout: 10m # config: # txCount: 10000 -# measureInterval: 1000 +# logInterval: 1000 # highLatency: 5000 # failOnHighLatency: true # configVars: @@ -53,7 +53,7 @@ tasks: # timeout: 10m # config: # qps: 10000 -# measureInterval: 1000 +# logInterval: 1000 # configVars: # privateKey: "walletPrivkey" # - name: tx_pool_clean @@ -66,7 +66,7 @@ tasks: # timeout: 30m # config: # txCount: 15000 -# measureInterval: 1500 +# logInterval: 1500 # highLatency: 7000 # failOnHighLatency: true # configVars: @@ -81,6 +81,6 @@ tasks: # title: "Check transaction pool throughput with 15.000 transactions in one second" # config: # qps: 15000 -# measureInterval: 1500 +# logInterval: 1500 # configVars: # privateKey: "walletPrivkey"