Skip to content

Commit ea7b50d

Browse files
authored
Merge pull request #852 from onflow/mpeter/batch-tx-pool-improvements
Improve performance of `BatchTxPool` for single-tx EOAs
2 parents d284776 + 78f20d0 commit ea7b50d

File tree

3 files changed

+16
-45
lines changed

3 files changed

+16
-45
lines changed

cmd/run/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,5 @@ func init() {
294294
Cmd.Flags().DurationVar(&cfg.TxRequestLimitDuration, "tx-request-limit-duration", time.Second*3, "Time interval upon which to enforce transaction submission rate limiting.")
295295
Cmd.Flags().BoolVar(&cfg.TxBatchMode, "tx-batch-mode", false, "Enable batch transaction submission, to avoid nonce mismatch issues for high-volume EOAs.")
296296
Cmd.Flags().DurationVar(&cfg.TxBatchInterval, "tx-batch-interval", time.Millisecond*1200, "Time interval upon which to submit the transaction batches to the Flow network.")
297+
Cmd.Flags().DurationVar(&cfg.EOAActivityCacheTTL, "eoa-activity-cache-ttl", time.Second*10, "Time interval used to track EOA activity. Tx send more frequently than this interval will be batched. Useful only when batch transaction submission is enabled.")
297298
}

config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,8 @@ type Config struct {
107107
// TxBatchInterval is the time interval upon which to submit the transaction batches to the
108108
// Flow network.
109109
TxBatchInterval time.Duration
110+
// EOAActivityCacheTTL is the time interval used to track EOA activity. Tx send more
111+
// frequently than this interval will be batched.
112+
// Useful only when batch transaction submission is enabled.
113+
EOAActivityCacheTTL time.Duration
110114
}

services/requester/batch_tx_pool.go

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package requester
33
import (
44
"context"
55
"encoding/hex"
6-
"fmt"
76
"sort"
87
"sync"
98
"time"
@@ -21,16 +20,7 @@ import (
2120
"github.com/onflow/flow-evm-gateway/services/requester/keystore"
2221
)
2322

24-
const (
25-
eoaActivityCacheSize = 10_000
26-
// Buffer capacity of the channel used for individual transaction
27-
// submission. Since `BatchTxPool.Add()` is called quite frequently,
28-
// we don't want sending to the channel to block, until there's a
29-
// receive ready. That's why we set a high buffer capacity, to allow
30-
// `BatchTxPool.Add()` to do its job quickly and release any resources
31-
// held, like locks etc.
32-
txChanBufferSize = 1_000
33-
)
23+
const eoaActivityCacheSize = 10_000
3424

3525
type pooledEvmTx struct {
3626
txPayload cadence.String
@@ -53,13 +43,8 @@ type pooledEvmTx struct {
5343
type BatchTxPool struct {
5444
*SingleTxPool
5545
pooledTxs map[gethCommon.Address][]pooledEvmTx
56-
txChan chan cadence.String
5746
txMux sync.Mutex
5847
eoaActivity *expirable.LRU[gethCommon.Address, time.Time]
59-
60-
// Signal channel used to prevent blocking writes
61-
// on `txChan` when the node is shutting down.
62-
done chan struct{}
6348
}
6449

6550
var _ TxPool = &BatchTxPool{}
@@ -88,19 +73,16 @@ func NewBatchTxPool(
8873
eoaActivity := expirable.NewLRU[gethCommon.Address, time.Time](
8974
eoaActivityCacheSize,
9075
nil,
91-
config.TxBatchInterval,
76+
config.EOAActivityCacheTTL,
9277
)
9378
batchPool := &BatchTxPool{
9479
SingleTxPool: singleTxPool,
9580
pooledTxs: make(map[gethCommon.Address][]pooledEvmTx),
96-
txChan: make(chan cadence.String, txChanBufferSize),
9781
txMux: sync.Mutex{},
9882
eoaActivity: eoaActivity,
99-
done: make(chan struct{}),
10083
}
10184

10285
go batchPool.processPooledTransactions(ctx)
103-
go batchPool.processIndividualTransactions(ctx)
10486

10587
return batchPool
10688
}
@@ -152,21 +134,22 @@ func (t *BatchTxPool) Add(
152134
// transactions that might come from the same EOA.
153135
// [X] is equal to the configured `TxBatchInterval` duration.
154136
lastActivityTime, found := t.eoaActivity.Get(from)
155-
if !found || time.Since(lastActivityTime) > t.config.TxBatchInterval {
156-
select {
157-
case <-t.done:
158-
return fmt.Errorf("the server is shutting down")
159-
default:
160-
t.txChan <- hexEncodedTx
161-
}
137+
138+
if !found {
139+
// Case 1. EOA activity not found:
140+
err = t.submitSingleTransaction(ctx, hexEncodedTx)
141+
} else if time.Since(lastActivityTime) > t.config.TxBatchInterval {
142+
// Case 2. EOA activity found AND it was more than [X] seconds ago:
143+
err = t.submitSingleTransaction(ctx, hexEncodedTx)
162144
} else {
145+
// Case 3. EOA activity found AND it was less than [X] seconds ago:
163146
userTx := pooledEvmTx{txPayload: hexEncodedTx, nonce: tx.Nonce()}
164147
t.pooledTxs[from] = append(t.pooledTxs[from], userTx)
165148
}
166149

167150
t.eoaActivity.Add(from, time.Now())
168151

169-
return nil
152+
return err
170153
}
171154

172155
func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
@@ -213,23 +196,6 @@ func (t *BatchTxPool) processPooledTransactions(ctx context.Context) {
213196
}
214197
}
215198

216-
func (t *BatchTxPool) processIndividualTransactions(ctx context.Context) {
217-
for {
218-
select {
219-
case <-ctx.Done():
220-
close(t.done)
221-
return
222-
case hexEncodedTx := <-t.txChan:
223-
if err := t.submitSingleTransaction(ctx, hexEncodedTx); err != nil {
224-
t.logger.Error().Err(err).Msg(
225-
"failed to submit Flow transaction from BatchTxPool for single EOA tx",
226-
)
227-
continue
228-
}
229-
}
230-
}
231-
}
232-
233199
func (t *BatchTxPool) batchSubmitTransactionsForSameAddress(
234200
ctx context.Context,
235201
latestBlock *flow.Block,

0 commit comments

Comments
 (0)