Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions io-sim/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
`TMVar`, `MVar` and a few others - except for `TChan`.
- A blocked `takeTVar` is now safe in the presence of exceptions. It will relay
the value to other waiting threads.
- Faster handling of timeouts and timers by using a more efficient
internal representation.

## 1.6.0.0

Expand Down
1 change: 1 addition & 0 deletions io-sim/io-sim.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ library
exceptions >=0.10,
containers,
deepseq,
hashable,
nothunks,
primitive >=0.7 && <0.11,
psqueues >=0.2 && <0.3,
Expand Down
3 changes: 3 additions & 0 deletions io-sim/src/Control/Monad/IOSim/CommonTypes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import Control.Monad.ST.Lazy

import NoThunks.Class

import Data.Hashable
import Data.List (intercalate, intersperse)
import Data.Map (Map)
import Data.Map qualified as Map
Expand Down Expand Up @@ -70,6 +71,8 @@ data IOSimThreadId =
deriving anyclass NFData
deriving anyclass NoThunks

instance Hashable IOSimThreadId

ppIOSimThreadId :: IOSimThreadId -> String
ppIOSimThreadId (RacyThreadId as) = "Thread {"++ intercalate "," (map show as) ++"}"
ppIOSimThreadId (ThreadId as) = "Thread " ++ show as
Expand Down
38 changes: 20 additions & 18 deletions io-sim/src/Control/Monad/IOSim/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTSyntax #-}
{-# LANGUAGE MultiParamTypeClasses #-}
Expand Down Expand Up @@ -48,17 +49,18 @@ module Control.Monad.IOSim.Internal

import Prelude hiding (read)

import Data.Coerce
import Data.Deque.Strict (Deque)
import Data.Deque.Strict qualified as Deque
import Data.Dynamic
import Data.Foldable (foldlM, toList, traverse_)
import Data.IntPSQ (IntPSQ)
import Data.IntPSQ qualified as PSQ
import Data.List qualified as List
import Data.List.Trace qualified as Trace
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.OrdPSQ (OrdPSQ)
import Data.OrdPSQ qualified as PSQ
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time (UTCTime (..), fromGregorian)
Expand Down Expand Up @@ -134,7 +136,7 @@ data TimerCompletionInfo s =
-- `TimeoutId` (only used to report in a trace).


type Timeouts s = OrdPSQ TimeoutId Time (TimerCompletionInfo s)
type Timeouts s = IntPSQ Time (TimerCompletionInfo s)

-- | Internal state.
--
Expand Down Expand Up @@ -263,7 +265,7 @@ schedule !thread@Thread{

DelayFrame tmid k ctl' -> do
let thread' = thread { threadControl = ThreadControl k ctl' }
timers' = PSQ.delete tmid timers
timers' = (PSQ.delete . coerce) tmid timers
schedule thread' simstate { timers = timers' }

Throw e -> case unwindControlStack e thread timers of
Expand Down Expand Up @@ -360,7 +362,7 @@ schedule !thread@Thread{
StartTimeout d action' k -> do
!lock <- TMVar <$> execNewTVar (TMVarId nextVid) (Just $! "lock-" ++ show nextTmid) Nothing
let !expiry = d `addTime` time
!timers' = PSQ.insert nextTmid expiry (TimerTimeout tid nextTmid lock) timers
!timers' = (PSQ.insert . coerce) nextTmid expiry (TimerTimeout tid nextTmid lock) timers
!thread' = thread { threadControl =
ThreadControl action'
(TimeoutFrame nextTmid lock k ctl)
Expand All @@ -373,7 +375,7 @@ schedule !thread@Thread{

UnregisterTimeout tmid k -> do
let thread' = thread { threadControl = ThreadControl k ctl }
schedule thread' simstate { timers = PSQ.delete tmid timers }
schedule thread' simstate { timers = (PSQ.delete . coerce) tmid timers }

RegisterDelay d k | d < 0 -> do
!tvar <- execNewTVar (TVarId nextVid)
Expand All @@ -391,7 +393,7 @@ schedule !thread@Thread{
(Just $! "<<timeout " ++ show (unTimeoutId nextTmid) ++ ">>")
False
let !expiry = d `addTime` time
!timers' = PSQ.insert nextTmid expiry (TimerRegisterDelay tvar) timers
!timers' = (PSQ.insert . coerce) nextTmid expiry (TimerRegisterDelay tvar) timers
!thread' = thread { threadControl = ThreadControl (k tvar) ctl }
trace <- schedule thread' simstate { timers = timers'
, nextVid = succ nextVid
Expand All @@ -410,7 +412,7 @@ schedule !thread@Thread{

ThreadDelay d k -> do
let !expiry = d `addTime` time
!timers' = PSQ.insert nextTmid expiry (TimerThreadDelay tid nextTmid) timers
!timers' = (PSQ.insert . coerce) nextTmid expiry (TimerThreadDelay tid nextTmid) timers
!thread' = thread { threadControl = ThreadControl (Return ()) (DelayFrame nextTmid k ctl) }
!trace <- deschedule (Blocked BlockedOnDelay) thread' simstate { timers = timers'
, nextTmid = succ nextTmid }
Expand All @@ -434,15 +436,15 @@ schedule !thread@Thread{
TimeoutPending
let !expiry = d `addTime` time
!t = Timeout tvar nextTmid
!timers' = PSQ.insert nextTmid expiry (Timer tvar) timers
!timers' = (PSQ.insert . coerce) nextTmid expiry (Timer tvar) timers
!thread' = thread { threadControl = ThreadControl (k t) ctl }
trace <- schedule thread' simstate { timers = timers'
, nextVid = succ nextVid
, nextTmid = succ nextTmid }
return (SimTrace time tid tlbl (EventTimerCreated nextTmid (TVarId nextVid) expiry) trace)

CancelTimeout (Timeout tvar tmid) k -> do
let !timers' = PSQ.delete tmid timers
let !timers' = (PSQ.delete . coerce) tmid timers
!thread' = thread { threadControl = ThreadControl k ctl }
!written <- execAtomically' (runSTM $ writeTVar tvar TimeoutCancelled)
-- note: we are not running traceTVar on 'tvar', since its not exposed to
Expand Down Expand Up @@ -925,8 +927,8 @@ unwindControlStack e thread = \timers ->
where
unwind :: forall s' c. MaskingState
-> ControlStack s' c a
-> OrdPSQ TimeoutId Time (TimerCompletionInfo s)
-> (Either Bool (Thread s' a), OrdPSQ TimeoutId Time (TimerCompletionInfo s))
-> IntPSQ Time (TimerCompletionInfo s)
-> (Either Bool (Thread s' a), IntPSQ Time (TimerCompletionInfo s))
unwind _ MainFrame timers = (Left True, timers)
unwind _ ForkFrame timers = (Left False, timers)
unwind _ (MaskFrame _k maskst' ctl) timers = unwind maskst' ctl timers
Expand Down Expand Up @@ -962,24 +964,24 @@ unwindControlStack e thread = \timers ->
_ -> unwind maskst ctl timers'
where
-- Remove the timeout associated with the 'TimeoutFrame'.
timers' = PSQ.delete tmid timers
timers' = (PSQ.delete . coerce) tmid timers

unwind maskst (DelayFrame tmid _k ctl) timers =
unwind maskst ctl timers'
where
-- Remove the timeout associated with the 'DelayFrame'.
timers' = PSQ.delete tmid timers
timers' = (PSQ.delete . coerce) tmid timers


atLeastInterruptibleMask :: MaskingState -> MaskingState
atLeastInterruptibleMask Unmasked = MaskedInterruptible
atLeastInterruptibleMask ms = ms


removeMinimums :: (Ord k, Ord p)
=> OrdPSQ k p a
-> Maybe ([k], p, [a], OrdPSQ k p a)
removeMinimums = \psq ->
removeMinimums :: (Coercible k Int, Ord p)
=> IntPSQ p a
-> Maybe ([k], p, [a], IntPSQ p a)
removeMinimums = \psq -> coerce $
case PSQ.minView psq of
Nothing -> Nothing
Just (k, p, x, psq') -> Just (collectAll [k] p [x] psq')
Expand Down
50 changes: 27 additions & 23 deletions io-sim/src/Control/Monad/IOSimPOR/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
-- incomplete uni patterns in 'schedule' (when interpreting 'StmTxCommitted')
-- and 'reschedule'.
{-# OPTIONS_GHC -Wno-incomplete-uni-patterns -Wno-unused-matches #-}
{-# OPTIONS_GHC -Wno-orphans #-}
#if __GLASGOW_HASKELL__ >= 908
-- We use partial functions from `Data.List`.
{-# OPTIONS_GHC -Wno-x-partial #-}
Expand Down Expand Up @@ -53,14 +54,16 @@ import Prelude hiding (read)

import Data.Dynamic
import Data.Foldable (foldlM, traverse_)
import Data.HashPSQ (HashPSQ)
import Data.HashPSQ qualified as PSQ
import Data.IntPSQ (IntPSQ)
import Data.IntPSQ qualified as IPSQ
import Data.List qualified as List
import Data.List.Trace qualified as Trace
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (mapMaybe)
import Data.Ord
import Data.OrdPSQ (OrdPSQ)
import Data.OrdPSQ qualified as PSQ
import Data.Set (Set)
import Data.Set qualified as Set
import Data.Time (UTCTime (..), fromGregorian)
Expand All @@ -85,6 +88,8 @@ import Control.Monad.IOSim.Types hiding (SimEvent (SimEvent), Trace (SimTrace))
import Control.Monad.IOSim.Types (SimEvent)
import Control.Monad.IOSimPOR.Timeout (unsafeTimeout)
import Control.Monad.IOSimPOR.Types
import Data.Coerce (Coercible, coerce)
import Data.Hashable

--
-- Simulation interpreter
Expand Down Expand Up @@ -178,8 +183,10 @@ data TimerCompletionInfo s =
-- ^ `timeout` timer run by `IOSimThreadId` which was assigned the given
-- `TimeoutId` (only used to report in a trace).

type RunQueue = OrdPSQ (Down IOSimThreadId) (Down IOSimThreadId) ()
type Timeouts s = OrdPSQ TimeoutId Time (TimerCompletionInfo s)
instance Hashable a => Hashable (Down a)

type RunQueue = HashPSQ (Down IOSimThreadId) (Down IOSimThreadId) ()
type Timeouts s = IntPSQ Time (TimerCompletionInfo s)

-- | Internal state.
--
Expand Down Expand Up @@ -215,7 +222,7 @@ initialState =
runqueue = PSQ.empty,
threads = Map.empty,
curTime = Time 0,
timers = PSQ.empty,
timers = IPSQ.empty,
clocks = Map.singleton (ClockId []) epoch1970,
nextVid = 0,
nextTmid = TimeoutId 0,
Expand All @@ -240,9 +247,6 @@ invariant Nothing SimState{runqueue,threads,clocks} =
assert (PSQ.fold' (\(Down tid) _ _ a -> tid `Map.member` threads && a) True runqueue)
. assert (and [ (isThreadBlocked t || isThreadDone t) == not (Down (threadId t) `PSQ.member` runqueue)
| t <- Map.elems threads ])
. assert (and (zipWith (\(Down tid, _, _) (Down tid', _, _) -> tid > tid')
(PSQ.toList runqueue)
(drop 1 (PSQ.toList runqueue))))
. assert (and [ threadClockId t `Map.member` clocks
| t <- Map.elems threads ])

Expand Down Expand Up @@ -372,7 +376,7 @@ schedule thread@Thread{

DelayFrame tmid k ctl' -> do
let thread' = thread { threadControl = ThreadControl k ctl' }
timers' = PSQ.delete tmid timers
timers' = IPSQ.delete (coerce tmid) timers
schedule thread' simstate { timers = timers' }

Throw e -> case unwindControlStack e thread timers of
Expand Down Expand Up @@ -482,7 +486,7 @@ schedule thread@Thread{
StartTimeout d action' k -> do
lock <- TMVar <$> execNewTVar (TMVarId nextVid) (Just $! "lock-" ++ show nextTmid) Nothing
let expiry = d `addTime` time
timers' = PSQ.insert nextTmid expiry (TimerTimeout tid nextTmid lock) timers
timers' = IPSQ.insert (coerce nextTmid) expiry (TimerTimeout tid nextTmid lock) timers
thread' = thread { threadControl =
ThreadControl action'
(TimeoutFrame nextTmid lock k ctl)
Expand All @@ -493,7 +497,7 @@ schedule thread@Thread{

UnregisterTimeout tmid k -> do
let thread' = thread { threadControl = ThreadControl k ctl }
schedule thread' simstate { timers = PSQ.delete tmid timers }
schedule thread' simstate { timers = IPSQ.delete (coerce tmid) timers }

RegisterDelay d k | d < 0 -> do
tvar <- execNewTVar (TVarId nextVid)
Expand All @@ -513,7 +517,7 @@ schedule thread@Thread{
False
modifySTRef (tvarVClock tvar) (leastUpperBoundVClock vClock)
let !expiry = d `addTime` time
!timers' = PSQ.insert nextTmid expiry (TimerRegisterDelay tvar) timers
!timers' = IPSQ.insert (coerce nextTmid) expiry (TimerRegisterDelay tvar) timers
!thread' = thread { threadControl = ThreadControl (k tvar) ctl }
trace <- schedule thread' simstate { timers = timers'
, nextVid = succ nextVid
Expand All @@ -532,7 +536,7 @@ schedule thread@Thread{

ThreadDelay d k -> do
let expiry = d `addTime` time
timers' = PSQ.insert nextTmid expiry (TimerThreadDelay tid nextTmid) timers
timers' = IPSQ.insert (coerce nextTmid) expiry (TimerThreadDelay tid nextTmid) timers
thread' = thread { threadControl = ThreadControl (Return ()) (DelayFrame nextTmid k ctl) }
trace <- deschedule (Blocked BlockedOnDelay) thread'
simstate { timers = timers',
Expand All @@ -558,15 +562,15 @@ schedule thread@Thread{
modifySTRef (tvarVClock tvar) (leastUpperBoundVClock vClock)
let expiry = d `addTime` time
t = Timeout tvar nextTmid
timers' = PSQ.insert nextTmid expiry (Timer tvar) timers
timers' = IPSQ.insert (coerce nextTmid) expiry (Timer tvar) timers
thread' = thread { threadControl = ThreadControl (k t) ctl }
trace <- schedule thread' simstate { timers = timers'
, nextVid = succ (succ nextVid)
, nextTmid = succ nextTmid }
return (SimPORTrace time tid tstep tlbl (EventTimerCreated nextTmid (TVarId nextVid) expiry) trace)

CancelTimeout (Timeout tvar tmid) k -> do
let timers' = PSQ.delete tmid timers
let timers' = IPSQ.delete (coerce tmid) timers
written <- execAtomically' (runSTM $ writeTVar tvar TimeoutCancelled)
written' <- mapM someTVarToLabelled written
(wakeup, wokeby) <- threadsUnblockedByWrites written
Expand Down Expand Up @@ -1291,29 +1295,29 @@ unwindControlStack e thread = \timeouts ->
_ -> unwind maskst ctl timers'
where
-- Remove the timeout associated with the 'TimeoutFrame'.
timers' = PSQ.delete tmid timers
timers' = IPSQ.delete (coerce tmid) timers

unwind maskst (DelayFrame tmid _k ctl) timers =
unwind maskst ctl timers'
where
-- Remove the timeout associated with the 'DelayFrame'.
timers' = PSQ.delete tmid timers
timers' = IPSQ.delete (coerce tmid) timers

atLeastInterruptibleMask :: MaskingState -> MaskingState
atLeastInterruptibleMask Unmasked = MaskedInterruptible
atLeastInterruptibleMask ms = ms


removeMinimums :: (Ord k, Ord p)
=> OrdPSQ k p a
-> Maybe ([k], p, [a], OrdPSQ k p a)
removeMinimums = \psq ->
case PSQ.minView psq of
removeMinimums :: (Coercible Int k, Ord p)
=> IntPSQ p a
-> Maybe ([k], p, [a], IntPSQ p a)
removeMinimums = \psq -> coerce $
case IPSQ.minView psq of
Nothing -> Nothing
Just (k, p, x, psq') -> Just (collectAll [k] p [x] psq')
where
collectAll ks p xs psq =
case PSQ.minView psq of
case IPSQ.minView psq of
Just (k, p', x, psq')
| p == p' -> collectAll (k:ks) p (x:xs) psq'
_ -> (reverse ks, p, reverse xs, psq)
Expand Down
Loading