Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
62deb8b
Reject transactions that have already been submitted to the tx pool
m-Peter Nov 24, 2025
c6b18b1
Locking should only protect operations on the pooledTxs field
m-Peter Nov 28, 2025
856da05
Remove locking around account key fetching
m-Peter Nov 28, 2025
33d6624
Track submitted tx nonces instead of tx hashes
m-Peter Nov 28, 2025
3833d6e
Update trimming logic for tracked tx nonces
m-Peter Nov 29, 2025
70b0bba
Increase default maxTrackedTxNoncesPerEOA to 30
m-Peter Nov 29, 2025
85a2560
Silently skip already submitted transactions instead of returning an …
m-Peter Nov 29, 2025
7ac8f98
Remove dashes from log fields to comply with Grafana
m-Peter Nov 29, 2025
cb159ec
Remove redundant error logging from resolveBlockTag function
m-Peter Nov 29, 2025
80ce2ad
Improve logging & tracking of dropped transactions
m-Peter Nov 30, 2025
0b94bc1
Add lock-protection on writes to eoaActivityMetadata
m-Peter Nov 30, 2025
e103d57
Add retry mechanism on pooled transactions
m-Peter Nov 30, 2025
cc3bad3
Extract EOA activity metadata update to its own method
m-Peter Nov 30, 2025
41c7839
Improve locking on batch transaction submission to better handle con…
m-Peter Dec 4, 2025
6ccf8db
Improve tests for tx submission with nonce validation
m-Peter Dec 4, 2025
3582554
Use context.WithTimeout in submitSingleTransaction method
m-Peter Dec 4, 2025
0b5dac6
Increase the context deadline for tx submission to 4 seconds
m-Peter Dec 8, 2025
3e46cc8
Add comment to describe the workaround of context.WithTimeout for sub…
m-Peter Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions models/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ var (

// Transaction errors

ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)
ErrDuplicateTransaction = fmt.Errorf("%w: %s", ErrInvalid, "transaction already in pool")
ErrFailedTransaction = errors.New("failed transaction")
ErrInvalidTransaction = fmt.Errorf("%w: %w", ErrInvalid, ErrFailedTransaction)

// Storage errors

Expand Down
103 changes: 71 additions & 32 deletions services/requester/batch_tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package requester
import (
"context"
"encoding/hex"
"fmt"
"slices"
"sort"
"sync"
Expand All @@ -22,35 +23,43 @@ import (
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
)

const eoaActivityCacheSize = 10_000
const (
eoaActivityCacheSize = 10_000
eoaActivityCacheTTL = time.Second * 10
maxTrackedTxHashesPerEOA = 15
)

type pooledEvmTx struct {
txPayload cadence.String
txHash gethCommon.Hash
nonce uint64
}

// BatchTxPool is a `TxPool` implementation that collects and groups
// transactions based on their EOA signer, and submits them for execution
// using a batch.
type eoaActivityMetadata struct {
lastSubmission time.Time
txHashes []gethCommon.Hash
}

// BatchTxPool is a `TxPool` implementation that groups incoming transactions
// based on their EOA signer, and submits them for execution using a batch.
//
// The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the
// `EVM.run` used in `SingleTxPool`.
//
// The main advantage of this implementation over the `SingleTxPool`, is the
// guarantee that transactions originated from the same EOA address, which
// arrive in a short time interval (about the same as Flow's block production rate),
// will be executed in the same order their arrived.
// This helps to reduce the nonce mismatch errors which mainly occur from the
// re-ordering of Cadence transactions that happens from Collection nodes.
// guarantee that transactions originating from the same EOA address, which
// arrive in a short time interval (configurable by the node operator),
// will be executed in the same order they arrived.
// This helps to reduce the execution errors which may occur from the
// re-ordering of Cadence transactions that happens on Collection nodes.
type BatchTxPool struct {
*SingleTxPool
pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivity *expirable.LRU[gethCommon.Address, time.Time]

pooledTxs map[gethCommon.Address][]pooledEvmTx
txMux sync.Mutex
eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata]
}

var _ TxPool = &BatchTxPool{}
var _ TxPool = (*BatchTxPool)(nil)

func NewBatchTxPool(
ctx context.Context,
Expand All @@ -77,16 +86,16 @@ func NewBatchTxPool(
return nil, err
}

eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time](
eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata](
eoaActivityCacheSize,
nil,
config.EOAActivityCacheTTL,
eoaActivityCacheTTL,
)
batchPool := &BatchTxPool{
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivity: eoaActivity,
SingleTxPool: singleTxPool,
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
txMux: sync.Mutex{},
eoaActivityCache: eoaActivityCache,
}

go batchPool.processPooledTransactions(ctx)
Expand Down Expand Up @@ -123,6 +132,21 @@ func (t *BatchTxPool) Add(
return err
}

eoaActivity, found := t.eoaActivityCache.Get(from)
txHash := tx.Hash()

// Reject transactions that have already been submitted,
// as they are *likely* to fail. Two transactions with
// identical hashes, are expected to have the exact same
// payload.
if found && slices.Contains(eoaActivity.txHashes, txHash) {
return fmt.Errorf(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return an error here, or just silently drop the tx? What happens on a Geth node?

IIRC, bots will commonly resubmit a tx at a specific nonce to increase the gas price in an attempt to get it executed earlier in the block, or decrease the gas price to effectively cancel the tx. I'm not totally sure, but I suspect these bots would expect the tx submission to succeed (even if the tx was never executed).

Copy link
Collaborator Author

@m-Peter m-Peter Nov 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. To match the behavior of a Geth node, a silent drop seems more reasonable here. If we return an error, that might make bots try harder with more requests 😅

Updated in 85a2560. Also added an info log line, so that we can track all such cases.

"%w: a tx with hash %s has already been submitted",
errs.ErrInvalid,
txHash,
)
}

// Scenarios
// 1. EOA activity not found:
// => We send the transaction individually, without adding it
Expand All @@ -140,27 +164,42 @@ func (t *BatchTxPool) Add(
// For all 3 cases, we record the activity time for the next
// transactions that might come from the same EOA.
// [X] is equal to the configured `TxBatchInterval` duration.
lastActivityTime, found := t.eoaActivity.Get(from)

if !found {
// Case 1. EOA activity not found:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(lastActivityTime) > t.config.TxBatchInterval {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
} else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval {
if len(t.pooledTxs[from]) > 0 {
// If the EOA has pooled transactions, which are not yet processed,
// due to congestion or anything, make sure to include the current
// tx on that batch.
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
} else {
// Case 2. EOA activity found AND it was more than [X] seconds ago:
err = t.submitSingleTransaction(ctx, hexEncodedTx)
}
} else {
// Case 3. EOA activity found AND it was less than [X] seconds ago:
userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()}
// Prevent submission of duplicate transactions, based on their tx hash
if slices.Contains(t.pooledTxs[from], userTx) {
return errs.ErrDuplicateTransaction
}
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()}
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
}

t.eoaActivity.Add(from, time.Now())
if err != nil {
return err
}

// Update metadata for the last EOA activity only on successful add/submit.
eoaActivity.lastSubmission = time.Now()
eoaActivity.txHashes = append(eoaActivity.txHashes, txHash)
// To avoid the slice of hashes from growing indefinitely,
// maintain only a handful of the last tx hashes.
if len(eoaActivity.txHashes) > maxTrackedTxHashesPerEOA {
eoaActivity.txHashes = eoaActivity.txHashes[1:]
}

t.eoaActivityCache.Add(from, eoaActivity)

return err
return nil
}

func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
Expand Down
61 changes: 32 additions & 29 deletions tests/tx_batching_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func Test_MultipleTransactionSubmissionsWithinRecentInterval(t *testing.T) {
// activity of the EOA was X seconds ago, where:
// X = `cfg.TxBatchInterval`.
// For the E2E tests the `cfg.TxBatchInterval` is equal
// to 2 seconds.
// to 2.5 seconds.
for i := range uint64(2) {
signed, _, err := evmSign(
big.NewInt(500_000),
Expand Down Expand Up @@ -446,7 +446,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
// activity of the EOA was X seconds ago, where:
// X = `cfg.TxBatchInterval`.
// For the E2E tests the `cfg.TxBatchInterval` is equal
// to 2 seconds.
// to 2.5 seconds.
for i := range uint64(2) {
signed, _, err := evmSign(
big.NewInt(500_000),
Expand Down Expand Up @@ -514,7 +514,7 @@ func Test_MultipleTransactionSubmissionsWithinNonRecentInterval(t *testing.T) {
)
}

func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
func Test_TransactionSubmissionWithPreviouslySubmittedTransactions(t *testing.T) {
_, cfg, stop := setupGatewayNode(t)
defer stop()

Expand All @@ -525,36 +525,36 @@ func Test_MultipleTransactionSubmissionsWithDuplicates(t *testing.T) {
eoaKey, err := crypto.HexToECDSA(eoaTestPrivateKey)
require.NoError(t, err)

testAddr := common.HexToAddress("55253ed90B70b96C73092D8680915aaF50081194")
nonce := uint64(0)
hashes := make([]common.Hash, 0)

signed, _, err := evmSign(big.NewInt(10), 21000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

txHash, err := rpcTester.sendRawTx(signed)
require.NoError(t, err)
hashes = append(hashes, txHash)
testAddr := common.HexToAddress("0x061B63D29332e4de81bD9F51A48609824CD113a8")
nonces := []uint64{0, 1, 2, 3, 2, 3, 4, 5}

// Increment nonce for the duplicate test transactions that follow
nonce += 1
dupSigned, _, err := evmSign(big.NewInt(10), 15_000_000, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)
var errors []error
hashes := []common.Hash{}
// transfer some funds to the test address
for _, nonce := range nonces {
signed, _, err := evmSign(big.NewInt(1_000_000_000), 23_500, eoaKey, nonce, &testAddr, nil)
require.NoError(t, err)

// Submit 5 identical transactions to test duplicate detection:
// the first should succeed, the rest should be rejected as duplicates.
for i := range 5 {
if i == 0 {
txHash, err := rpcTester.sendRawTx(dupSigned)
require.NoError(t, err)
hashes = append(hashes, txHash)
txHash, err := rpcTester.sendRawTx(signed)
if err != nil {
errors = append(errors, err)
} else {
_, err := rpcTester.sendRawTx(dupSigned)
require.Error(t, err)
require.ErrorContains(t, err, "invalid: transaction already in pool")
hashes = append(hashes, txHash)
}
}

require.Len(t, errors, 2)
assert.ErrorContains(
t,
errors[0],
"a tx with hash 0x2bdf4aa4c3e273a624dddfdbde6614786b6a5329e246c531d3e0e9f92e79e04d has already been submitted",
)
assert.ErrorContains(
t,
errors[1],
"a tx with hash 0xb72e1f83861a63b5ad4b927295af07fa9546b01aac5dfce046a5fb20f9be9f2f has already been submitted",
)

assert.Eventually(t, func() bool {
for _, h := range hashes {
rcp, err := rpcTester.getReceipt(h.String())
Expand Down Expand Up @@ -604,9 +604,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
EnforceGasPrice: true,
LogLevel: zerolog.DebugLevel,
LogWriter: testLogWriter(),
TxStateValidation: config.TxSealValidation,
TxStateValidation: config.LocalIndexValidation,
TxBatchMode: true,
TxBatchInterval: time.Second * 2,
TxBatchInterval: time.Millisecond * 2500, // 2.5 seconds, the same as mainnet
}

bootstrapDone := make(chan struct{})
Expand All @@ -617,6 +617,9 @@ func setupGatewayNode(t *testing.T) (emulator.Emulator, config.Config, func()) {
require.NoError(t, err)
}()

// Allow the Gateway to catch up on indexing
time.Sleep(time.Second * 2)

<-bootstrapDone

return emu, cfg, func() {
Expand Down