Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build/devenv/cciptestinterfaces/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math/big"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
Expand Down Expand Up @@ -153,7 +152,8 @@ type Chain interface {
// SendMessage sends a CCIP message to the specified destination chain with the specified message options.
SendMessage(ctx context.Context, dest uint64, fields MessageFields, opts MessageOptions) (MessageSentEvent, error)
// SendMessageWithNonce sends a CCIP message to the specified destination chain with the specified message options and nonce.
SendMessageWithNonce(ctx context.Context, dest uint64, fields MessageFields, opts MessageOptions, sender *bind.TransactOpts, nonce *atomic.Uint64, disableTokenAmountCheck bool) (MessageSentEvent, error)
// A nil nonce instructs the client to use the pending nonce from the RPC node.
SendMessageWithNonce(ctx context.Context, dest uint64, fields MessageFields, opts MessageOptions, sender *bind.TransactOpts, nonce *uint64, disableTokenAmountCheck bool) (MessageSentEvent, error)
// GetUserNonce returns the nonce for the given user address on this chain.
GetUserNonce(ctx context.Context, userAddress protocol.UnknownAddress) (uint64, error)
// GetExpectedNextSequenceNumber gets an expected sequence number for message to the specified destination chain.
Expand Down
10 changes: 3 additions & 7 deletions build/devenv/evm/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func (m *CCIP17EVM) SendMessage(ctx context.Context, dest uint64, fields cciptes
return m.SendMessageWithNonce(ctx, dest, fields, opts, nil, nil, false)
}

func (m *CCIP17EVM) SendMessageWithNonce(ctx context.Context, dest uint64, fields cciptestinterfaces.MessageFields, opts cciptestinterfaces.MessageOptions, sender *bind.TransactOpts, nonce *atomic.Uint64, disableTokenAmountCheck bool) (cciptestinterfaces.MessageSentEvent, error) {
func (m *CCIP17EVM) SendMessageWithNonce(ctx context.Context, dest uint64, fields cciptestinterfaces.MessageFields, opts cciptestinterfaces.MessageOptions, sender *bind.TransactOpts, nonce *uint64, disableTokenAmountCheck bool) (cciptestinterfaces.MessageSentEvent, error) {
l := m.logger
srcChain := m.chain
if sender == nil {
Expand Down Expand Up @@ -640,24 +640,20 @@ func (m *CCIP17EVM) SendMessageWithNonce(ctx context.Context, dest uint64, field
return cciptestinterfaces.MessageSentEvent{}, err
}

var loadNonce *big.Int = nil
var loadNonce *big.Int
if nonce != nil {
loadNonce = big.NewInt(int64(nonce.Load()))
loadNonce = new(big.Int).SetUint64(*nonce)
}
senderKeyCopy := &bind.TransactOpts{
From: sender.From,
Signer: sender.Signer,
Nonce: loadNonce,
Value: msgValue,
}
fmt.Printf("sender: %s, srcChain: %d, nonce: %s\n", senderKeyCopy.From.String(), srcChain.Selector, loadNonce.String())
tx, err := rout.CcipSend(senderKeyCopy, dest, msg)
if err != nil {
return cciptestinterfaces.MessageSentEvent{}, fmt.Errorf("failed to send CCIP message: %w, extraArgs: %x", err, extraArgs)
}
if nonce != nil {
nonce.Add(1)
}
txHash := tx.Hash()

_, err = srcChain.Confirm(tx)
Expand Down
25 changes: 23 additions & 2 deletions build/devenv/staging-load.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,30 @@ message_profiles=[
{name="PTT", hasData=true, hasToken=true},
{name="token only", hasToken=true},
]

[[test_profiles]]
enabled=true
chains_as_source=[
{selector="16281711391670634445", ratio=1}, # polygon amoy
]
chains_as_dest=[
{selector="14767482510784806043", ratio=1}, # avalanche fuji
{selector="16015286601757825753", ratio=1}, # ethereum sepolia
{selector="10344971235874465080", ratio=1}, # arbitrum sepolia
{selector="3478487238524512106", ratio=1}, # base sepolia
]
messages=[
{ratio=100, message_profile="data only"},
# {ratio=35, message_profile="PTT"},
# {ratio=53, message_profile="token only"}
]
test_duration = "2m"
# Should be "integer/duration" like "1/1s" or "10/5m"
message_rate = "4/1s"
load_duration = "30m"


[[test_profiles]]
enabled=false
chains_as_source=[
{selector="16015286601757825753", ratio=1}, # ethereum sepolia
{selector="3478487238524512106", ratio=1}, # arbitrum sepolia
Expand All @@ -28,7 +49,7 @@ message_profiles=[


[[test_profiles]]
enabled=true
enabled=false
chains_as_source=[
{selector="16281711391670634445", ratio=1}, # polygon amoy
{selector="14767482510784806043", ratio=1}, # avalanche-fuji
Expand Down
29 changes: 18 additions & 11 deletions build/devenv/tests/e2e/gun.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ type EVMTXGun struct {
seqNosMu sync.Mutex
sentMsgCh chan SentMessage // Channel for real-time message notifications
closeOnce sync.Once // Ensure channel is closed only once
nonceMu sync.Mutex
nonce map[NonceKey]*atomic.Uint64
nonce sync.Map // map[NonceKey]*uint64
messageProfiles []load.MessageProfileConfig
userSelector map[uint64]func() *bind.TransactOpts
}
Expand All @@ -88,7 +87,6 @@ func NewEVMTransactionGun(cfg *ccv.Cfg, e *deployment.Environment, selectors []u
impl: impls,
sentMsgSet: make(map[SentMessage]struct{}),
sentMsgCh: make(chan SentMessage, sentMessageChannelBufferSize),
nonce: make(map[NonceKey]*atomic.Uint64),
srcSelectors: srcSelectors,
destSelectors: destSelectors,
userSelector: userSelector,
Expand Down Expand Up @@ -123,7 +121,6 @@ func NewEVMTransactionGunFromTestConfig(cfg *ccv.Cfg, testProfile *load.TestProf
impl: impls,
sentMsgSet: make(map[SentMessage]struct{}),
sentMsgCh: make(chan SentMessage, sentMessageChannelBufferSize),
nonce: make(map[NonceKey]*atomic.Uint64),
srcSelectors: srcSelectors,
destSelectors: destSelectors,
messageProfiles: messageProfiles,
Expand All @@ -132,10 +129,7 @@ func NewEVMTransactionGunFromTestConfig(cfg *ccv.Cfg, testProfile *load.TestProf
}

func (m *EVMTXGun) initNonce(key NonceKey, userAddress common.Address) error {
m.nonceMu.Lock()
defer m.nonceMu.Unlock()

if m.nonce[key] != nil {
if _, loaded := m.nonce.Load(key); loaded {
return nil
}

Expand All @@ -144,8 +138,12 @@ func (m *EVMTXGun) initNonce(key NonceKey, userAddress common.Address) error {
return fmt.Errorf("failed to get pending nonce for selector %d: %w", key.Selector, err)
}

m.nonce[key] = &atomic.Uint64{}
m.nonce[key].Store(n)
// Allocate a pointer so the stored value can be incremented atomically across
// goroutines without replacing the map entry. LoadOrStore ensures exactly one
// pointer wins even if multiple goroutines race through initialization.
ptr := new(uint64)
*ptr = n
m.nonce.LoadOrStore(key, ptr)
return nil
}

Expand Down Expand Up @@ -174,6 +172,15 @@ func (m *EVMTXGun) Call(_ *wasp.Generator) *wasp.Response {
return &wasp.Response{Error: err.Error(), Failed: true}
}

nonceVal, ok := m.nonce.Load(nonceKey)
if !ok {
return &wasp.Response{Error: fmt.Sprintf("nonce not initialized for key %+v", nonceKey), Failed: true}
}
noncePtr := nonceVal.(*uint64)
// Atomically claim the next nonce. AddUint64 returns the new value, so
// subtracting 1 gives us the nonce we own exclusively for this send.
currentNonce := atomic.AddUint64(noncePtr, 1) - 1

b := ccv.NewDefaultCLDFBundle(m.e)
m.e.OperationsBundle = b

Expand All @@ -182,7 +189,7 @@ func (m *EVMTXGun) Call(_ *wasp.Generator) *wasp.Response {
return &wasp.Response{Error: "impl is not CCIP17EVM", Failed: true}
}

sentEvent, err := c.SendMessageWithNonce(ctx, destSelector, fields, opts, sender, m.nonce[nonceKey], true)
sentEvent, err := c.SendMessageWithNonce(ctx, destSelector, fields, opts, sender, &currentNonce, true)
if err != nil {
return &wasp.Response{Error: fmt.Errorf("failed to send message: %w", err).Error(), Failed: true}
}
Expand Down
6 changes: 6 additions & 0 deletions build/devenv/tests/e2e/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,9 @@ func TestStaging(t *testing.T) {

var wg sync.WaitGroup
for _, testProfile := range testConfig.TestProfiles {
if !testProfile.Enabled {
continue
}
for _, chainInfo := range testProfile.ChainsAsSource {
wg.Add(1)
go func(chainInfo load.ChainProfileConfig) {
Expand All @@ -771,6 +774,9 @@ func TestStaging(t *testing.T) {
}
wg.Wait()

// Wait for old txns and nonces to settled before we start the load test
time.Sleep(30 * time.Second)

for idx, testProfile := range testConfig.TestProfiles {
if !testProfile.Enabled {
continue
Expand Down
Loading