8888)
8989
9090var (
91- evictionInterval = time .Minute // Time interval to check for evictable transactions
92- statsReportInterval = 8 * time .Second // Time interval to report transaction pool stats
91+ evictionInterval = time .Minute // Time interval to check for evictable transactions
92+ statsReportInterval = 8 * time .Second // Time interval to report transaction pool stats
93+ privateTxCleanupInterval = 1 * time .Hour
9394)
9495
9596var (
@@ -164,7 +165,8 @@ type TxPoolConfig struct {
164165 AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
165166 GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
166167
167- Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
168+ Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
169+ PrivateTxLifetime time.Duration // Maximum amount of time to keep private transactions private
168170
169171 TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config.
170172}
@@ -183,7 +185,8 @@ var DefaultTxPoolConfig = TxPoolConfig{
183185 AccountQueue : 64 ,
184186 GlobalQueue : 1024 ,
185187
186- Lifetime : 3 * time .Hour ,
188+ Lifetime : 3 * time .Hour ,
189+ PrivateTxLifetime : 3 * 24 * time .Hour ,
187190}
188191
189192// sanitize checks the provided user configurations and changes anything that's
@@ -222,6 +225,10 @@ func (config *TxPoolConfig) sanitize() TxPoolConfig {
222225 log .Warn ("Sanitizing invalid txpool lifetime" , "provided" , conf .Lifetime , "updated" , DefaultTxPoolConfig .Lifetime )
223226 conf .Lifetime = DefaultTxPoolConfig .Lifetime
224227 }
228+ if conf .PrivateTxLifetime < 1 {
229+ log .Warn ("Sanitizing invalid txpool private tx lifetime" , "provided" , conf .PrivateTxLifetime , "updated" , DefaultTxPoolConfig .PrivateTxLifetime )
230+ conf .PrivateTxLifetime = DefaultTxPoolConfig .PrivateTxLifetime
231+ }
225232 return conf
226233}
227234
@@ -261,6 +268,7 @@ type TxPool struct {
261268 NewMegabundleHooks []func (common.Address , * types.MevBundle )
262269 all * txLookup // All transactions to allow lookups
263270 priced * txPricedList // All transactions sorted by price
271+ privateTxs * timestampedTxHashSet
264272
265273 chainHeadCh chan ChainHeadEvent
266274 chainHeadSub event.Subscription
@@ -296,6 +304,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
296304 beats : make (map [common.Address ]time.Time ),
297305 megabundles : make (map [common.Address ]types.MevBundle ),
298306 all : newTxLookup (),
307+ privateTxs : newExpiringTxHashSet (config .PrivateTxLifetime ),
299308 chainHeadCh : make (chan ChainHeadEvent , chainHeadChanSize ),
300309 reqResetCh : make (chan * txpoolResetRequest ),
301310 reqPromoteCh : make (chan * accountSet ),
@@ -346,9 +355,10 @@ func (pool *TxPool) loop() {
346355 var (
347356 prevPending , prevQueued , prevStales int
348357 // Start the stats reporting and transaction eviction tickers
349- report = time .NewTicker (statsReportInterval )
350- evict = time .NewTicker (evictionInterval )
351- journal = time .NewTicker (pool .config .Rejournal )
358+ report = time .NewTicker (statsReportInterval )
359+ evict = time .NewTicker (evictionInterval )
360+ journal = time .NewTicker (pool .config .Rejournal )
361+ privateTx = time .NewTicker (privateTxCleanupInterval )
352362 // Track the previous head headers for transaction reorgs
353363 head = pool .chain .CurrentBlock ()
354364 )
@@ -412,6 +422,10 @@ func (pool *TxPool) loop() {
412422 }
413423 pool .mu .Unlock ()
414424 }
425+
426+ // Remove stale hashes that must be kept private
427+ case <- privateTx .C :
428+ pool .privateTxs .prune ()
415429 }
416430 }
417431}
@@ -532,6 +546,11 @@ func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types.
532546 return pending , queued
533547}
534548
549+ // IsPrivateTxHash indicates whether the transaction should be shared with peers
550+ func (pool * TxPool ) IsPrivateTxHash (hash common.Hash ) bool {
551+ return pool .privateTxs .Contains (hash )
552+ }
553+
535554// Pending retrieves all currently processable transactions, grouped by origin
536555// account and sorted by nonce. The returned transaction set is a copy and can be
537556// freely modified by calling code.
@@ -958,7 +977,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
958977// This method is used to add transactions from the RPC API and performs synchronous pool
959978// reorganization and event propagation.
960979func (pool * TxPool ) AddLocals (txs []* types.Transaction ) []error {
961- return pool .addTxs (txs , ! pool .config .NoLocals , true )
980+ return pool .addTxs (txs , ! pool .config .NoLocals , true , false )
962981}
963982
964983// AddLocal enqueues a single local transaction into the pool if it is valid. This is
@@ -974,12 +993,18 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
974993// This method is used to add transactions from the p2p network and does not wait for pool
975994// reorganization and internal event propagation.
976995func (pool * TxPool ) AddRemotes (txs []* types.Transaction ) []error {
977- return pool .addTxs (txs , false , false )
996+ return pool .addTxs (txs , false , false , false )
997+ }
998+
999+ // AddPrivateRemote adds transactions to the pool, but does not broadcast these transactions to any peers.
1000+ func (pool * TxPool ) AddPrivateRemote (tx * types.Transaction ) error {
1001+ errs := pool .addTxs ([]* types.Transaction {tx }, false , false , true )
1002+ return errs [0 ]
9781003}
9791004
9801005// This is like AddRemotes, but waits for pool reorganization. Tests use this method.
9811006func (pool * TxPool ) AddRemotesSync (txs []* types.Transaction ) []error {
982- return pool .addTxs (txs , false , true )
1007+ return pool .addTxs (txs , false , true , false )
9831008}
9841009
9851010// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
@@ -998,7 +1023,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
9981023}
9991024
10001025// addTxs attempts to queue a batch of transactions if they are valid.
1001- func (pool * TxPool ) addTxs (txs []* types.Transaction , local , sync bool ) []error {
1026+ func (pool * TxPool ) addTxs (txs []* types.Transaction , local , sync , private bool ) []error {
10021027 // Filter out known ones without obtaining the pool lock or recovering signatures
10031028 var (
10041029 errs = make ([]error , len (txs ))
@@ -1027,6 +1052,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
10271052 return errs
10281053 }
10291054
1055+ // Track private transactions, so they don't get leaked to the public mempool
1056+ if private {
1057+ for _ , tx := range news {
1058+ pool .privateTxs .Add (tx .Hash ())
1059+ }
1060+ }
1061+
10301062 // Process all the new transaction and merge any errors into the original slice
10311063 pool .mu .Lock ()
10321064 newErrs , dirtyAddrs := pool .addTxsLocked (news , local )
@@ -1321,7 +1353,11 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
13211353 if len (events ) > 0 {
13221354 var txs []* types.Transaction
13231355 for _ , set := range events {
1324- txs = append (txs , set .Flatten ()... )
1356+ for _ , tx := range set .Flatten () {
1357+ if ! pool .IsPrivateTxHash (tx .Hash ()) {
1358+ txs = append (txs , tx )
1359+ }
1360+ }
13251361 }
13261362 pool .txFeed .Send (NewTxsEvent {txs })
13271363 }
@@ -1931,6 +1967,59 @@ func (t *txLookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
19311967 return found
19321968}
19331969
1970+ type timestampedTxHashSet struct {
1971+ lock sync.RWMutex
1972+ hashes []common.Hash
1973+ timestamps map [common.Hash ]time.Time
1974+ ttl time.Duration
1975+ }
1976+
1977+ func newExpiringTxHashSet (ttl time.Duration ) * timestampedTxHashSet {
1978+ s := & timestampedTxHashSet {
1979+ hashes : make ([]common.Hash , 0 ),
1980+ timestamps : make (map [common.Hash ]time.Time ),
1981+ ttl : ttl ,
1982+ }
1983+
1984+ return s
1985+ }
1986+
1987+ func (s * timestampedTxHashSet ) Add (hash common.Hash ) {
1988+ s .lock .Lock ()
1989+ defer s .lock .Unlock ()
1990+
1991+ s .hashes = append (s .hashes , hash )
1992+ s .timestamps [hash ] = time .Now ().Add (s .ttl )
1993+ }
1994+
1995+ func (s * timestampedTxHashSet ) Contains (hash common.Hash ) bool {
1996+ s .lock .RLock ()
1997+ defer s .lock .RUnlock ()
1998+ _ , ok := s .timestamps [hash ]
1999+ return ok
2000+ }
2001+
2002+ func (s * timestampedTxHashSet ) prune () {
2003+ s .lock .Lock ()
2004+ defer s .lock .Unlock ()
2005+
2006+ var (
2007+ count int
2008+ now = time .Now ()
2009+ )
2010+ for _ , hash := range s .hashes {
2011+ ts := s .timestamps [hash ]
2012+ if ts .After (now ) {
2013+ break
2014+ }
2015+
2016+ delete (s .timestamps , hash )
2017+ count += 1
2018+ }
2019+
2020+ s .hashes = s .hashes [count :]
2021+ }
2022+
19342023// numSlots calculates the number of slots needed for a single transaction.
19352024func numSlots (tx * types.Transaction ) int {
19362025 return int ((tx .Size () + txSlotSize - 1 ) / txSlotSize )
0 commit comments