Skip to content

Commit 7ebac20

Browse files
cmd publish (#608)
* add command publish * cmd publish scanner fix; add worker pool safe job; add rate-limit * gen-doc * changes requested due to PR review * gen-docs * publish: improve output; script to create tx samples * fix linter issue * fix(monitor): handle missing safe/finalized blocks gracefully Instead of failing when nodes don't support safe/finalized block queries, log debug messages and default to 0. This improves compatibility with older or non-standard node implementations. --------- Co-authored-by: John Hilliard <[email protected]>
1 parent 8d62c07 commit 7ebac20

File tree

11 files changed

+801
-6
lines changed

11 files changed

+801
-6
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ Note: Do not modify this section! It is auto-generated by `cobra` using `make ge
7575

7676
- [polycli parseethwallet](doc/polycli_parseethwallet.md) - Extract the private key from an eth wallet.
7777

78+
- [polycli publish](doc/polycli_publish.md) - Publish transactions to the network with high-throughput
79+
7880
- [polycli retest](doc/polycli_retest.md) - Convert the standard ETH test fillers into something to be replayed against an RPC
7981

8082
- [polycli rpcfuzz](doc/polycli_rpcfuzz.md) - Continually run a variety of RPC calls and fuzzers.

cmd/monitor/monitor.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,17 +241,17 @@ func getChainState(ctx context.Context, ec *ethclient.Client, txPoolStatusSuppor
241241

242242
safeBlock, err := ec.HeaderByNumber(ctx, big.NewInt(int64(rpc.SafeBlockNumber)))
243243
if err != nil {
244-
return nil, fmt.Errorf("couldn't fetch safe block number: %s", err.Error())
245-
}
246-
if safeBlock != nil {
244+
log.Debug().Err(err).Msg("Unable to fetch safe block number")
245+
cs.SafeBlock = 0
246+
} else if safeBlock != nil {
247247
cs.SafeBlock = safeBlock.Number.Uint64()
248248
}
249249

250250
finalizedBlock, err := ec.HeaderByNumber(ctx, big.NewInt(int64(rpc.FinalizedBlockNumber)))
251251
if err != nil {
252-
return nil, fmt.Errorf("couldn't fetch finalized block number: %s", err.Error())
253-
}
254-
if finalizedBlock != nil {
252+
log.Debug().Err(err).Msg("Unable to fetch finalized block number")
253+
cs.FinalizedBlock = 0
254+
} else if finalizedBlock != nil {
255255
cs.FinalizedBlock = finalizedBlock.Number.Uint64()
256256
}
257257

cmd/publish/input.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package publish
2+
3+
import (
4+
"bufio"
5+
"encoding/hex"
6+
"fmt"
7+
"iter"
8+
"os"
9+
"strings"
10+
11+
"github.com/ethereum/go-ethereum/core/types"
12+
"github.com/rs/zerolog/log"
13+
)
14+
15+
const scannerBufferSize = 1024 * 1024
16+
17+
func getInputData(inputFileName *string, args []string) (iter.Seq[string], inputDataSource, error) {
18+
// firstly check and see if we have an input file
19+
if inputFileName != nil && *inputFileName != "" {
20+
// If we get here, we're going to assume the user
21+
// wants to load transactions from a file and we're not going to look
22+
// for other input sources
23+
return dataFromFile(*inputFileName)
24+
}
25+
26+
// secondly check and see if we have any args
27+
if len(args) > 0 {
28+
// checks if any of the args start with 0x, if so, return only the args
29+
// that start with 0x
30+
txArgs := make([]string, 0)
31+
for _, arg := range args {
32+
if arg[:2] == "0x" {
33+
txArgs = append(txArgs, arg)
34+
}
35+
}
36+
37+
// If we get here, we're going to assume the user wants to load transactions
38+
// from the command line and we're not going to look for other input sources
39+
if len(txArgs) > 0 {
40+
return dataFromArgs(txArgs)
41+
}
42+
}
43+
44+
// if we get here, we're going to assume the user wants to load transactions
45+
// from stdin or from the command line
46+
return dataFromStdin()
47+
}
48+
49+
func dataFromArgs(args []string) (iter.Seq[string], inputDataSource, error) {
50+
log.Info().Msg("Reading data from args")
51+
52+
return func(yield func(string) bool) {
53+
for _, arg := range args {
54+
if !yield(arg) {
55+
return
56+
}
57+
}
58+
}, InputDataSourceArgs, nil
59+
}
60+
61+
// dataFromFile returns an iterator that reads lines from a file
62+
func dataFromFile(filename string) (iter.Seq[string], inputDataSource, error) {
63+
f, err := os.Open(filename)
64+
if err != nil {
65+
return func(yield func(string) bool) {}, InputDataSourceFile, err
66+
}
67+
log.Info().
68+
Str("filename", filename).
69+
Msg("Reading data from file")
70+
71+
return func(yield func(string) bool) {
72+
// Ensure the file is closed after the function exits
73+
defer f.Close()
74+
s := bufio.NewScanner(f)
75+
sBuf := make([]byte, 0)
76+
s.Buffer(sBuf, scannerBufferSize)
77+
for s.Scan() {
78+
if !yield(s.Text()) {
79+
return
80+
}
81+
}
82+
if err := s.Err(); err != nil {
83+
log.Error().
84+
Err(err).
85+
Str("filename", filename).
86+
Msg("error scanning file")
87+
}
88+
}, InputDataSourceFile, nil
89+
}
90+
91+
// dataFromStdin returns an iterator that reads lines from stdin
92+
func dataFromStdin() (iter.Seq[string], inputDataSource, error) {
93+
fmt.Println("Reading data from Stdin, type the transactions you want to publish:")
94+
95+
return func(yield func(string) bool) {
96+
s := bufio.NewScanner(os.Stdin)
97+
sBuf := make([]byte, 0)
98+
s.Buffer(sBuf, scannerBufferSize)
99+
for s.Scan() {
100+
if !yield(s.Text()) {
101+
return
102+
}
103+
}
104+
if err := s.Err(); err != nil {
105+
log.Error().
106+
Err(err).
107+
Msg("error scanning stdin")
108+
}
109+
}, InputDataSourceStdin, nil
110+
}
111+
112+
// inputDataItemToTx converts an input data item that represents a transaction
113+
// rlp hex encoded into a transaction
114+
func inputDataItemToTx(inputDataItem string) (*types.Transaction, error) {
115+
tx := new(types.Transaction)
116+
117+
inputDataItem = strings.TrimPrefix(inputDataItem, "0x")
118+
119+
// Check if the string has an odd length
120+
if len(inputDataItem)%2 != 0 {
121+
// Prepend a '0' to make it even-length
122+
inputDataItem = "0" + inputDataItem
123+
}
124+
125+
b, err := hex.DecodeString(inputDataItem)
126+
if err != nil {
127+
return nil, err
128+
}
129+
130+
if err := tx.UnmarshalBinary(b); err != nil {
131+
return nil, err
132+
}
133+
134+
return tx, nil
135+
}

cmd/publish/output.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package publish
2+
3+
import (
4+
"fmt"
5+
"sync/atomic"
6+
"time"
7+
)
8+
9+
const (
10+
InputDataSourceFile = "file"
11+
InputDataSourceArgs = "args"
12+
InputDataSourceStdin = "stdin"
13+
)
14+
15+
type inputDataSource string
16+
17+
type output struct {
18+
InputDataSource inputDataSource
19+
InputDataCount atomic.Uint64
20+
ValidInputs atomic.Uint64
21+
InvalidInputs atomic.Uint64
22+
StartTime time.Time
23+
EndTime time.Time
24+
TxsSentSuccessfully atomic.Uint64
25+
TxsSentUnsuccessfully atomic.Uint64
26+
}
27+
28+
func (s *output) Start() {
29+
s.StartTime = time.Now()
30+
}
31+
32+
func (s *output) Stop() {
33+
s.EndTime = time.Now()
34+
}
35+
36+
func (s *output) Print() {
37+
elapsed := s.EndTime.Sub(s.StartTime)
38+
elapsedSeconds := elapsed.Seconds()
39+
if elapsedSeconds == 0 {
40+
elapsedSeconds = 1
41+
}
42+
txSent := s.TxsSentSuccessfully.Load() + s.TxsSentUnsuccessfully.Load()
43+
txsSendPerSecond := 0.0
44+
if elapsedSeconds > 0.0001 {
45+
txsSendPerSecond = float64(txSent) / elapsedSeconds
46+
}
47+
successRatio := float64(0)
48+
if txSent > 0 {
49+
successRatio = float64(s.TxsSentSuccessfully.Load()) / float64(txSent) * 100
50+
}
51+
52+
summaryString := fmt.Sprintf(`-----------------------------------
53+
Summary
54+
-----------------------------------
55+
Concurrency: %d
56+
JobQueueSize: %d
57+
RateLimit: %d
58+
-----------------------------------
59+
Input Data Source: %s
60+
Input Data Count: %d
61+
Valid Inputs: %d
62+
Invalid Inputs: %d
63+
-----------------------------------
64+
Elapsed Time: %s
65+
Txs Sent: %d
66+
Txs Sent Per Second: %.2f
67+
Txs Sent Successfully: %d
68+
Txs Sent Unsuccessfully: %d
69+
Success Ratio: %.2f%%
70+
-----------------------------------`,
71+
*publishInputArgs.concurrency,
72+
*publishInputArgs.jobQueueSize,
73+
*publishInputArgs.rateLimit,
74+
75+
s.InputDataSource,
76+
s.InputDataCount.Load(),
77+
s.ValidInputs.Load(),
78+
s.InvalidInputs.Load(),
79+
80+
elapsed,
81+
txSent,
82+
txsSendPerSecond,
83+
s.TxsSentSuccessfully.Load(),
84+
s.TxsSentUnsuccessfully.Load(),
85+
successRatio)
86+
87+
fmt.Println(summaryString)
88+
}

0 commit comments

Comments
 (0)