@@ -55,6 +55,7 @@ import Data.Monoid (Sum (..))
5555import Data.Monoid.Synchronisation (FirstToFinish (.. ))
5656import Data.OrdPSQ (OrdPSQ )
5757import Data.OrdPSQ qualified as OrdPSQ
58+ import Data.Proxy (Proxy (.. ))
5859import Data.Set (Set )
5960import Data.Set qualified as Set
6061import Data.Typeable (Typeable )
@@ -654,64 +655,66 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
654655 (MultiNodeScript script _) =
655656 withJobPool $ \ jobpool -> do
656657 stdGenVar <- newTVarIO stdGen0
657- cc <- startServerConnectionHandler stdGenVar MainServer dataFlow0 [accInit] serverAddr jobpool
658- loop stdGenVar (Map. singleton serverAddr [accInit]) (Map. singleton serverAddr cc) script jobpool
658+ connStateIdSupply <- atomically $ CM. newConnStateIdSupply (Proxy @ m )
659+ cc <- startServerConnectionHandler stdGenVar connStateIdSupply MainServer dataFlow0 [accInit] serverAddr jobpool
660+ loop stdGenVar connStateIdSupply (Map. singleton serverAddr [accInit]) (Map. singleton serverAddr cc) script jobpool
659661 where
660662
661663 loop :: StrictTVar m StdGen
664+ -> CM. ConnStateIdSupply m
662665 -> Map. Map peerAddr acc
663666 -> Map. Map peerAddr (StrictTQueue m (ConnectionHandlerMessage peerAddr req ))
664667 -> [ConnectionEvent req peerAddr ]
665668 -> JobPool () m ()
666669 -> m ()
667- loop _ _ _ [] _ = threadDelay 3600
668- loop stdGenVar nodeAccs servers (event : events) jobpool =
670+ loop _ _ _ _ [] _ = threadDelay 3600
671+ loop stdGenVar connStateIdSupply nodeAccs servers (event : events) jobpool =
669672 case event of
670673
671674 StartClient delay localAddr -> do
672675 threadDelay delay
673- cc <- startClientConnectionHandler stdGenVar (Client localAddr) localAddr jobpool
674- loop stdGenVar nodeAccs (Map. insert localAddr cc servers) events jobpool
676+ cc <- startClientConnectionHandler stdGenVar connStateIdSupply (Client localAddr) localAddr jobpool
677+ loop stdGenVar connStateIdSupply nodeAccs (Map. insert localAddr cc servers) events jobpool
675678
676679 StartServer delay localAddr nodeAcc -> do
677680 threadDelay delay
678- cc <- startServerConnectionHandler stdGenVar (Node localAddr) Duplex [nodeAcc] localAddr jobpool
679- loop stdGenVar (Map. insert localAddr [nodeAcc] nodeAccs) (Map. insert localAddr cc servers) events jobpool
681+ cc <- startServerConnectionHandler stdGenVar connStateIdSupply (Node localAddr) Duplex [nodeAcc] localAddr jobpool
682+ loop stdGenVar connStateIdSupply (Map. insert localAddr [nodeAcc] nodeAccs) (Map. insert localAddr cc servers) events jobpool
680683
681684 InboundConnection delay nodeAddr -> do
682685 threadDelay delay
683686 sendMsg nodeAddr $ NewConnection serverAddr
684- loop stdGenVar nodeAccs servers events jobpool
687+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
685688
686689 OutboundConnection delay nodeAddr -> do
687690 threadDelay delay
688691 sendMsg serverAddr $ NewConnection nodeAddr
689- loop stdGenVar nodeAccs servers events jobpool
692+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
690693
691694 CloseInboundConnection delay remoteAddr -> do
692695 threadDelay delay
693696 sendMsg remoteAddr $ Disconnect serverAddr
694- loop stdGenVar nodeAccs servers events jobpool
697+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
695698
696699 CloseOutboundConnection delay remoteAddr -> do
697700 threadDelay delay
698701 sendMsg serverAddr $ Disconnect remoteAddr
699- loop stdGenVar nodeAccs servers events jobpool
702+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
700703
701704 InboundMiniprotocols delay nodeAddr reqs -> do
702705 threadDelay delay
703706 sendMsg nodeAddr $ RunMiniProtocols serverAddr reqs
704- loop stdGenVar nodeAccs servers events jobpool
707+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
705708
706709 OutboundMiniprotocols delay nodeAddr reqs -> do
707710 threadDelay delay
708711 sendMsg serverAddr $ RunMiniProtocols nodeAddr reqs
709- loop stdGenVar nodeAccs servers events jobpool
712+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
710713
711714 ShutdownClientServer delay nodeAddr -> do
712715 threadDelay delay
713716 sendMsg nodeAddr Shutdown
714- loop stdGenVar nodeAccs servers events jobpool
717+ loop stdGenVar connStateIdSupply nodeAccs servers events jobpool
715718 where
716719 sendMsg :: peerAddr -> ConnectionHandlerMessage peerAddr req -> m ()
717720 sendMsg addr msg = atomically $
@@ -731,11 +734,12 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
731734 Just qs -> readTQueue (projectBundle tok qs)
732735
733736 startClientConnectionHandler :: StrictTVar m StdGen
737+ -> CM. ConnStateIdSupply m
734738 -> Name peerAddr
735739 -> peerAddr
736740 -> JobPool () m ()
737741 -> m (StrictTQueue m (ConnectionHandlerMessage peerAddr req ))
738- startClientConnectionHandler stdGenVar name localAddr jobpool = do
742+ startClientConnectionHandler stdGenVar connStateIdSupply name localAddr jobpool = do
739743 cc <- atomically newTQueue
740744 labelTQueueIO cc $ " cc/" ++ show name
741745 connVar <- newTVarIO Map. empty
@@ -746,7 +750,8 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
746750 $ Job
747751 ( withInitiatorOnlyConnectionManager
748752 name simTimeouts nullTracer nullTracer stdGen
749- snocket makeBearer (Just localAddr) (mkNextRequests connVar)
753+ snocket makeBearer connStateIdSupply
754+ (Just localAddr) (mkNextRequests connVar)
750755 timeLimitsHandshake acceptedConnLimit
751756 ( \ connectionManager ->
752757 connectionLoop SingInitiatorMode localAddr cc connectionManager Map. empty connVar
@@ -758,13 +763,14 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
758763 return cc
759764
760765 startServerConnectionHandler :: StrictTVar m StdGen
766+ -> CM. ConnStateIdSupply m
761767 -> Name peerAddr
762768 -> DataFlow
763769 -> acc
764770 -> peerAddr
765771 -> JobPool () m ()
766772 -> m (StrictTQueue m (ConnectionHandlerMessage peerAddr req ))
767- startServerConnectionHandler stdGenVar name dataFlow serverAcc localAddr jobpool = do
773+ startServerConnectionHandler stdGenVar connStateIdSupply name dataFlow serverAcc localAddr jobpool = do
768774 fd <- Snocket. open snocket addrFamily
769775 Snocket. bind snocket fd localAddr
770776 Snocket. listen snocket fd
@@ -782,7 +788,8 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
782788 inboundTrTracer trTracer cmTracer
783789 inboundTracer debugTracer
784790 stdGen
785- snocket makeBearer (\ _ -> pure () ) fd (Just localAddr) serverAcc
791+ snocket makeBearer connStateIdSupply
792+ (\ _ -> pure () ) fd (Just localAddr) serverAcc
786793 (mkNextRequests connVar)
787794 timeLimitsHandshake
788795 acceptedConnLimit
@@ -799,7 +806,8 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
799806 (show name)
800807 Unidirectional ->
801808 Job ( withInitiatorOnlyConnectionManager
802- name simTimeouts trTracer cmTracer stdGen snocket makeBearer (Just localAddr)
809+ name simTimeouts trTracer cmTracer stdGen snocket makeBearer
810+ connStateIdSupply (Just localAddr)
803811 (mkNextRequests connVar)
804812 timeLimitsHandshake
805813 acceptedConnLimit
@@ -2182,13 +2190,15 @@ prop_server_accept_error (Fixed rnd) (AbsIOError ioerr) =
21822190 Snocket. bind snock socket0 addr
21832191 Snocket. listen snock socket0
21842192 nextRequests <- oneshotNextRequests pdata
2193+ connStateIdSupply <- atomically $ CM. newConnStateIdSupply Proxy
21852194 withBidirectionalConnectionManager " node-0" simTimeouts
21862195 nullTracer nullTracer
21872196 nullTracer nullTracer
21882197 nullTracer
21892198 (mkStdGen rnd)
21902199 snock
21912200 makeFDBearer
2201+ connStateIdSupply
21922202 (\ _ -> pure () )
21932203 socket0 (Just addr)
21942204 [accumulatorInit pdata]
0 commit comments