Skip to content

Commit 32a7510

Browse files
Lint/all (#11)
* chore(lint)
* refactor: rename ConnectToP2p to connectToP2p for consistency in naming conventions
* refactor: standardize naming conventions and improve code consistency across the tx_pool_latency_analysis and tx_pool_throughput_analysis tasks
* refactor: simplify variable declarations for improved readability in the chain and conn utilities
* fix: add checks for negative values and maximum limits in the chain, conn, and udp utilities
* fix: add a check to skip non-positive values in histogram plotting
* refactor: comment out transaction-sending logic in the tx_pool_latency_analysis and tx_pool_throughput_analysis tasks for future review
* fix: update histogram plotting to skip negative values
* fix: add a TODO comment for fixing the transaction-sending logic in tx_pool_latency_analysis
1 parent e4ded66 commit 32a7510

File tree

14 files changed

+289
-187
lines changed

14 files changed

+289
-187
lines changed

pkg/coordinator/tasks/tx_pool_clean/task.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,17 @@ func (t *Task) LoadConfig() error {
6969
return nil
7070
}
7171

72-
func (t *Task) Execute(ctx context.Context) error {
72+
func (t *Task) Execute(_ context.Context) error {
7373
clientPool := t.ctx.Scheduler.GetServices().ClientPool()
7474
executionClients := clientPool.GetExecutionPool().GetReadyEndpoints(true)
7575

7676
t.logger.Infof("Found %d execution clients", len(executionClients))
7777

7878
for _, client := range executionClients {
79-
t.cleanRecursive(client)
79+
err := t.cleanRecursive(client)
80+
if err != nil {
81+
return err
82+
}
8083
}
8184

8285
t.ctx.SetResult(types.TaskResultSuccess)

pkg/coordinator/tasks/tx_pool_latency_analysis/README.md

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing
1212
- **`tps`**:
1313
The total number of transactions to send in one second.
1414

15-
- **`duration_s`**:
16-
The test duration (the number of transactions to send is calculated as `tps * duration_s`).
15+
- **`durationS`**:
16+
The test duration (the number of transactions to send is calculated as `tps * durationS`).
1717

1818
- **`logInterval`**:
1919
The interval at which the script logs progress (e.g., every 100 transactions).
@@ -38,10 +38,8 @@ The `tx_pool_latency_analysis` task evaluates latency of transaction processing
3838
- name: tx_pool_latency_analysis
3939
config:
4040
tps: 1000
41-
duration_s: 10
41+
durationS: 10
4242
logInterval: 1000
4343
configVars:
4444
privateKey: "walletPrivkey"
4545
```
46-
47-

pkg/coordinator/tasks/tx_pool_latency_analysis/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
package txpool_latency_analysis
1+
package txpoollatencyanalysis
22

33
type Config struct {
44
PrivateKey string `yaml:"privateKey" json:"privateKey"`
55

66
TPS int `yaml:"tps" json:"tps"`
7-
Duration_s int `yaml:"duration_s" json:"duration_s"`
7+
DurationS int `yaml:"durationS" json:"durationS"`
88
LogInterval int `yaml:"logInterval" json:"logInterval"`
99
SecondsBeforeRunning int64 `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"`
1010
}
1111

1212
func DefaultConfig() Config {
1313
return Config{
1414
TPS: 100,
15-
Duration_s: 60,
15+
DurationS: 60,
1616
LogInterval: 100,
1717
SecondsBeforeRunning: 0,
1818
}

pkg/coordinator/tasks/tx_pool_latency_analysis/task.go

Lines changed: 37 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
1-
package txpool_latency_analysis
1+
package txpoollatencyanalysis
22

33
import (
44
"context"
5+
"crypto/rand"
56
"encoding/json"
67
"fmt"
78
"math"
8-
"math/rand"
9+
"math/big"
910
"os"
1011
"time"
1112

12-
"github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"
13-
1413
"github.com/ethereum/go-ethereum/crypto"
1514
"github.com/noku-team/assertoor/pkg/coordinator/types"
1615
"github.com/noku-team/assertoor/pkg/coordinator/utils/hdr"
16+
txloadtool "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"
1717
"github.com/noku-team/assertoor/pkg/coordinator/wallet"
1818
"github.com/sirupsen/logrus"
1919
)
@@ -66,8 +66,8 @@ func (t *Task) LoadConfig() error {
6666
return err
6767
}
6868

69-
if err := config.Validate(); err != nil {
70-
return err
69+
if validationErr := config.Validate(); validationErr != nil {
70+
return validationErr
7171
}
7272

7373
privKey, _ := crypto.HexToECDSA(config.PrivateKey)
@@ -97,11 +97,16 @@ func (t *Task) Execute(ctx context.Context) error {
9797
return nil
9898
}
9999

100-
client := executionClients[rand.Intn(len(executionClients))]
100+
n, err := rand.Int(rand.Reader, big.NewInt(int64(len(executionClients))))
101+
if err != nil {
102+
return fmt.Errorf("failed to generate random number: %w", err)
103+
}
104+
105+
client := executionClients[n.Int64()]
101106

102107
t.logger.Infof("Measuring TxPool transaction propagation *latency*")
103108
t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds",
104-
client.GetName(), t.config.TPS, t.config.Duration_s)
109+
client.GetName(), t.config.TPS, t.config.DurationS)
105110

106111
// Wait for the specified seconds before starting the task
107112
if t.config.SecondsBeforeRunning > 0 {
@@ -116,10 +121,10 @@ func (t *Task) Execute(ctx context.Context) error {
116121
}
117122

118123
// Prepare to collect transaction latencies
119-
var testDeadline = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second)
124+
var testDeadline = time.Now().Add(time.Duration(t.config.DurationS+60*30) * time.Second)
120125

121-
load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
122-
load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval)
126+
loadTarget := txloadtool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
127+
load := txloadtool.NewLoad(loadTarget, t.config.TPS, t.config.DurationS, testDeadline, t.config.LogInterval)
123128

124129
// Generate and sending transactions, waiting for their propagation
125130
err = load.Execute()
@@ -145,24 +150,30 @@ func (t *Task) Execute(ctx context.Context) error {
145150
}
146151

147152
// Send txes to other clients, for speeding up tx mining
148-
t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))
153+
// todo: fix sending to other clients
154+
// t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))
149155

150-
for _, tx := range result.Txs {
151-
for _, otherClient := range executionClients {
152-
if otherClient.GetName() == client.GetName() {
153-
continue
154-
}
156+
// for _, tx := range result.Txs {
157+
// for _, otherClient := range executionClients {
158+
// if otherClient.GetName() == client.GetName() {
159+
// continue
160+
// }
155161

156-
otherClient.GetRPCClient().SendTransaction(ctx, tx)
157-
}
158-
}
162+
// if sendErr := otherClient.GetRPCClient().SendTransaction(ctx, tx); sendErr != nil {
163+
// t.logger.Errorf("Failed to send transaction to other client: %v", sendErr)
164+
// t.ctx.SetResult(types.TaskResultFailure)
165+
166+
// return sendErr
167+
// }
168+
// }
169+
// }
159170

160171
t.logger.Infof("Total transactions sent: %d", result.TotalTxs)
161172

162173
// Calculate statistics
163-
var maxLatency int64 = 0
174+
var maxLatency int64
164175

165-
var minLatency int64 = math.MaxInt64
176+
minLatency := int64(math.MaxInt64)
166177

167178
for _, lat := range result.LatenciesMus {
168179
if lat > maxLatency {
@@ -178,9 +189,9 @@ func (t *Task) Execute(ctx context.Context) error {
178189
maxLatency, maxLatency/1000, minLatency, minLatency/1000)
179190

180191
// Generate HDR plot
181-
plot, err := hdr.HdrPlot(result.LatenciesMus)
182-
if err != nil {
183-
t.logger.Errorf("Failed to generate HDR plot: %v", err)
192+
plot, plotErr := hdr.Plot(result.LatenciesMus)
193+
if plotErr != nil {
194+
t.logger.Errorf("Failed to generate HDR plot: %v", plotErr)
184195
t.ctx.SetResult(types.TaskResultFailure)
185196

186197
return nil
@@ -190,7 +201,7 @@ func (t *Task) Execute(ctx context.Context) error {
190201

191202
plotFilePath := "tx_pool_latency_hdr_plot.csv"
192203

193-
err = os.WriteFile(plotFilePath, []byte(plot), 0644)
204+
err = os.WriteFile(plotFilePath, []byte(plot), 0o600)
194205
if err != nil {
195206
t.logger.Errorf("Failed to write HDR plot to file: %v", err)
196207
t.ctx.SetResult(types.TaskResultFailure)

pkg/coordinator/tasks/tx_pool_throughput_analysis/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p
1212
- **`tps`**:
1313
The total number of transactions to send in one second.
1414

15-
- **`duration_s`**:
16-
The test duration (the number of transactions to send is calculated as `tps * duration_s`).
15+
- **`durationS`**:
16+
The test duration (the number of transactions to send is calculated as `tps * durationS`).
1717

1818
- **`logInterval`**:
1919
The interval at which the script logs progress (e.g., every 100 transactions).
@@ -32,7 +32,7 @@ The `tx_pool_throughput_analysis` task evaluates the throughput of transaction p
3232
- name: tx_pool_throughput_analysis
3333
config:
3434
tps: 1000
35-
duration_s: 10
35+
durationS: 10
3636
logInterval: 1000
3737
configVars:
3838
privateKey: "walletPrivkey"

pkg/coordinator/tasks/tx_pool_throughput_analysis/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
package tx_pool_throughput_analysis
1+
package txpoolthroughputanalysis
22

33
type Config struct {
44
PrivateKey string `yaml:"privateKey" json:"privateKey"`
55

66
TPS int `yaml:"tps" json:"tps"`
7-
Duration_s int `yaml:"duration_s" json:"duration_s"`
7+
DurationS int `yaml:"durationS" json:"durationS"`
88
LogInterval int `yaml:"logInterval" json:"logInterval"`
99
SecondsBeforeRunning int `yaml:"secondsBeforeRunning" json:"secondsBeforeRunning"`
1010
}
1111

1212
func DefaultConfig() Config {
1313
return Config{
1414
TPS: 100,
15-
Duration_s: 60,
15+
DurationS: 60,
1616
LogInterval: 100,
1717
SecondsBeforeRunning: 0,
1818
}

pkg/coordinator/tasks/tx_pool_throughput_analysis/task.go

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1-
package tx_pool_throughput_analysis
1+
package txpoolthroughputanalysis
22

33
import (
44
"context"
5+
"crypto/rand"
56
"encoding/json"
67
"fmt"
7-
"math/rand"
8+
"math/big"
89
"time"
910

10-
"github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"
11-
1211
"github.com/ethereum/go-ethereum/crypto"
1312
"github.com/noku-team/assertoor/pkg/coordinator/types"
13+
txloadtool "github.com/noku-team/assertoor/pkg/coordinator/utils/tx_load_tool"
1414
"github.com/noku-team/assertoor/pkg/coordinator/wallet"
1515
"github.com/sirupsen/logrus"
1616
)
@@ -63,8 +63,8 @@ func (t *Task) LoadConfig() error {
6363
return err
6464
}
6565

66-
if err := config.Validate(); err != nil {
67-
return err
66+
if validationErr := config.Validate(); validationErr != nil {
67+
return validationErr
6868
}
6969

7070
privKey, _ := crypto.HexToECDSA(config.PrivateKey)
@@ -94,11 +94,16 @@ func (t *Task) Execute(ctx context.Context) error {
9494
return nil
9595
}
9696

97-
client := executionClients[rand.Intn(len(executionClients))]
97+
n, randErr := rand.Int(rand.Reader, big.NewInt(int64(len(executionClients))))
98+
if randErr != nil {
99+
return fmt.Errorf("failed to generate random number: %w", randErr)
100+
}
101+
102+
client := executionClients[n.Int64()]
98103

99104
t.logger.Infof("Measuring TxPool transaction propagation *throughput*")
100105
t.logger.Infof("Targeting client: %s, TPS: %d, Duration: %d seconds",
101-
client.GetName(), t.config.TPS, t.config.Duration_s)
106+
client.GetName(), t.config.TPS, t.config.DurationS)
102107

103108
// Wait for the specified seconds before starting the task
104109
if t.config.SecondsBeforeRunning > 0 {
@@ -113,27 +118,27 @@ func (t *Task) Execute(ctx context.Context) error {
113118
}
114119

115120
// Prepare to collect transaction latencies
116-
var testDeadline = time.Now().Add(time.Duration(t.config.Duration_s+60*30) * time.Second)
121+
testDeadline := time.Now().Add(time.Duration(t.config.DurationS+60*30) * time.Second)
117122

118-
load_target := tx_load_tool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
119-
load := tx_load_tool.NewLoad(load_target, t.config.TPS, t.config.Duration_s, testDeadline, t.config.LogInterval)
123+
loadTarget := txloadtool.NewLoadTarget(ctx, t.ctx, t.logger, t.wallet, client)
124+
load := txloadtool.NewLoad(loadTarget, t.config.TPS, t.config.DurationS, testDeadline, t.config.LogInterval)
120125

121126
// Generate and sending transactions, waiting for their propagation
122-
err = load.Execute()
123-
if err != nil {
124-
t.logger.Errorf("Error during transaction load execution: %v", err)
127+
execErr := load.Execute()
128+
if execErr != nil {
129+
t.logger.Errorf("Error during transaction load execution: %v", execErr)
125130
t.ctx.SetResult(types.TaskResultFailure)
126131

127-
return err
132+
return execErr
128133
}
129134

130135
// Collect the transactions and their latencies
131-
result, err := load.MeasurePropagationLatencies()
132-
if err != nil {
133-
t.logger.Errorf("Error measuring transaction propagation latencies: %v", err)
136+
result, measureErr := load.MeasurePropagationLatencies()
137+
if measureErr != nil {
138+
t.logger.Errorf("Error measuring transaction propagation latencies: %v", measureErr)
134139
t.ctx.SetResult(types.TaskResultFailure)
135140

136-
return err
141+
return measureErr
137142
}
138143

139144
// Check if the context was cancelled or other errors occurred
@@ -142,30 +147,35 @@ func (t *Task) Execute(ctx context.Context) error {
142147
}
143148

144149
// Send txes to other clients, for speeding up tx mining
145-
t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))
150+
// t.logger.Infof("Sending %d transactions to other clients for mining", len(result.Txs))
146151

147-
for _, tx := range result.Txs {
148-
for _, otherClient := range executionClients {
149-
if otherClient.GetName() == client.GetName() {
150-
continue
151-
}
152+
// for _, tx := range result.Txs {
153+
// for _, otherClient := range executionClients {
154+
// if otherClient.GetName() == client.GetName() {
155+
// continue
156+
// }
152157

153-
otherClient.GetRPCClient().SendTransaction(ctx, tx)
154-
}
155-
}
158+
// if sendErr := otherClient.GetRPCClient().SendTransaction(ctx, tx); sendErr != nil {
159+
// t.logger.Errorf("Failed to send transaction to other client: %v", sendErr)
160+
// t.ctx.SetResult(types.TaskResultFailure)
161+
162+
// return sendErr
163+
// }
164+
// }
165+
// }
156166

157167
t.logger.Infof("Total transactions sent: %d", result.TotalTxs)
158168

159169
// Calculate statistics
160170
t.logger.Infof("Last measure delay since start time: %s", result.LastMeasureDelay)
161171

162-
processed_tx_per_second := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds()
172+
processedTxPerSecond := float64(result.TotalTxs) / result.LastMeasureDelay.Seconds()
163173

164174
t.logger.Infof("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s",
165-
result.TotalTxs, result.LastMeasureDelay.Seconds(), processed_tx_per_second)
175+
result.TotalTxs, result.LastMeasureDelay.Seconds(), processedTxPerSecond)
166176
t.logger.Infof("Sent %d transactions in %.2fs", result.TotalTxs, result.LastMeasureDelay.Seconds())
167177

168-
t.ctx.Outputs.SetVar("mean_tps_throughput", processed_tx_per_second)
178+
t.ctx.Outputs.SetVar("mean_tps_throughput", processedTxPerSecond)
169179
t.ctx.Outputs.SetVar("tx_count", result.TotalTxs)
170180
t.ctx.Outputs.SetVar("duplicated_p2p_event_count", result.DuplicatedP2PEventCount)
171181
t.ctx.Outputs.SetVar("missed_p2p_event_count", result.NotReceivedP2PEventCount)
@@ -175,7 +185,7 @@ func (t *Task) Execute(ctx context.Context) error {
175185

176186
outputs := map[string]interface{}{
177187
"tx_count": result.TotalTxs,
178-
"mean_tps_throughput": processed_tx_per_second,
188+
"mean_tps_throughput": processedTxPerSecond,
179189
"duplicated_p2p_event_count": result.DuplicatedP2PEventCount,
180190
"coordinated_omission_events_count": result.CoordinatedOmissionEventCount,
181191
"missed_p2p_event_count": result.NotReceivedP2PEventCount,

0 commit comments

Comments (0)