@@ -3,6 +3,7 @@ package requester
33import (
44 "context"
55 "encoding/hex"
6+ "fmt"
67 "slices"
78 "sort"
89 "sync"
@@ -22,14 +23,21 @@ import (
2223 "github.com/onflow/flow-evm-gateway/services/requester/keystore"
2324)
2425
25- const eoaActivityCacheSize = 10_000
26+ const (
27+ eoaActivityCacheSize = 10_000
28+ eoaActivityCacheTTL = time .Second * 10
29+ )
2630
2731type pooledEvmTx struct {
2832 txPayload cadence.String
29- txHash gethCommon.Hash
3033 nonce uint64
3134}
3235
36+ type eoaActivityMetadata struct {
37+ submittedAt time.Time
38+ txHashes []gethCommon.Hash
39+ }
40+
3341// BatchTxPool is a `TxPool` implementation that collects and groups
3442// transactions based on their EOA signer, and submits them for execution
3543// using a batch.
@@ -39,18 +47,19 @@ type pooledEvmTx struct {
3947//
4048// The main advantage of this implementation over the `SingleTxPool`, is the
4149// guarantee that transactions originated from the same EOA address, which
42- // arrive in a short time interval (about the same as Flow's block production rate ),
43- // will be executed in the same order their arrived.
44- // This helps to reduce the nonce mismatch errors which mainly occur from the
50+ // arrive in a short time interval (configurable by the node operator ),
51+ // will be executed in the same order they arrived.
52+ // This helps to reduce the execution errors which may occur from the
4553// re-ordering of Cadence transactions that happens from Collection nodes.
4654type BatchTxPool struct {
4755 * SingleTxPool
56+
4857 pooledTxs map [gethCommon.Address ][]pooledEvmTx
4958 txMux sync.Mutex
50- eoaActivity * expirable.LRU [gethCommon.Address , time. Time ]
59+ eoaActivity * expirable.LRU [gethCommon.Address , eoaActivityMetadata ]
5160}
5261
53- var _ TxPool = & BatchTxPool {}
62+ var _ TxPool = ( * BatchTxPool )( nil )
5463
5564func NewBatchTxPool (
5665 ctx context.Context ,
@@ -77,10 +86,10 @@ func NewBatchTxPool(
7786 return nil , err
7887 }
7988
80- eoaActivity := expirable .NewLRU [gethCommon.Address , time. Time ](
89+ eoaActivity := expirable .NewLRU [gethCommon.Address , eoaActivityMetadata ](
8190 eoaActivityCacheSize ,
8291 nil ,
83- config . EOAActivityCacheTTL ,
92+ eoaActivityCacheTTL ,
8493 )
8594 batchPool := & BatchTxPool {
8695 SingleTxPool : singleTxPool ,
@@ -123,6 +132,21 @@ func (t *BatchTxPool) Add(
123132 return err
124133 }
125134
135+ lastActivity , found := t .eoaActivity .Get (from )
136+ txHash := tx .Hash ()
137+
138+ // Reject transactions that have already been submitted,
139+ // as they are *likely* to fail. Two transactions with
140+ // identical hashes, are expected to have the exact same
141+ // payload.
142+ if found && slices .Contains (lastActivity .txHashes , txHash ) {
143+ return fmt .Errorf (
144+ "%w: a tx with hash %s has already been submitted" ,
145+ errs .ErrInvalid ,
146+ txHash ,
147+ )
148+ }
149+
126150 // Scenarios
127151 // 1. EOA activity not found:
128152 // => We send the transaction individually, without adding it
@@ -140,25 +164,30 @@ func (t *BatchTxPool) Add(
140164 // For all 3 cases, we record the activity time for the next
141165 // transactions that might come from the same EOA.
142166 // [X] is equal to the configured `TxBatchInterval` duration.
143- lastActivityTime , found := t .eoaActivity .Get (from )
144-
145167 if ! found {
146168 // Case 1. EOA activity not found:
147169 err = t .submitSingleTransaction (ctx , hexEncodedTx )
148- } else if time .Since (lastActivityTime ) > t .config .TxBatchInterval {
170+ } else if time .Since (lastActivity . submittedAt ) > t .config .TxBatchInterval {
149171 // Case 2. EOA activity found AND it was more than [X] seconds ago:
150172 err = t .submitSingleTransaction (ctx , hexEncodedTx )
151173 } else {
152174 // Case 3. EOA activity found AND it was less than [X] seconds ago:
153- userTx := pooledEvmTx {txPayload : hexEncodedTx , txHash : tx .Hash (), nonce : tx .Nonce ()}
154- // Prevent submission of duplicate transactions, based on their tx hash
155- if slices .Contains (t .pooledTxs [from ], userTx ) {
156- return errs .ErrDuplicateTransaction
157- }
175+ userTx := pooledEvmTx {txPayload : hexEncodedTx , nonce : tx .Nonce ()}
158176 t .pooledTxs [from ] = append (t .pooledTxs [from ], userTx )
159177 }
160178
161- t .eoaActivity .Add (from , time .Now ())
179+ // Update metadata for the last EOA activity only on successful add/submit.
180+ if err == nil {
181+ lastActivity .submittedAt = time .Now ()
182+ lastActivity .txHashes = append (lastActivity .txHashes , txHash )
183+ // To avoid the slice of hashes from growing indefinitely,
184+ // maintain only a handful of the last tx hashes.
185+ if len (lastActivity .txHashes ) > 15 {
186+ lastActivity .txHashes = lastActivity .txHashes [1 :]
187+ }
188+
189+ t .eoaActivity .Add (from , lastActivity )
190+ }
162191
163192 return err
164193}
0 commit comments