@@ -34,7 +34,7 @@ import Control.Concurrent.Async
3434import Control.Concurrent.MVar
3535import Control.DeepSeq
3636import Control.Error.Util (hush )
37- import Control.Exception (evaluate , mask_ , throw )
37+ import Control.Exception (evaluate , mask_ , throw , bracket_ )
3838import Control.Monad
3939
4040import qualified Data.ByteString.Short as SB
@@ -47,7 +47,7 @@ import Data.Foldable (foldl', foldlM)
4747import Data.Function (on )
4848import Data.HashMap.Strict (HashMap )
4949import qualified Data.HashMap.Strict as HashMap
50- import Data.IORef ( modifyIORef' , newIORef , readIORef , writeIORef )
50+ import Data.IORef
5151import Data.Maybe
5252import Data.Ord
5353import qualified Data.Set as S
@@ -84,6 +84,8 @@ import Numeric.AffineSpace
8484import Data.ByteString (ByteString )
8585import Data.Either (partitionEithers )
8686import Control.Lens
87+ import Data.HashSet (HashSet )
88+ import qualified Data.HashSet as HashSet
8789
8890------------------------------------------------------------------------------
8991compareOnGasPrice :: TransactionConfig t -> t -> t -> Ordering
@@ -100,7 +102,8 @@ makeInMemPool :: InMemConfig t
100102makeInMemPool cfg = mask_ $ do
101103 nonce <- randomIO
102104 dataLock <- newInMemMempoolData >>= newMVar
103- return $! InMemoryMempool cfg dataLock nonce
105+ pendingInsertionRef <- newIORef mempty
106+ return $! InMemoryMempool cfg dataLock pendingInsertionRef nonce
104107
105108
106109------------------------------------------------------------------------------
@@ -126,7 +129,7 @@ toMempoolBackend logger mempool = do
126129 , mempoolMember = memberInMem lockMVar
127130 , mempoolLookup = lookupInMem tcfg lockMVar
128131 , mempoolLookupEncoded = lookupEncodedInMem lockMVar
129- , mempoolInsert = insertInMem logger cfg lockMVar
132+ , mempoolInsert = insertInMem logger cfg (_inmemInsertionPending mempool) lockMVar
130133 , mempoolInsertCheck = insertCheckInMem cfg lockMVar
131134 , mempoolInsertCheckVerbose = insertCheckVerboseInMem cfg lockMVar
132135 , mempoolMarkValidated = markValidatedInMem logger tcfg lockMVar
@@ -500,7 +503,7 @@ insertCheckInMem'
500503 . NFData t
501504 => InMemConfig t -- ^ in-memory config
502505 -> MVar (InMemoryMempoolData t ) -- ^ in-memory state
503- -> Vector t -- ^ new transactions
506+ -> Vector ( T2 TransactionHash t ) -- ^ new transactions
504507 -> IO (Vector (T2 TransactionHash t ))
505508insertCheckInMem' cfg lock txs
506509 | V. null txs = pure V. empty
@@ -510,42 +513,54 @@ insertCheckInMem' cfg lock txs
510513 curTxIdx <- withMVarMasked lock $ readIORef . _inmemCurrentTxs
511514
512515 let withHashes :: Vector (T2 TransactionHash t )
513- withHashes = flip V. mapMaybe txs $ \ tx ->
514- let ! h = hasher tx
515- in (T2 h) <$> hush (validateOne cfg badmap curTxIdx now tx h)
516+ withHashes = flip V. mapMaybe txs $ \ (T2 h tx) ->
517+ T2 h <$> hush (validateOne cfg badmap curTxIdx now tx h)
516518
517519 V. mapMaybe hush <$!> _inmemPreInsertBatchChecks cfg withHashes
518- where
519- txcfg = _inmemTxCfg cfg
520- hasher = txHasher txcfg
521520
522521insertInMem
523522 :: forall t logger . (NFData t , Logger logger )
524523 => logger
525524 -> InMemConfig t -- ^ in-memory config
525+ -> IORef (HashSet TransactionHash )
526526 -> MVar (InMemoryMempoolData t ) -- ^ in-memory state
527527 -> InsertType
528528 -> Vector t -- ^ new transactions
529529 -> IO ()
530- insertInMem logger cfg lock runCheck txs0 = do
531- logFunctionText logger Debug $ " insertInMem: " <> sshow (runCheck, V. length txs0)
532- txhashes <- insertCheck
533- withMVarMasked lock $ \ mdata -> do
534- pending <- readIORef (_inmemPending mdata)
535- logFunctionText logger Debug $ " insertInMem: pending txs: " <> sshow (HashMap. keys pending)
536- let cnt = HashMap. size pending
537- let txs = V. take (max 0 (maxNumPending - cnt)) txhashes
538- let T2 ! pending' ! newHashesDL = V. foldl' insOne (T2 pending id ) txs
539- logFunctionText logger Debug $ " insertInMem: updated pending txs: " <> sshow (HashMap. keys pending')
540- let ! newHashes = V. fromList $ newHashesDL []
541- writeIORef (_inmemPending mdata) $! force pending'
542- modifyIORef' (_inmemRecentLog mdata) $
543- recordRecentTransactions maxRecent newHashes
530+ insertInMem logger cfg pendingInsertionRef lock runCheck txs0 = do
531+ pendingInsertionsDirty <- readIORef pendingInsertionRef
532+ let
533+ txs' = flip V. mapMaybe txs0 $ \ tx ->
534+ let hash = hasher tx
535+ in if not (HashSet. member hash pendingInsertionsDirty)
536+ then Just (T2 hash tx)
537+ else Nothing
538+ bracket_
539+ (atomicModifyIORef' pendingInsertionRef $ \ pendingInsertions ->
540+ (foldl' (flip HashSet. insert) pendingInsertions (sfst <$> txs'), () ))
541+ (do
542+ logFunctionText logger Debug $ " insertInMem: " <> sshow (runCheck, V. length txs')
543+ txhashes <- insertCheck txs'
544+ withMVarMasked lock $ \ mdata -> do
545+ pending <- readIORef (_inmemPending mdata)
546+ logFunctionText logger Debug $ " insertInMem: pending txs: " <> sshow (HashMap. keys pending)
547+ let cnt = HashMap. size pending
548+ let txs = V. take (max 0 (maxNumPending - cnt)) txhashes
549+ let T2 ! pending' ! newHashesDL = V. foldl' insOne (T2 pending id ) txs
550+ logFunctionText logger Debug $ " insertInMem: updated pending txs: " <> sshow (HashMap. keys pending')
551+ let ! newHashes = V. fromList $ newHashesDL []
552+ writeIORef (_inmemPending mdata) $! force pending'
553+ modifyIORef' (_inmemRecentLog mdata) $
554+ recordRecentTransactions maxRecent newHashes
555+ )
556+ (atomicModifyIORef' pendingInsertionRef $ \ pendingInsertions ->
557+ (foldl' (flip HashSet. delete) pendingInsertions (sfst <$> txs'), () ))
558+
544559 where
545- insertCheck :: IO (Vector (T2 TransactionHash t ))
546- insertCheck = case runCheck of
547- CheckedInsert -> insertCheckInMem' cfg lock txs0
548- UncheckedInsert -> return $! V. map ( \ tx -> T2 (hasher tx) tx) txs0
560+ insertCheck :: Vector ( T2 TransactionHash t ) -> IO (Vector (T2 TransactionHash t ))
561+ insertCheck txs' = case runCheck of
562+ CheckedInsert -> insertCheckInMem' cfg lock txs'
563+ UncheckedInsert -> return txs'
549564
550565 txcfg = _inmemTxCfg cfg
551566 encodeTx = codecEncode (txCodec txcfg)
0 commit comments