Skip to content
2 changes: 1 addition & 1 deletion pkg/coordinator/utils/sentry/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *Conn) Write(proto Proto, code uint64, msg any) error {
return err
}

var errDisc error = fmt.Errorf("disconnect")
var errDisc = fmt.Errorf("disconnect")

// ReadEth reads an Eth sub-protocol wire message.
func (c *Conn) ReadEth() (any, error) {
Expand Down
74 changes: 74 additions & 0 deletions pkg/coordinator/utils/tx_load_tool/load_target.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package txloadtool

import (
"context"
crand "crypto/rand"
"fmt"
"math/big"

"github.com/erigontech/assertoor/pkg/coordinator/clients/execution"
"github.com/erigontech/assertoor/pkg/coordinator/helper"
"github.com/erigontech/assertoor/pkg/coordinator/types"
"github.com/erigontech/assertoor/pkg/coordinator/wallet"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/sirupsen/logrus"
)

// LoadTarget represents the target for the load test
type LoadTarget struct {
ctx context.Context
taskCtx *types.TaskContext
wallet *wallet.Wallet
logger logrus.FieldLogger
node *execution.Client
}

// NewLoadTarget creates a new LoadTarget instance
func NewLoadTarget(ctx context.Context, taskCtx *types.TaskContext, logger logrus.FieldLogger,
w *wallet.Wallet, client *execution.Client) *LoadTarget {
return &LoadTarget{
ctx: ctx,
taskCtx: taskCtx,
wallet: w,
logger: logger,
node: client,
}
}

// GenerateTransaction creates a new transaction for the load test
func (t *LoadTarget) GenerateTransaction(i int) (*ethtypes.Transaction, error) {
tx, err := t.wallet.BuildTransaction(t.ctx, func(_ context.Context, nonce uint64, _ bind.SignerFn) (*ethtypes.Transaction, error) {
addr := t.wallet.GetAddress()
toAddr := &addr

txAmount, _ := crand.Int(crand.Reader, big.NewInt(0).SetUint64(10*1e18))

feeCap := &helper.BigInt{Value: *big.NewInt(100000000000)} // 100 Gwei
tipCap := &helper.BigInt{Value: *big.NewInt(1000000000)} // 1 Gwei

txObj := &ethtypes.DynamicFeeTx{
ChainID: t.taskCtx.Scheduler.GetServices().ClientPool().GetExecutionPool().GetBlockCache().GetChainID(),
Nonce: nonce,
GasTipCap: &tipCap.Value,
GasFeeCap: &feeCap.Value,
Gas: 50000,
To: toAddr,
Value: txAmount,
Data: []byte(fmt.Sprintf("txIndex:%d", i)),
}

return ethtypes.NewTx(txObj), nil
})

if err != nil {
return nil, err
}

return tx, nil
}

// SendTransaction sends a transaction to the execution node
func (t *LoadTarget) SendTransaction(tx *ethtypes.Transaction) error {
return t.node.GetRPCClient().SendTransaction(t.ctx, tx)
}
111 changes: 111 additions & 0 deletions pkg/coordinator/utils/tx_load_tool/peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package txloadtool

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/eth/protocols/eth"

"math/big"

"github.com/erigontech/assertoor/pkg/coordinator/clients/execution"
"github.com/erigontech/assertoor/pkg/coordinator/types"
"github.com/erigontech/assertoor/pkg/coordinator/utils/sentry"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/params"
"github.com/sirupsen/logrus"
)

// Peer connects to an execution client (a bockchain node) on the p2p network (i.e., the peer of the node)
type Peer struct {
ctx context.Context
taskCtx *types.TaskContext
logger logrus.FieldLogger
node *execution.Client
conn *sentry.Conn
}

// NewPeer creates a new peer
func NewPeer(ctx context.Context, taskCtx *types.TaskContext, logger logrus.FieldLogger, client *execution.Client) *Peer {
return &Peer{
ctx: ctx,
taskCtx: taskCtx,
logger: logger,
node: client,
conn: nil,
}
}

// Close closes the connection to the execution node
func (p *Peer) Close() error {
if p.conn != nil {
err := p.conn.Close()
p.conn = nil

return err
}

return nil
}

// Connect establishes a connection to the execution node and performs the handshake
func (p *Peer) Connect() error {
chainConfig := params.AllDevChainProtocolChanges

head, err := p.node.GetRPCClient().GetLatestBlock(p.ctx)
if err != nil {
p.taskCtx.SetResult(types.TaskResultFailure)
return err
}

chainID, err := p.node.GetRPCClient().GetEthClient().ChainID(p.ctx)
if err != nil {
return err
}

chainConfig.ChainID = chainID

genesis, err := p.node.GetRPCClient().GetEthClient().BlockByNumber(p.ctx, new(big.Int).SetUint64(0))
if err != nil {
p.logger.Errorf("Failed to fetch genesis block: %v", err)
p.taskCtx.SetResult(types.TaskResultFailure)

return err
}

conn, err := sentry.GetTCPConn(p.node)
if err != nil {
p.logger.Errorf("Failed to get TCP connection: %v", err)
p.taskCtx.SetResult(types.TaskResultFailure)

return err
}

p.conn = conn
forkID := forkid.NewID(chainConfig, genesis, head.NumberU64(), head.Time())

// handshake
err = p.conn.Peer(chainConfig.ChainID, genesis.Hash(), head.Hash(), forkID, nil)
if err != nil {
return err
}

p.logger.Infof("Connected to %s", p.node.GetName())

return nil
}

func (p *Peer) ReadTransactionMessages(timeout time.Duration) (*eth.TransactionsPacket, error) {
// Check if the connection is nil
if p.conn == nil {
p.logger.Errorf("Peer has no active connection, cannot read transaction messages")
p.taskCtx.SetResult(types.TaskResultFailure)

return nil, fmt.Errorf("peer has no active connection, cannot read transaction messages")
}

txs, err := p.conn.ReadTransactionMessages(timeout)

return txs, err
}
Loading