-
Notifications
You must be signed in to change notification settings - Fork 12
Reject transactions that have already been submitted to the tx pool #924
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 12 commits
62deb8b
c6b18b1
856da05
33d6624
3833d6e
70b0bba
85a2560
7ac8f98
cb159ec
80ce2ad
0b94bc1
e103d57
cc3bad3
41c7839
6ccf8db
3582554
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,39 +18,45 @@ import ( | |
| "github.com/onflow/flow-evm-gateway/config" | ||
| "github.com/onflow/flow-evm-gateway/metrics" | ||
| "github.com/onflow/flow-evm-gateway/models" | ||
| errs "github.com/onflow/flow-evm-gateway/models/errors" | ||
| "github.com/onflow/flow-evm-gateway/services/requester/keystore" | ||
| ) | ||
|
|
||
| const eoaActivityCacheSize = 10_000 | ||
| const ( | ||
| eoaActivityCacheSize = 10_000 | ||
| maxTrackedTxNoncesPerEOA = 30 | ||
| ) | ||
|
|
||
| type pooledEvmTx struct { | ||
| txPayload cadence.String | ||
| txHash gethCommon.Hash | ||
| nonce uint64 | ||
| } | ||
|
|
||
| // BatchTxPool is a `TxPool` implementation that collects and groups | ||
| // transactions based on their EOA signer, and submits them for execution | ||
| // using a batch. | ||
| type eoaActivityMetadata struct { | ||
| lastSubmission time.Time | ||
| txNonces []uint64 | ||
| } | ||
|
|
||
| // BatchTxPool is a `TxPool` implementation that groups incoming transactions | ||
| // based on their EOA signer, and submits them for execution using a batch. | ||
| // | ||
| // The underlying Cadence EVM API used, is `EVM.batchRun`, instead of the | ||
| // `EVM.run` used in `SingleTxPool`. | ||
| // | ||
| // The main advantage of this implementation over the `SingleTxPool`, is the | ||
| // guarantee that transactions originated from the same EOA address, which | ||
| // arrive in a short time interval (about the same as Flow's block production rate), | ||
| // will be executed in the same order their arrived. | ||
| // This helps to reduce the nonce mismatch errors which mainly occur from the | ||
| // re-ordering of Cadence transactions that happens from Collection nodes. | ||
| // guarantee that transactions originating from the same EOA address, which | ||
| // arrive in a short time interval (configurable by the node operator), | ||
| // will be executed in the same order they arrived. | ||
| // This helps to reduce the execution errors which may occur from the | ||
| // re-ordering of Cadence transactions that happens on Collection nodes. | ||
| type BatchTxPool struct { | ||
| *SingleTxPool | ||
| pooledTxs map[gethCommon.Address][]pooledEvmTx | ||
| txMux sync.Mutex | ||
| eoaActivity *expirable.LRU[gethCommon.Address, time.Time] | ||
|
|
||
| pooledTxs map[gethCommon.Address][]pooledEvmTx | ||
| txMux sync.Mutex | ||
| eoaActivityCache *expirable.LRU[gethCommon.Address, eoaActivityMetadata] | ||
| } | ||
|
|
||
| var _ TxPool = &BatchTxPool{} | ||
| var _ TxPool = (*BatchTxPool)(nil) | ||
|
|
||
| func NewBatchTxPool( | ||
| ctx context.Context, | ||
|
|
@@ -77,16 +83,16 @@ func NewBatchTxPool( | |
| return nil, err | ||
| } | ||
|
|
||
| eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time]( | ||
| eoaActivityCache := expirable.NewLRU[gethCommon.Address, eoaActivityMetadata]( | ||
| eoaActivityCacheSize, | ||
| nil, | ||
| config.EOAActivityCacheTTL, | ||
| ) | ||
| batchPool := &BatchTxPool{ | ||
| SingleTxPool: singleTxPool, | ||
| pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), | ||
| txMux: sync.Mutex{}, | ||
| eoaActivity: eoaActivity, | ||
| SingleTxPool: singleTxPool, | ||
| pooledTxs: make(map[gethCommon.Address][]pooledEvmTx), | ||
| txMux: sync.Mutex{}, | ||
| eoaActivityCache: eoaActivityCache, | ||
| } | ||
|
|
||
| go batchPool.processPooledTransactions(ctx) | ||
|
|
@@ -104,11 +110,6 @@ func (t *BatchTxPool) Add( | |
| ) error { | ||
| t.txPublisher.Publish(tx) // publish pending transaction event | ||
|
|
||
| // tx adding should be blocking, so we don't have races when | ||
| // pooled transactions are being processed in the background. | ||
| t.txMux.Lock() | ||
| defer t.txMux.Unlock() | ||
|
|
||
| from, err := models.DeriveTxSender(tx) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -123,6 +124,20 @@ func (t *BatchTxPool) Add( | |
| return err | ||
| } | ||
|
|
||
| eoaActivity, found := t.eoaActivityCache.Get(from) | ||
| nonce := tx.Nonce() | ||
|
|
||
| // Skip transactions that have been already submitted, | ||
| // as they are *likely* to fail. | ||
| if found && slices.Contains(eoaActivity.txNonces, nonce) { | ||
| t.logger.Info(). | ||
| Str("evm_tx", tx.Hash().Hex()). | ||
| Str("from", from.Hex()). | ||
| Uint64("nonce", nonce). | ||
| Msg("tx with same nonce has been already submitted") | ||
| return nil | ||
| } | ||
|
|
||
| // Scenarios | ||
| // 1. EOA activity not found: | ||
| // => We send the transaction individually, without adding it | ||
|
|
@@ -140,27 +155,59 @@ func (t *BatchTxPool) Add( | |
| // For all 3 cases, we record the activity time for the next | ||
| // transactions that might come from the same EOA. | ||
| // [X] is equal to the configured `TxBatchInterval` duration. | ||
| lastActivityTime, found := t.eoaActivity.Get(from) | ||
|
|
||
| if !found { | ||
| // Case 1. EOA activity not found: | ||
| err = t.submitSingleTransaction(ctx, hexEncodedTx) | ||
| } else if time.Since(lastActivityTime) > t.config.TxBatchInterval { | ||
| // Case 2. EOA activity found AND it was more than [X] seconds ago: | ||
| err = t.submitSingleTransaction(ctx, hexEncodedTx) | ||
| } else if time.Since(eoaActivity.lastSubmission) > t.config.TxBatchInterval { | ||
| // If the EOA has pooled transactions, which are not yet processed, | ||
| // due to congestion or anything, make sure to include the current | ||
| // tx on that batch. | ||
| t.txMux.Lock() | ||
| hasBatch := len(t.pooledTxs[from]) > 0 | ||
| if hasBatch { | ||
| userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} | ||
| t.pooledTxs[from] = append(t.pooledTxs[from], userTx) | ||
| } | ||
| t.txMux.Unlock() | ||
|
|
||
| // If it wasn't batched, submit individually | ||
| if !hasBatch { | ||
| // Case 2. EOA activity found AND it was more than [X] seconds ago: | ||
| err = t.submitSingleTransaction(ctx, hexEncodedTx) | ||
| } | ||
| } else { | ||
| // Case 3. EOA activity found AND it was less than [X] seconds ago: | ||
| userTx := pooledEvmTx{txPayload: hexEncodedTx, txHash: tx.Hash(), nonce: tx.Nonce()} | ||
| // Prevent submission of duplicate transactions, based on their tx hash | ||
| if slices.Contains(t.pooledTxs[from], userTx) { | ||
| return errs.ErrDuplicateTransaction | ||
| } | ||
| t.txMux.Lock() | ||
| userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: nonce} | ||
| t.pooledTxs[from] = append(t.pooledTxs[from], userTx) | ||
| t.txMux.Unlock() | ||
| } | ||
|
|
||
| t.eoaActivity.Add(from, time.Now()) | ||
| if err != nil { | ||
| t.logger.Error().Err(err).Msgf( | ||
| "failed to submit single Flow transaction for EOA: %s", | ||
| from.Hex(), | ||
| ) | ||
| return err | ||
| } | ||
|
|
||
| t.txMux.Lock() | ||
| defer t.txMux.Unlock() | ||
|
|
||
| return err | ||
| // Update metadata for the last EOA activity only on successful add/submit. | ||
| eoaActivity, _ = t.eoaActivityCache.Get(from) | ||
| eoaActivity.lastSubmission = time.Now() | ||
| eoaActivity.txNonces = append(eoaActivity.txNonces, nonce) | ||
|
||
| // To avoid the slice of nonces from growing indefinitely, | ||
| // keep only the last `maxTrackedTxNoncesPerEOA` nonces. | ||
| if len(eoaActivity.txNonces) > maxTrackedTxNoncesPerEOA { | ||
| firstKeep := len(eoaActivity.txNonces) - maxTrackedTxNoncesPerEOA | ||
| eoaActivity.txNonces = eoaActivity.txNonces[firstKeep:] | ||
| } | ||
|
|
||
| t.eoaActivityCache.Add(from, eoaActivity) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { | ||
|
|
@@ -188,10 +235,14 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) { | |
| ) | ||
| if err != nil { | ||
| t.logger.Error().Err(err).Msgf( | ||
| "failed to submit Flow transaction from BatchTxPool for EOA: %s", | ||
| "failed to submit batch Flow transaction for EOA: %s", | ||
| address.Hex(), | ||
| ) | ||
| continue | ||
| // In case of any error, add the transactions back to the pool, | ||
| // as a retry mechanism. | ||
| t.txMux.Lock() | ||
| t.pooledTxs[address] = append(t.pooledTxs[address], pooledTxs...) | ||
| t.txMux.Unlock() | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -235,6 +286,9 @@ func (t *BatchTxPool) batchSubmitTransactionsForSameAddress( | |
| } | ||
|
|
||
| if err := t.client.SendTransaction(ctx, *flowTx); err != nil { | ||
| // If there was any error while sending the transaction, | ||
| // we record all transactions as dropped. | ||
| t.collector.TransactionsDropped(len(hexEncodedTxs)) | ||
| return err | ||
| } | ||
|
|
||
|
|
@@ -266,6 +320,9 @@ func (t *BatchTxPool) submitSingleTransaction( | |
| } | ||
|
|
||
| if err := t.client.SendTransaction(ctx, *flowTx); err != nil { | ||
| // If there was any error while sending the transaction, | ||
| // we record it as a dropped transaction. | ||
| t.collector.TransactionsDropped(1) | ||
| return err | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.