@@ -19,6 +19,7 @@ import Data.Aeson qualified as Aeson
1919import Data.Function (on )
2020import Data.Functor.Contravariant ((>$<) )
2121import Data.Hashable
22+ import Data.Sequence (Seq )
2223import Data.Sequence qualified as Seq
2324import Data.Time.Clock.POSIX (POSIXTime )
2425import Data.Time.Clock.POSIX qualified as Time
@@ -37,11 +38,11 @@ import Ouroboros.Network.PeerSharing (PeerSharingAPI, PeerSharingRegistry,
3738 newPeerSharingAPI , newPeerSharingRegistry ,
3839 ps_POLICY_PEER_SHARE_MAX_PEERS , ps_POLICY_PEER_SHARE_STICKY_TIME )
3940import Ouroboros.Network.TxSubmission.Inbound.V2
40- import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (.. ))
41+ import Ouroboros.Network.TxSubmission.Mempool.Simple (Mempool (.. ), MempoolSeq ( .. ) )
4142import Ouroboros.Network.TxSubmission.Mempool.Simple qualified as Mempool
4243
4344import DMQ.Configuration
44- import DMQ.Protocol.SigSubmission.Type (Sig (sigExpiresAt ), SigId )
45+ import DMQ.Protocol.SigSubmission.Type (Sig (sigId , sigExpiresAt ), SigId )
4546import DMQ.Tracer
4647
4748
@@ -54,7 +55,7 @@ data NodeKernel crypto ntnAddr m =
5455 -- the PeerSharing protocol
5556 , peerSharingRegistry :: ! (PeerSharingRegistry ntnAddr m )
5657 , peerSharingAPI :: ! (PeerSharingAPI ntnAddr StdGen m )
57- , mempool :: ! (Mempool m (Sig crypto ))
58+ , mempool :: ! (Mempool m SigId (Sig crypto ))
5859 , sigChannelVar :: ! (TxChannelsVar m ntnAddr SigId (Sig crypto ))
5960 , sigMempoolSem :: ! (TxMempoolSem m )
6061 , sigSharedTxStateVar :: ! (SharedTxStateVar m ntnAddr SigId (Sig crypto ))
@@ -146,22 +147,36 @@ mempoolWorker :: forall crypto m.
146147 , MonadSTM m
147148 , MonadTime m
148149 )
149- => Mempool m (Sig crypto )
150+ => Mempool m SigId (Sig crypto )
150151 -> m Void
151152mempoolWorker (Mempool v) = loop
152153 where
153154 loop = do
154155 now <- getCurrentPOSIXTime
155156 rt <- atomically $ do
156- (sigs :: Seq. Seq (Sig crypto )) <- readTVar v
157- let sigs' :: Seq. Seq (Sig crypto )
158- (resumeTime, sigs') =
159- foldr (\ a (rt, as) -> if sigExpiresAt a <= now
160- then (rt, as)
161- else (rt `min` sigExpiresAt a, a Seq. <| as))
162- (now, Seq. empty)
163- sigs
164- writeTVar v sigs'
157+ MempoolSeq { mempoolSeq, mempoolSet } <- readTVar v
158+ let mempoolSeq' :: Seq (Sig crypto )
159+ mempoolSet' , expiredSet' :: Set SigId
160+
161+ (resumeTime, expiredSet', mempoolSeq') =
162+ foldr (\ sig (rt, expiredSet, sigs) ->
163+ if sigExpiresAt sig <= now
164+ then ( rt
165+ , sigId sig `Set.insert` expiredSet
166+ , sigs
167+ )
168+ else ( rt `min` sigExpiresAt sig
169+ , expiredSet
170+ , sig Seq. <| sigs
171+ )
172+ )
173+ (now, Set. empty, Seq. empty)
174+ mempoolSeq
175+
176+ mempoolSet' = mempoolSet `Set.difference` expiredSet'
177+
178+ writeTVar v MempoolSeq { mempoolSet = mempoolSet',
179+ mempoolSeq = mempoolSeq' }
165180 return resumeTime
166181
167182 now' <- getCurrentPOSIXTime
0 commit comments