44 "context"
55 "crypto/rand"
66 "encoding/json"
7+ "errors"
78 "fmt"
89 "math/big"
910 "time"
2526 }
2627)
2728
29+ type ThroughoutMeasure struct {
30+ LoadTPS int `json:"load_tps"`
31+ ProcessedTPS int `json:"processed_tps"`
32+ }
33+
2834type Task struct {
2935 ctx * types.TaskContext
3036 options * types.TaskOptions
@@ -102,8 +108,8 @@ func (t *Task) Execute(ctx context.Context) error {
102108 client := executionClients [n .Int64 ()]
103109
104110 t .logger .Infof ("Measuring TxPool transaction propagation *throughput*" )
105- t .logger .Infof ("Targeting client: %s, TPS: %d, Duration: %d seconds" ,
106- client .GetName (), t .config .TPS , t .config .DurationS )
111+ t .logger .Infof ("Targeting client: %s, Starting TPS: %d, Ending TPS: %d, Increment TPS: %d, Duration: %d seconds" ,
112+ client .GetName (), t .config .StartingTPS , t . config . EndingTPS , t . config . IncrementTPS , t .config .DurationS )
107113
108114 // Wait for the specified seconds before starting the task
109115 if t .config .SecondsBeforeRunning > 0 {
@@ -117,19 +123,69 @@ func (t *Task) Execute(ctx context.Context) error {
117123 }
118124 }
119125
120- // Prepare to collect transaction latencies
121- testDeadline := time .Now ().Add (time .Duration (t .config .DurationS + 60 * 30 ) * time .Second )
122-
126+ // Create a new load target for the transaction propagation measurement
123127 loadTarget := txloadtool .NewLoadTarget (ctx , t .ctx , t .logger , t .wallet , client )
124- load := txloadtool .NewLoad (loadTarget , t .config .TPS , t .config .DurationS , testDeadline , t .config .LogInterval )
128+
129+ percentile := 0.99 // 0.95 should be enough, change in the future if needed
130+ singleMeasureDeadline := time .Now ().Add (time .Duration (t .config .DurationS + 60 * 30 ) * time .Second )
131+
132+ // slice of pairs: sending tps, processed TPS values
133+ var throughoutMeasures []ThroughoutMeasure
134+
135+ // Iterate over the TPS range and crate a plot processedTps vs sendingTps
136+ t .logger .Infof ("Iterating over the TPS range, starting TPS: %d, ending TPS: %d, increment TPS: %d" ,
137+ t .config .StartingTPS , t .config .EndingTPS , t .config .IncrementTPS )
138+
139+ for sendingTps := t .config .StartingTPS ; sendingTps <= t .config .EndingTPS ; sendingTps += t .config .IncrementTPS {
140+ // measure the throughput with the current sendingTps
141+ processedTps , err := t .measureTpsWithLoad (loadTarget , sendingTps , t .config .DurationS , singleMeasureDeadline , percentile )
142+ if err != nil {
143+ t .logger .Errorf ("Error during throughput measurement with sendingTps=%d, duration=%d: %v" , sendingTps , t .config .DurationS , err )
144+ t .ctx .SetResult (types .TaskResultFailure )
145+
146+ return err
147+ }
148+
149+ // add to throughoutMeasures
150+ throughoutMeasures = append (throughoutMeasures , ThroughoutMeasure {
151+ LoadTPS : sendingTps ,
152+ ProcessedTPS : processedTps ,
153+ })
154+ }
155+
156+ t .logger .Infof ("Finished measuring throughput, collected %d measures" , len (throughoutMeasures ))
157+
158+ // Set the throughput measures in the task context outputs
159+ // from this plot we can compute the Maximum Sustainable Throughput or Capacity limit
160+ t .ctx .Outputs .SetVar ("throughput_measures" , throughoutMeasures ) // log coordinated_omission_event_count and missed_p2p_event_count?
161+
162+ outputs := map [string ]interface {}{
163+ "throughput_measures" : throughoutMeasures ,
164+ }
165+
166+ outputsJSON , _ := json .Marshal (outputs )
167+ t .logger .Infof ("outputs_json: %s" , string (outputsJSON ))
168+
169+ // Set the task result to success
170+ t .ctx .SetResult (types .TaskResultSuccess )
171+
172+ return nil
173+ }
174+
175+ func (t * Task ) measureTpsWithLoad (loadTarget * txloadtool.LoadTarget , sendingTps , durationS int ,
176+ testDeadline time.Time , percentile float64 ) (int , error ) {
177+ t .logger .Infof ("Single measure of throughput, sending TPS: %d, duration: %d secs" , sendingTps , durationS )
178+
179+ // Prepare to collect transaction latencies
180+ load := txloadtool .NewLoad (loadTarget , sendingTps , durationS , testDeadline , t .config .LogInterval )
125181
126182 // Generate and sending transactions, waiting for their propagation
127183 execErr := load .Execute ()
128184 if execErr != nil {
129185 t .logger .Errorf ("Error during transaction load execution: %v" , execErr )
130186 t .ctx .SetResult (types .TaskResultFailure )
131187
132- return execErr
188+ return 0 , execErr
133189 }
134190
135191 // Collect the transactions and their latencies
@@ -138,12 +194,12 @@ func (t *Task) Execute(ctx context.Context) error {
138194 t .logger .Errorf ("Error measuring transaction propagation latencies: %v" , measureErr )
139195 t .ctx .SetResult (types .TaskResultFailure )
140196
141- return measureErr
197+ return 0 , measureErr
142198 }
143199
144200 // Check if the context was cancelled or other errors occurred
145201 if result .Failed {
146- return fmt .Errorf ("error measuring transaction propagation latencies: load failed" )
202+ return 0 , fmt .Errorf ("error measuring transaction propagation latencies: load failed" )
147203 }
148204
149205 // Send txes to other clients, for speeding up tx mining
@@ -166,33 +222,23 @@ func (t *Task) Execute(ctx context.Context) error {
166222
167223 t .logger .Infof ("Total transactions sent: %d" , result .TotalTxs )
168224
169- // Calculate statistics
225+ if percentile != 0.99 {
226+ // Calculate the percentile of latencies using result.LatenciesMus
227+ // Not implemented yet
228+ notImpl := errors .New ("percentile selection not implemented, use 0.99" )
229+ return 0 , notImpl
230+ }
231+
232+ t .logger .Infof ("Using 0.99 percentile for latency calculation" )
233+
170234 t .logger .Infof ("Last measure delay since start time: %s" , result .LastMeasureDelay )
171235
172- processedTxPerSecond := float64 (result .TotalTxs ) / result .LastMeasureDelay .Seconds ()
236+ processedTpsF := float64 (result .TotalTxs ) / result .LastMeasureDelay .Seconds ()
237+ processedTps := int (processedTpsF ) // round
173238
174239 t .logger .Infof ("Processed %d transactions in %.2fs, mean throughput: %.2f tx/s" ,
175- result .TotalTxs , result .LastMeasureDelay .Seconds (), processedTxPerSecond )
240+ result .TotalTxs , result .LastMeasureDelay .Seconds (), processedTpsF )
176241 t .logger .Infof ("Sent %d transactions in %.2fs" , result .TotalTxs , result .LastMeasureDelay .Seconds ())
177242
178- t .ctx .Outputs .SetVar ("mean_tps_throughput" , processedTxPerSecond )
179- t .ctx .Outputs .SetVar ("tx_count" , result .TotalTxs )
180- t .ctx .Outputs .SetVar ("duplicated_p2p_event_count" , result .DuplicatedP2PEventCount )
181- t .ctx .Outputs .SetVar ("missed_p2p_event_count" , result .NotReceivedP2PEventCount )
182- t .ctx .Outputs .SetVar ("coordinated_omission_event_count" , result .CoordinatedOmissionEventCount )
183-
184- t .ctx .SetResult (types .TaskResultSuccess )
185-
186- outputs := map [string ]interface {}{
187- "tx_count" : result .TotalTxs ,
188- "mean_tps_throughput" : processedTxPerSecond ,
189- "duplicated_p2p_event_count" : result .DuplicatedP2PEventCount ,
190- "coordinated_omission_events_count" : result .CoordinatedOmissionEventCount ,
191- "missed_p2p_event_count" : result .NotReceivedP2PEventCount ,
192- }
193-
194- outputsJSON , _ := json .Marshal (outputs )
195- t .logger .Infof ("outputs_json: %s" , string (outputsJSON ))
196-
197- return nil
243+ return processedTps , nil
198244}
0 commit comments