8
8
{-# LANGUAGE RecordWildCards #-}
9
9
{-# LANGUAGE TypeFamilies #-}
10
10
11
- module Development.IDE.Graph.Internal.Database (compute , newDatabase , incDatabase , build , getDirtySet , getKeysAndVisitAge , AsyncParentKill ( .. ) ) where
11
+ module Development.IDE.Graph.Internal.Database (compute , newDatabase , incDatabase , build , getDirtySet , getKeysAndVisitAge ) where
12
12
13
13
import Prelude hiding (unzip )
14
14
15
15
import Control.Concurrent.Async
16
16
import Control.Concurrent.Extra
17
- import Control.Concurrent.STM.Stats (STM , TVar , atomically ,
17
+ import Control.Concurrent.STM.Stats (STM , atomically ,
18
18
atomicallyNamed ,
19
19
modifyTVar' , newTVarIO ,
20
- readTVar , readTVarIO ,
21
- retry )
20
+ readTVarIO )
22
21
import Control.Exception
23
22
import Control.Monad
24
23
import Control.Monad.IO.Class (MonadIO (liftIO ))
25
24
import Control.Monad.Trans.Class (lift )
26
25
import Control.Monad.Trans.Reader
27
26
import qualified Control.Monad.Trans.State.Strict as State
28
27
import Data.Dynamic
28
+ import Data.Either
29
29
import Data.Foldable (for_ , traverse_ )
30
30
import Data.IORef.Extra
31
31
import Data.Maybe
@@ -39,9 +39,8 @@ import Development.IDE.Graph.Internal.Types
39
39
import qualified Focus
40
40
import qualified ListT
41
41
import qualified StmContainers.Map as SMap
42
+ import System.IO.Unsafe
42
43
import System.Time.Extra (duration , sleep )
43
- import UnliftIO (MonadUnliftIO (withRunInIO ))
44
- import qualified UnliftIO.Exception as UE
45
44
46
45
#if MIN_VERSION_base(4,19,0)
47
46
import Data.Functor (unzip )
@@ -68,22 +67,18 @@ incDatabase db (Just kk) = do
68
67
-- since we assume that no build is mutating the db.
69
68
-- Therefore run one transaction per key to minimise contention.
70
69
atomicallyNamed " incDatabase" $ SMap. focus updateDirty k (databaseValues db)
71
- -- let list = SMap.listT (databaseValues db)
72
- -- atomicallyNamed "incDatabase - all " $ flip ListT.traverse_ list $ \(k,_) ->
73
- -- SMap.focus dirtyRunningKey k (databaseValues db)
74
70
75
71
-- all keys are dirty
76
72
incDatabase db Nothing = do
77
73
atomically $ modifyTVar' (databaseStep db) $ \ (Step i) -> Step $ i + 1
78
74
let list = SMap. listT (databaseValues db)
79
- -- all running keys are also dirty
80
75
atomicallyNamed " incDatabase - all " $ flip ListT. traverse_ list $ \ (k,_) ->
81
76
SMap. focus updateDirty k (databaseValues db)
82
77
83
78
updateDirty :: Monad m => Focus. Focus KeyDetails m ()
84
79
updateDirty = Focus. adjust $ \ (KeyDetails status rdeps) ->
85
80
let status'
86
- | Running _ x <- status = Dirty x
81
+ | Running _ _ _ x <- status = Dirty x
87
82
| Clean x <- status = Dirty (Just x)
88
83
| otherwise = status
89
84
in KeyDetails status' rdeps
@@ -93,57 +88,58 @@ build
93
88
=> Database -> Stack -> f key -> IO (f Key , f value )
94
89
-- build _ st k | traceShow ("build", st, k) False = undefined
95
90
build db stack keys = do
96
- step <- readTVarIO $ databaseStep db
97
- go `catch` \ e@ (AsyncParentKill i s) -> do
98
- if s == step
99
- then throw e
100
- else throw $ AsyncParentKill i $ Step (- 1 )
91
+ built <- runAIO $ do
92
+ built <- builder db stack (fmap newKey keys)
93
+ case built of
94
+ Left clean -> return clean
95
+ Right dirty -> liftIO dirty
96
+ let (ids, vs) = unzip built
97
+ pure (ids, fmap (asV . resultValue) vs)
101
98
where
102
- go = do
103
- step <- readTVarIO $ databaseStep db
104
- ! built <- runAIO step $ builder db stack (fmap newKey keys)
105
- let (ids, vs) = unzip built
106
- pure (ids, fmap (asV . resultValue) vs)
107
- where
108
- asV :: Value -> value
109
- asV (Value x) = unwrapDynamic x
110
-
99
+ asV :: Value -> value
100
+ asV (Value x) = unwrapDynamic x
111
101
112
102
-- | Build a list of keys and return their results.
113
103
-- If none of the keys are dirty, we can return the results immediately.
114
104
-- Otherwise, a blocking computation is returned *which must be evaluated asynchronously* to avoid deadlock.
115
- builder :: (Traversable f ) => Database -> Stack -> f Key -> AIO (f (Key , Result ))
105
+ builder
106
+ :: Traversable f => Database -> Stack -> f Key -> AIO (Either (f (Key , Result )) (IO (f (Key , Result ))))
116
107
-- builder _ st kk | traceShow ("builder", st,kk) False = undefined
117
- builder db stack keys = do
118
- keyWaits <- for keys $ \ k -> builderOne db stack k
119
- ! res <- for keyWaits $ \ (k, waitR) -> do
120
- ! v<- liftIO waitR
121
- return (k, v)
122
- return res
123
-
124
- builderOne :: Database -> Stack -> Key -> AIO (Key , IO Result )
125
- builderOne db@ Database {.. } stack id = UE. uninterruptibleMask $ \ restore -> do
126
- current <- liftIO $ readTVarIO databaseStep
127
- (k, registerWaitResult) <- restore $ liftIO $ atomicallyNamed " builder" $ do
128
- -- Spawn the id if needed
129
- status <- SMap. lookup id databaseValues
130
- val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
131
- Dirty s -> do
132
- let act =
133
- asyncWithCleanUp
134
- ((restore $ refresh db stack id s)
135
- `UE.onException` UE. uninterruptibleMask_ (liftIO (atomicallyNamed " builder - onException" (SMap. focus updateDirty id databaseValues)))
136
- )
137
- SMap. focus (updateStatus $ Running current s) id databaseValues
138
- return act
139
- Clean r -> pure . pure . pure $ r
140
- -- force here might contains async exceptions from previous runs
141
- Running _step _s
142
- | memberStack id stack -> throw $ StackException stack
143
- | otherwise -> retry
144
- pure (id , val)
145
- waitR <- registerWaitResult
146
- return (k, waitR)
108
+ builder db@ Database {.. } stack keys = withRunInIO $ \ (RunInIO run) -> do
109
+ -- Things that I need to force before my results are ready
110
+ toForce <- liftIO $ newTVarIO []
111
+ current <- liftIO $ readTVarIO databaseStep
112
+ results <- liftIO $ for keys $ \ id ->
113
+ -- Updating the status of all the dependencies atomically is not necessary.
114
+ -- Therefore, run one transaction per dep. to avoid contention
115
+ atomicallyNamed " builder" $ do
116
+ -- Spawn the id if needed
117
+ status <- SMap. lookup id databaseValues
118
+ val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
119
+ Clean r -> pure r
120
+ Running _ force val _
121
+ | memberStack id stack -> throw $ StackException stack
122
+ | otherwise -> do
123
+ modifyTVar' toForce (Wait force : )
124
+ pure val
125
+ Dirty s -> do
126
+ let act = run (refresh db stack id s)
127
+ (force, val) = splitIO (join act)
128
+ SMap. focus (updateStatus $ Running current force val s) id databaseValues
129
+ modifyTVar' toForce (Spawn force: )
130
+ pure val
131
+
132
+ pure (id , val)
133
+
134
+ toForceList <- liftIO $ readTVarIO toForce
135
+ let waitAll = run $ waitConcurrently_ toForceList
136
+ case toForceList of
137
+ [] -> return $ Left results
138
+ _ -> return $ Right $ do
139
+ waitAll
140
+ pure results
141
+
142
+
147
143
-- | isDirty
148
144
-- only dirty when it's build time is older than the changed time of one of its dependencies
149
145
isDirty :: Foldable t => Result -> t (a , Result ) -> Bool
@@ -159,37 +155,41 @@ isDirty me = any (\(_,dep) -> resultBuilt me < resultChanged dep)
159
155
refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet ] -> AIO Result
160
156
refreshDeps visited db stack key result = \ case
161
157
-- no more deps to refresh
162
- [] -> compute' db stack key RunDependenciesSame (Just result)
158
+ [] -> liftIO $ compute db stack key RunDependenciesSame (Just result)
163
159
(dep: deps) -> do
164
160
let newVisited = dep <> visited
165
161
res <- builder db stack (toListKeySet (dep `differenceKeySet` visited))
166
- if isDirty result res
162
+ case res of
163
+ Left res -> if isDirty result res
167
164
-- restart the computation if any of the deps are dirty
168
- then compute' db stack key RunDependenciesChanged (Just result)
165
+ then liftIO $ compute db stack key RunDependenciesChanged (Just result)
169
166
-- else kick the rest of the deps
170
167
else refreshDeps newVisited db stack key result deps
171
-
172
-
173
- -- refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
168
+ Right iores -> do
169
+ res <- liftIO iores
170
+ if isDirty result res
171
+ then liftIO $ compute db stack key RunDependenciesChanged (Just result)
172
+ else refreshDeps newVisited db stack key result deps
173
+
174
+ -- | Refresh a key:
175
+ refresh :: Database -> Stack -> Key -> Maybe Result -> AIO (IO Result )
174
176
-- refresh _ st k _ | traceShow ("refresh", st, k) False = undefined
175
- refresh :: Database -> Stack -> Key -> Maybe Result -> AIO Result
176
177
refresh db stack key result = case (addStack key stack, result) of
177
178
(Left e, _) -> throw e
178
- (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> refreshDeps mempty db stack key me (reverse deps)
179
- (Right stack, _) -> compute' db stack key RunDependenciesChanged result
179
+ (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> asyncWithCleanUp $ refreshDeps mempty db stack key me (reverse deps)
180
+ (Right stack, _) ->
181
+ asyncWithCleanUp $ liftIO $ compute db stack key RunDependenciesChanged result
180
182
181
- compute' :: Database -> Stack -> Key -> RunMode -> Maybe Result -> AIO Result
182
- compute' db stack key mode result = liftIO $ compute db stack key mode result
183
183
-- | Compute a key.
184
184
compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> IO Result
185
185
-- compute _ st k _ _ | traceShow ("compute", st, k) False = undefined
186
186
compute db@ Database {.. } stack key mode result = do
187
187
let act = runRule databaseRules key (fmap resultData result) mode
188
- deps <- liftIO $ newIORef UnknownDeps
188
+ deps <- newIORef UnknownDeps
189
189
(execution, RunResult {.. }) <-
190
- liftIO $ duration $ runReaderT (fromAction act) $ SAction db deps stack
191
- curStep <- liftIO $ readTVarIO databaseStep
192
- deps <- liftIO $ readIORef deps
190
+ duration $ runReaderT (fromAction act) $ SAction db deps stack
191
+ curStep <- readTVarIO databaseStep
192
+ deps <- readIORef deps
193
193
let lastChanged = maybe curStep resultChanged result
194
194
let lastBuild = maybe curStep resultBuilt result
195
195
-- changed time is always older than or equal to build time
@@ -212,12 +212,12 @@ compute db@Database{..} stack key mode result = do
212
212
-- If an async exception strikes before the deps have been recorded,
213
213
-- we won't be able to accurately propagate dirtiness for this key
214
214
-- on the next build.
215
- liftIO $ void $
215
+ void $
216
216
updateReverseDeps key db
217
217
(getResultDepsDefault mempty previousDeps)
218
218
deps
219
219
_ -> pure ()
220
- liftIO $ atomicallyNamed " compute and run hook" $ do
220
+ atomicallyNamed " compute and run hook" $ do
221
221
runHook
222
222
SMap. focus (updateStatus $ Clean res) key databaseValues
223
223
pure res
@@ -247,6 +247,18 @@ getKeysAndVisitAge db = do
247
247
getAge Result {resultVisited = Step s} = curr - s
248
248
return keysWithVisitAge
249
249
--------------------------------------------------------------------------------
250
+ -- Lazy IO trick
251
+
252
+ data Box a = Box { fromBox :: a }
253
+
254
+ -- | Split an IO computation into an unsafe lazy value and a forcing computation
255
+ splitIO :: IO a -> (IO () , a )
256
+ splitIO act = do
257
+ let act2 = Box <$> act
258
+ let res = unsafePerformIO act2
259
+ (void $ evaluate res, fromBox res)
260
+
261
+ --------------------------------------------------------------------------------
250
262
-- Reverse dependencies
251
263
252
264
-- | Update the reverse dependencies of an Id
@@ -289,29 +301,14 @@ transitiveDirtySet database = flip State.execStateT mempty . traverse_ loop
289
301
290
302
-- | A simple monad to implement cancellation on top of 'Async',
291
303
-- generalizing 'withAsync' to monadic scopes.
292
- newtype AIO a = AIO { unAIO :: ReaderT (TVar [Async () ]) IO a }
304
+ newtype AIO a = AIO { unAIO :: ReaderT (IORef [Async () ]) IO a }
293
305
deriving newtype (Applicative , Functor , Monad , MonadIO )
294
306
295
- data AsyncParentKill = AsyncParentKill ThreadId Step
296
- deriving (Show , Eq )
297
-
298
- instance Exception AsyncParentKill where
299
- toException = asyncExceptionToException
300
- fromException = asyncExceptionFromException
301
-
302
307
-- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises
303
- runAIO :: Step -> AIO a -> IO a
304
- runAIO s (AIO act) = do
305
- asyncsRef <- newTVarIO []
306
- -- Log the exact exception (including async exceptions) before cleanup,
307
- -- then rethrow to preserve previous semantics.
308
- runReaderT act asyncsRef `onException` do
309
- asyncs <- atomically $ do
310
- r <- readTVar asyncsRef
311
- modifyTVar' asyncsRef $ const []
312
- return r
313
- tid <- myThreadId
314
- cleanupAsync asyncs tid s
308
+ runAIO :: AIO a -> IO a
309
+ runAIO (AIO act) = do
310
+ asyncs <- newIORef []
311
+ runReaderT act asyncs `onException` cleanupAsync asyncs
315
312
316
313
-- | Like 'async' but with built-in cancellation.
317
314
-- Returns an IO action to wait on the result.
@@ -322,25 +319,27 @@ asyncWithCleanUp act = do
322
319
-- mask to make sure we keep track of the spawned async
323
320
liftIO $ uninterruptibleMask $ \ restore -> do
324
321
a <- async $ restore io
325
- atomically $ modifyTVar' st (void a : )
322
+ atomicModifyIORef'_ st (void a : )
326
323
return $ wait a
327
324
328
325
unliftAIO :: AIO a -> AIO (IO a )
329
326
unliftAIO act = do
330
327
st <- AIO ask
331
328
return $ runReaderT (unAIO act) st
332
329
333
- instance MonadUnliftIO AIO where
334
- withRunInIO k = do
335
- st <- AIO ask
336
- liftIO $ k (\ aio -> runReaderT (unAIO aio) st)
330
+ newtype RunInIO = RunInIO (forall a . AIO a -> IO a )
337
331
338
- cleanupAsync :: [Async a ] -> ThreadId -> Step -> IO ()
332
+ withRunInIO :: (RunInIO -> AIO b ) -> AIO b
333
+ withRunInIO k = do
334
+ st <- AIO ask
335
+ k $ RunInIO (\ aio -> runReaderT (unAIO aio) st)
336
+
337
+ cleanupAsync :: IORef [Async a ] -> IO ()
339
338
-- mask to make sure we interrupt all the asyncs
340
- cleanupAsync asyncs tid step = uninterruptibleMask $ \ unmask -> do
339
+ cleanupAsync ref = uninterruptibleMask $ \ unmask -> do
340
+ asyncs <- atomicModifyIORef' ref ([] ,)
341
341
-- interrupt all the asyncs without waiting
342
- -- mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs
343
- mapM_ (\ a -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) asyncs
342
+ mapM_ (\ a -> throwTo (asyncThreadId a) AsyncCancelled ) asyncs
344
343
-- Wait until all the asyncs are done
345
344
-- But if it takes more than 10 seconds, log to stderr
346
345
unless (null asyncs) $ do
@@ -349,3 +348,32 @@ cleanupAsync asyncs tid step = uninterruptibleMask $ \unmask -> do
349
348
traceM " cleanupAsync: waiting for asyncs to finish"
350
349
withAsync warnIfTakingTooLong $ \ _ ->
351
350
mapM_ waitCatch asyncs
351
+
352
+ data Wait
353
+ = Wait { justWait :: ! (IO () )}
354
+ | Spawn { justWait :: ! (IO () )}
355
+
356
+ fmapWait :: (IO () -> IO () ) -> Wait -> Wait
357
+ fmapWait f (Wait io) = Wait (f io)
358
+ fmapWait f (Spawn io) = Spawn (f io)
359
+
360
+ waitOrSpawn :: Wait -> IO (Either (IO () ) (Async () ))
361
+ waitOrSpawn (Wait io) = pure $ Left io
362
+ waitOrSpawn (Spawn io) = Right <$> async io
363
+
364
+ waitConcurrently_ :: [Wait ] -> AIO ()
365
+ waitConcurrently_ [] = pure ()
366
+ waitConcurrently_ [one] = liftIO $ justWait one
367
+ waitConcurrently_ many = do
368
+ ref <- AIO ask
369
+ -- spawn the async computations.
370
+ -- mask to make sure we keep track of all the asyncs.
371
+ (asyncs, syncs) <- liftIO $ uninterruptibleMask $ \ unmask -> do
372
+ waits <- liftIO $ traverse (waitOrSpawn . fmapWait unmask) many
373
+ let (syncs, asyncs) = partitionEithers waits
374
+ liftIO $ atomicModifyIORef'_ ref (asyncs ++ )
375
+ return (asyncs, syncs)
376
+ -- work on the sync computations
377
+ liftIO $ sequence_ syncs
378
+ -- wait for the async computations before returning
379
+ liftIO $ traverse_ wait asyncs
0 commit comments