@@ -3,6 +3,7 @@ package requester
33import (
44 "context"
55 "encoding/hex"
6+ "fmt"
67 "slices"
78 "sort"
89 "sync"
@@ -22,14 +23,22 @@ import (
2223 "github.com/onflow/flow-evm-gateway/services/requester/keystore"
2324)
2425
25- const eoaActivityCacheSize = 10_000
26+ const (
27+ eoaActivityCacheSize = 10_000
28+ eoaActivityCacheTTL = time .Second * 10
29+ maxTrackedTxHashesPerEOA = 15
30+ )
2631
2732type pooledEvmTx struct {
2833 txPayload cadence.String
29- txHash gethCommon.Hash
3034 nonce uint64
3135}
3236
37+ type eoaActivityMetadata struct {
38+ submittedAt time.Time
39+ txHashes []gethCommon.Hash
40+ }
41+
3342// BatchTxPool is a `TxPool` implementation that collects and groups
3443// transactions based on their EOA signer, and submits them for execution
3544// using a batch.
@@ -39,18 +48,19 @@ type pooledEvmTx struct {
3948//
4049// The main advantage of this implementation over the `SingleTxPool`, is the
4150// guarantee that transactions originated from the same EOA address, which
42- // arrive in a short time interval (about the same as Flow's block production rate ),
43- // will be executed in the same order their arrived.
44- // This helps to reduce the nonce mismatch errors which mainly occur from the
51+ // arrive in a short time interval (configurable by the node operator ),
52+ // will be executed in the same order they arrived.
53+ // This helps to reduce the execution errors which may occur from the
4554// re-ordering of Cadence transactions that happens from Collection nodes.
4655type BatchTxPool struct {
4756 * SingleTxPool
57+
4858 pooledTxs map [gethCommon.Address ][]pooledEvmTx
4959 txMux sync.Mutex
50- eoaActivity * expirable.LRU [gethCommon.Address , time. Time ]
60+ eoaActivity * expirable.LRU [gethCommon.Address , eoaActivityMetadata ]
5161}
5262
53- var _ TxPool = & BatchTxPool {}
63+ var _ TxPool = ( * BatchTxPool )( nil )
5464
5565func NewBatchTxPool (
5666 ctx context.Context ,
@@ -77,10 +87,10 @@ func NewBatchTxPool(
7787 return nil , err
7888 }
7989
80- eoaActivity := expirable .NewLRU [gethCommon.Address , time. Time ](
90+ eoaActivity := expirable .NewLRU [gethCommon.Address , eoaActivityMetadata ](
8191 eoaActivityCacheSize ,
8292 nil ,
83- config . EOAActivityCacheTTL ,
93+ eoaActivityCacheTTL ,
8494 )
8595 batchPool := & BatchTxPool {
8696 SingleTxPool : singleTxPool ,
@@ -123,6 +133,21 @@ func (t *BatchTxPool) Add(
123133 return err
124134 }
125135
136+ lastActivity , found := t .eoaActivity .Get (from )
137+ txHash := tx .Hash ()
138+
139+ // Reject transactions that have already been submitted,
140+ // as they are *likely* to fail. Two transactions with
141+ // identical hashes, are expected to have the exact same
142+ // payload.
143+ if found && slices .Contains (lastActivity .txHashes , txHash ) {
144+ return fmt .Errorf (
145+ "%w: a tx with hash %s has already been submitted" ,
146+ errs .ErrInvalid ,
147+ txHash ,
148+ )
149+ }
150+
126151 // Scenarios
127152 // 1. EOA activity not found:
128153 // => We send the transaction individually, without adding it
@@ -140,25 +165,30 @@ func (t *BatchTxPool) Add(
140165 // For all 3 cases, we record the activity time for the next
141166 // transactions that might come from the same EOA.
142167 // [X] is equal to the configured `TxBatchInterval` duration.
143- lastActivityTime , found := t .eoaActivity .Get (from )
144-
145168 if ! found {
146169 // Case 1. EOA activity not found:
147170 err = t .submitSingleTransaction (ctx , hexEncodedTx )
148- } else if time .Since (lastActivityTime ) > t .config .TxBatchInterval {
171+ } else if time .Since (lastActivity . submittedAt ) > t .config .TxBatchInterval {
149172 // Case 2. EOA activity found AND it was more than [X] seconds ago:
150173 err = t .submitSingleTransaction (ctx , hexEncodedTx )
151174 } else {
152175 // Case 3. EOA activity found AND it was less than [X] seconds ago:
153- userTx := pooledEvmTx {txPayload : hexEncodedTx , txHash : tx .Hash (), nonce : tx .Nonce ()}
154- // Prevent submission of duplicate transactions, based on their tx hash
155- if slices .Contains (t .pooledTxs [from ], userTx ) {
156- return errs .ErrDuplicateTransaction
157- }
176+ userTx := pooledEvmTx {txPayload : hexEncodedTx , nonce : tx .Nonce ()}
158177 t .pooledTxs [from ] = append (t .pooledTxs [from ], userTx )
159178 }
160179
161- t .eoaActivity .Add (from , time .Now ())
180+ // Update metadata for the last EOA activity only on successful add/submit.
181+ if err == nil {
182+ lastActivity .submittedAt = time .Now ()
183+ lastActivity .txHashes = append (lastActivity .txHashes , txHash )
184+ // To avoid the slice of hashes from growing indefinitely,
185+ // maintain only a handful of the last tx hashes.
186+ if len (lastActivity .txHashes ) > maxTrackedTxHashesPerEOA {
187+ lastActivity .txHashes = lastActivity .txHashes [1 :]
188+ }
189+
190+ t .eoaActivity .Add (from , lastActivity )
191+ }
162192
163193 return err
164194}
0 commit comments