7 changes: 5 additions & 2 deletions pkg/coordinator/tasks/tx_pool_clean/task.go
@@ -69,14 +69,17 @@ func (t *Task) LoadConfig() error {
return nil
}

func (t *Task) Execute(ctx context.Context) error {
func (t *Task) Execute(_ context.Context) error {
clientPool := t.ctx.Scheduler.GetServices().ClientPool()
executionClients := clientPool.GetExecutionPool().GetReadyEndpoints(true)

t.logger.Infof("Found %d execution clients", len(executionClients))

for _, client := range executionClients {
t.cleanRecursive(client)
err := t.cleanRecursive(client)
if err != nil {
return err
}
}

t.ctx.SetResult(types.TaskResultSuccess)
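
The `tx_pool_clean` change above stops discarding the return value of `cleanRecursive` and fails the task on the first client that errors. A minimal, self-contained sketch of the same pattern with a hypothetical variation that also names the failing endpoint in the wrapped error (the `Client` interface, `cleanAll` helper, and sample error are illustrative stand-ins, not part of this PR):

```go
package main

import (
	"errors"
	"fmt"
)

// Client is a hypothetical stand-in for the execution client type used by the task.
type Client interface {
	GetName() string
}

type namedClient string

func (n namedClient) GetName() string { return string(n) }

// cleanAll mirrors the pattern applied in the diff: stop on the first failing
// client instead of silently dropping the error, and surface which endpoint failed.
func cleanAll(clients []Client, clean func(Client) error) error {
	for _, c := range clients {
		if err := clean(c); err != nil {
			return fmt.Errorf("tx pool clean failed for client %s: %w", c.GetName(), err)
		}
	}

	return nil
}

func main() {
	clients := []Client{namedClient("geth-1"), namedClient("besu-1")}

	err := cleanAll(clients, func(c Client) error {
		if c.GetName() == "besu-1" {
			return errors.New("txpool_content not supported") // illustrative failure
		}
		return nil
	})

	fmt.Println(err) // tx pool clean failed for client besu-1: txpool_content not supported
}
```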
8 changes: 3 additions & 5 deletions pkg/coordinator/tasks/tx_pool_latency_analysis/README.md
@@ -12,8 +12,8 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing
- **`tps`**:
The total number of transactions to send in one second.

- **`duration_s`**:
The test duration (the number of transactions to send is calculated as `tps * duration_s`).
- **`durationS`**:
The test duration (the number of transactions to send is calculated as `tps * durationS`).

- **`logInterval`**:
The interval at which the script logs progress (e.g., every 100 transactions).
@@ -38,10 +38,8 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing
- name: tx_pool_latency_analysis
config:
tps: 1000
duration_s: 10
durationS: 10
logInterval: 1000
configVars:
privateKey: "walletPrivkey"
```
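
As a quick sanity check of the renamed key, the transaction count follows directly from the formula stated above (`tps * durationS`); a trivial sketch using the sample values from the YAML example:

```go
package main

import "fmt"

func main() {
	// Sample values from the config example above: 1000 tx/s for 10 s.
	tps, durationS := 1000, 10
	fmt.Println(tps * durationS) // 10000 transactions in total
}
```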


6 changes: 3 additions & 3 deletions pkg/coordinator/tasks/tx_pool_latency_analysis/config.go
@@ -1,18 +1,18 @@
package txpool_latency_analysis
package txpoollatencyanalysis

type Config struct {
PrivateKey string `yaml:"privateKey" json:"privateKey"`

TPS int `yaml:"tps" json:"tps"`
Duration_s int `yaml:"duration_s" json:"duration_s"`
DurationS int `yaml:"durationS" json:"durationS"`
LogInterval int `yaml:"logInterval" json:"logInterval"`
SecondsBeforeRunning int64 `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"`
}

func DefaultConfig() Config {
return Config{
TPS: 100,
Duration_s: 60,
DurationS: 60,
LogInterval: 100,
SecondsBeforeRunning: 0,
}
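
For reviewers checking the tag rename, a minimal sketch of how the new camelCase key binds to the struct, assuming plain `gopkg.in/yaml.v3` decoding (the real task may load its config differently); the old `duration_s` key would no longer populate the field:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Config mirrors the relevant fields of the task config shown above.
type Config struct {
	TPS       int `yaml:"tps"`
	DurationS int `yaml:"durationS"`
}

func main() {
	raw := []byte("tps: 1000\ndurationS: 10\n")

	var cfg Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	// go-yaml matches keys against the tag exactly, so `duration_s` would leave DurationS at 0.
	fmt.Printf("%+v\n", cfg) // {TPS:1000 DurationS:10}
}
```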
63 changes: 37 additions & 26 deletions pkg/coordinator/tasks/tx_pool_latency_analysis/task.go
@@ -1,19 +1,19 @@
package txpool_latency_analysis
package txpoollatencyanalysis

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math"
"math/rand"
"math/big"
"os"
"time"

"github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"

"github.com/ethereum/go-ethereum/crypto"
"github.com/noku-team/assertoor/pkg/coordinator/types"
"github.com/noku-team/assertoor/pkg/coordinator/utils/hdr"
txloadtool "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"
"github.com/noku-team/assertoor/pkg/coordinator/wallet"
"github.com/sirupsen/logrus"
)
@@ -66,8 +66,8 @@ func (t *Task) LoadConfig() error {
return err
}

if err := config.Validate(); err != nil {
return err
if validationErr := config.Validate(); validationErr != nil {
return validationErr
}

privKey, _ := crypto.HexToECDSA(config.PrivateKey)
@@ -97,11 +97,16 @@ func (t *Task) Execute(ctx context.Context) error {
return nil
}

client := executionClients[rand.Intn(len(executionClients))]
n, err := rand.Int(rand.Reader, big.NewInt(int64(len(executionClients))))
if err != nil {
return fmt.Errorf("failed to generate random number: %w", err)
}

client := executionClients[n.Int64()]

t.logger.Infof("Measuring TxPool transaction propagation *latency*")
t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds",
client.GetName(), t.config.TPS, t.config.Duration_s)
client.GetName(), t.config.TPS, t.config.DurationS)

// Wait for the specified seconds before starting the task
if t.config.SecondsBeforeRunning > 0 {
@@ -116,10 +121,10 @@ func (t *Task) Execute(ctx context.Context) error {
}

// Prepare to collect transaction latencies
var testDeadline = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second)
var testDeadline = time.Now().Add(time.Duration(t.config.DurationS+60*30) * time.Second)

load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval)
loadTarget := txloadtool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
load := txloadtool.NewLoad(loadTarget, t.config.TPS, t.config.DurationS, testDeadline, t.config.LogInterval)

// Generate and send transactions, waiting for their propagation
err = load.Execute()
@@ -145,24 +150,30 @@ func (t *Task) Execute(ctx context.Context) error {
}

// Send txes to other clients, for speeding up tx mining
t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))
// todo: fix sending to other clients
// t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))

for _, tx := range result.Txs {
for _, otherClient := range executionClients {
if otherClient.GetName() == client.GetName() {
continue
}
// for _, tx := range result.Txs {
// for _, otherClient := range executionClients {
// if otherClient.GetName() == client.GetName() {
// continue
// }

otherClient.GetRPCClient().SendTransaction(ctx, tx)
}
}
// if sendErr := otherClient.GetRPCClient().SendTransaction(ctx, tx); sendErr != nil {
// t.logger.Errorf("Failed to send transaction to other client: %v", sendErr)
// t.ctx.SetResult(types.TaskResultFailure)

// return sendErr
// }
// }
// }

t.logger.Infof("Total transactions sent: %d", result.TotalTxs)

// Calculate statistics
var maxLatency int64 = 0
var maxLatency int64

var minLatency int64 = math.MaxInt64
minLatency := int64(math.MaxInt64)

for _, lat := range result.LatenciesMus {
if lat > maxLatency {
@@ -178,9 +189,9 @@ func (t *Task) Execute(ctx context.Context) error {
maxLatency, maxLatency/1000, minLatency, minLatency/1000)

// Generate HDR plot
plot, err := hdr.HdrPlot(result.LatenciesMus)
if err != nil {
t.logger.Errorf("Failed to generate HDR plot: %v", err)
plot, plotErr := hdr.Plot(result.LatenciesMus)
if plotErr != nil {
t.logger.Errorf("Failed to generate HDR plot: %v", plotErr)
t.ctx.SetResult(types.TaskResultFailure)

return nil
@@ -190,7 +201,7 @@ func (t *Task) Execute(ctx context.Context) error {

plotFilePath := "tx_pool_latency_hdr_plot.csv"

err = os.WriteFile(plotFilePath, []byte(plot), 0644)
err = os.WriteFile(plotFilePath, []byte(plot), 0o600)
if err != nil {
t.logger.Errorf("Failed to write HDR plot to file: %v", err)
t.ctx.SetResult(types.TaskResultFailure)
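
The `math/rand` → `crypto/rand` switch for picking the target client likely addresses a weak-random lint finding (gosec G404 is the usual trigger; an assumption, not stated in the diff). A self-contained sketch of the same uniform selection, with hypothetical names:

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math/big"
)

// pickIndex returns a uniformly distributed index in [0, n) using crypto/rand,
// mirroring the selection in the diff above. Hypothetical helper, not part of the PR.
func pickIndex(n int) (int, error) {
	if n <= 0 {
		return 0, errors.New("no elements to choose from")
	}

	v, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
	if err != nil {
		return 0, fmt.Errorf("failed to generate random number: %w", err)
	}

	return int(v.Int64()), nil
}

func main() {
	clients := []string{"geth-1", "nethermind-1", "besu-1"}

	idx, err := pickIndex(len(clients))
	if err != nil {
		panic(err)
	}

	fmt.Println("target client:", clients[idx])
}
```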
6 changes: 3 additions & 3 deletions pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md
@@ -12,8 +12,8 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p
- **`tps`**:
The total number of transactions to send in one second.

- **`duration_s`**:
The test duration (the number of transactions to send is calculated as `tps * duration_s`).
- **`durationS`**:
The test duration (the number of transactions to send is calculated as `tps * durationS`).

- **`logInterval`**:
The interval at which the script logs progress (e.g., every 100 transactions).
@@ -32,7 +32,7 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p
- name: tx_pool_throughput_analysis
config:
tps: 1000
duration_s: 10
durationS: 10
logInterval: 1000
configVars:
privateKey: "walletPrivkey"
6 changes: 3 additions & 3 deletions pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go
@@ -1,18 +1,18 @@
package tx_pool_throughput_analysis
package txpoolthroughputanalysis

type Config struct {
PrivateKey string `yaml:"privateKey" json:"privateKey"`

TPS int `yaml:"tps" json:"tps"`
Duration_s int `yaml:"duration_s" json:"duration_s"`
DurationS int `yaml:"durationS" json:"durationS"`
LogInterval int `yaml:"logInterval" json:"logInterval"`
SecondsBeforeRunning int `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"`
}

func DefaultConfig() Config {
return Config{
TPS: 100,
Duration_s: 60,
DurationS: 60,
LogInterval: 100,
SecondsBeforeRunning: 0,
}
74 changes: 42 additions & 32 deletions pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go
@@ -1,16 +1,16 @@
package tx_pool_throughput_analysis
package txpoolthroughputanalysis

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math/rand"
"math/big"
"time"

"github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"

"github.com/ethereum/go-ethereum/crypto"
"github.com/noku-team/assertoor/pkg/coordinator/types"
txloadtool "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"
"github.com/noku-team/assertoor/pkg/coordinator/wallet"
"github.com/sirupsen/logrus"
)
@@ -63,8 +63,8 @@ func (t *Task) LoadConfig() error {
return err
}

if err := config.Validate(); err != nil {
return err
if validationErr := config.Validate(); validationErr != nil {
return validationErr
}

privKey, _ := crypto.HexToECDSA(config.PrivateKey)
@@ -94,11 +94,16 @@ func (t *Task) Execute(ctx context.Context) error {
return nil
}

client := executionClients[rand.Intn(len(executionClients))]
n, randErr := rand.Int(rand.Reader, big.NewInt(int64(len(executionClients))))
if randErr != nil {
return fmt.Errorf("failed to generate random number: %w", randErr)
}

client := executionClients[n.Int64()]

t.logger.Infof("Measuring TxPool transaction propagation *throughput*")
t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds",
client.GetName(), t.config.TPS, t.config.Duration_s)
client.GetName(), t.config.TPS, t.config.DurationS)

// Wait for the specified seconds before starting the task
if t.config.SecondsBeforeRunning > 0 {
@@ -113,27 +118,27 @@ func (t *Task) Execute(ctx context.Context) error {
}

// Prepare to collect transaction latencies
var testDeadline = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second)
testDeadline := time.Now().Add(time.Duration(t.config.DurationS+60*30) * time.Second)

load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval)
loadTarget := txloadtool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
load := txloadtool.NewLoad(loadTarget, t.config.TPS, t.config.DurationS, testDeadline, t.config.LogInterval)

// Generate and send transactions, waiting for their propagation
err = load.Execute()
if err != nil {
t.logger.Errorf("Error during transaction load execution: %v", err)
execErr := load.Execute()
if execErr != nil {
t.logger.Errorf("Error during transaction load execution: %v", execErr)
t.ctx.SetResult(types.TaskResultFailure)

return err
return execErr
}

// Collect the transactions and their latencies
result, err := load.MeasurePropagationLatencies()
if err != nil {
t.logger.Errorf("Error measuring transaction propagation latencies: %v", err)
result, measureErr := load.MeasurePropagationLatencies()
if measureErr != nil {
t.logger.Errorf("Error measuring transaction propagation latencies: %v", measureErr)
t.ctx.SetResult(types.TaskResultFailure)

return err
return measureErr
}

// Check if the context was cancelled or other errors occurred
@@ -142,30 +147,35 @@ func (t *Task) Execute(ctx context.Context) error {
}

// Send txes to other clients, for speeding up tx mining
t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))
// t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))

for _, tx := range result.Txs {
for _, otherClient := range executionClients {
if otherClient.GetName() == client.GetName() {
continue
}
// for _, tx := range result.Txs {
// for _, otherClient := range executionClients {
// if otherClient.GetName() == client.GetName() {
// continue
// }

otherClient.GetRPCClient().SendTransaction(ctx, tx)
}
}
// if sendErr := otherClient.GetRPCClient().SendTransaction(ctx, tx); sendErr != nil {
// t.logger.Errorf("Failed to send transaction to other client: %v", sendErr)
// t.ctx.SetResult(types.TaskResultFailure)

// return sendErr
// }
// }
// }

t.logger.Infof("Total transactions sent: %d", result.TotalTxs)

// Calculate statistics
t.logger.Infof("Last measure delay since start time: %s", result.LastMeasureDelay)

processed_tx_per_second := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds()
processedTxPerSecond := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds()

t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s",
result.TotalTxs, result.LastMeasureDelay.Seconds(), processed_tx_per_second)
result.TotalTxs, result.LastMeasureDelay.Seconds(), processedTxPerSecond)
t.logger.Infof("Sent %d transactions in %.2fs", result.TotalTxs, result.LastMeasureDelay.Seconds())

t.ctx.Outputs.SetVar("mean_tps_throughput", processed_tx_per_second)
t.ctx.Outputs.SetVar("mean_tps_throughput", processedTxPerSecond)
t.ctx.Outputs.SetVar("tx_count", result.TotalTxs)
t.ctx.Outputs.SetVar("duplicated_p2p_event_count", result.DuplicatedP2PEventCount)
t.ctx.Outputs.SetVar("missed_p2p_event_count", result.NotReceivedP2PEventCount)
@@ -175,7 +185,7 @@ func (t *Task) Execute(ctx context.Context) error {

outputs := map[string]interface{}{
"tx_count": result.TotalTxs,
"mean_tps_throughput": processed_tx_per_second,
"mean_tps_throughput": processedTxPerSecond,
"duplicated_p2p_event_count": result.DuplicatedP2PEventCount,
"coordinated_omission_events_count": result.CoordinatedOmissionEventCount,
"missed_p2p_event_count": result.NotReceivedP2PEventCount,