8
8
{-# LANGUAGE RecordWildCards #-}
9
9
{-# LANGUAGE TypeFamilies #-}
10
10
11
- module Development.IDE.Graph.Internal.Database (compute , newDatabase , incDatabase , build , getDirtySet , getKeysAndVisitAge ) where
11
+ module Development.IDE.Graph.Internal.Database (compute , newDatabase , incDatabase , build , getDirtySet , getKeysAndVisitAge , AsyncParentKill ( .. ) ) where
12
12
13
13
import Prelude hiding (unzip )
14
14
15
15
import Control.Concurrent.Async
16
16
import Control.Concurrent.Extra
17
- import Control.Concurrent.STM.Stats (STM , atomically ,
17
+ import Control.Concurrent.STM.Stats (STM , TVar , atomically ,
18
18
atomicallyNamed ,
19
19
modifyTVar' , newTVarIO ,
20
- readTVarIO )
20
+ readTVar , readTVarIO ,
21
+ retry )
21
22
import Control.Exception
22
23
import Control.Monad
23
24
import Control.Monad.IO.Class (MonadIO (liftIO ))
24
25
import Control.Monad.Trans.Class (lift )
25
26
import Control.Monad.Trans.Reader
26
27
import qualified Control.Monad.Trans.State.Strict as State
27
28
import Data.Dynamic
28
- import Data.Either
29
29
import Data.Foldable (for_ , traverse_ )
30
30
import Data.IORef.Extra
31
31
import Data.Maybe
@@ -39,11 +39,12 @@ import Development.IDE.Graph.Internal.Types
39
39
import qualified Focus
40
40
import qualified ListT
41
41
import qualified StmContainers.Map as SMap
42
- import System.IO.Unsafe
43
42
import System.Time.Extra (duration , sleep )
44
43
45
44
#if MIN_VERSION_base(4,19,0)
46
45
import Data.Functor (unzip )
46
+ import UnliftIO (MonadUnliftIO (withRunInIO ))
47
+ import qualified UnliftIO.Exception as UE
47
48
#else
48
49
import Data.List.NonEmpty (unzip )
49
50
#endif
@@ -67,18 +68,22 @@ incDatabase db (Just kk) = do
67
68
-- since we assume that no build is mutating the db.
68
69
-- Therefore run one transaction per key to minimise contention.
69
70
atomicallyNamed " incDatabase" $ SMap. focus updateDirty k (databaseValues db)
71
+ -- let list = SMap.listT (databaseValues db)
72
+ -- atomicallyNamed "incDatabase - all " $ flip ListT.traverse_ list $ \(k,_) ->
73
+ -- SMap.focus dirtyRunningKey k (databaseValues db)
70
74
71
75
-- all keys are dirty
72
76
incDatabase db Nothing = do
73
77
atomically $ modifyTVar' (databaseStep db) $ \ (Step i) -> Step $ i + 1
74
78
let list = SMap. listT (databaseValues db)
79
+ -- all running keys are also dirty
75
80
atomicallyNamed " incDatabase - all " $ flip ListT. traverse_ list $ \ (k,_) ->
76
81
SMap. focus updateDirty k (databaseValues db)
77
82
78
83
updateDirty :: Monad m => Focus. Focus KeyDetails m ()
79
84
updateDirty = Focus. adjust $ \ (KeyDetails status rdeps) ->
80
85
let status'
81
- | Running _ _ _ x <- status = Dirty x
86
+ | Running _ x <- status = Dirty x
82
87
| Clean x <- status = Dirty (Just x)
83
88
| otherwise = status
84
89
in KeyDetails status' rdeps
@@ -88,58 +93,57 @@ build
88
93
=> Database -> Stack -> f key -> IO (f Key , f value )
89
94
-- build _ st k | traceShow ("build", st, k) False = undefined
90
95
build db stack keys = do
91
- built <- runAIO $ do
92
- built <- builder db stack (fmap newKey keys)
93
- case built of
94
- Left clean -> return clean
95
- Right dirty -> liftIO dirty
96
- let (ids, vs) = unzip built
97
- pure (ids, fmap (asV . resultValue) vs)
96
+ step <- readTVarIO $ databaseStep db
97
+ go `catch` \ e@ (AsyncParentKill i s) -> do
98
+ if s == step
99
+ then throw e
100
+ else throw $ AsyncParentKill i $ Step (- 1 )
98
101
where
99
- asV :: Value -> value
100
- asV (Value x) = unwrapDynamic x
102
+ go = do
103
+ step <- readTVarIO $ databaseStep db
104
+ ! built <- runAIO step $ builder db stack (fmap newKey keys)
105
+ let (ids, vs) = unzip built
106
+ pure (ids, fmap (asV . resultValue) vs)
107
+ where
108
+ asV :: Value -> value
109
+ asV (Value x) = unwrapDynamic x
110
+
101
111
102
112
-- | Build a list of keys and return their results.
103
113
-- If none of the keys are dirty, we can return the results immediately.
104
114
-- Otherwise, a blocking computation is returned *which must be evaluated asynchronously* to avoid deadlock.
105
- builder
106
- :: Traversable f => Database -> Stack -> f Key -> AIO (Either (f (Key , Result )) (IO (f (Key , Result ))))
115
+ builder :: (Traversable f ) => Database -> Stack -> f Key -> AIO (f (Key , Result ))
107
116
-- builder _ st kk | traceShow ("builder", st,kk) False = undefined
108
- builder db@ Database {.. } stack keys = withRunInIO $ \ (RunInIO run) -> do
109
- -- Things that I need to force before my results are ready
110
- toForce <- liftIO $ newTVarIO []
111
- current <- liftIO $ readTVarIO databaseStep
112
- results <- liftIO $ for keys $ \ id ->
113
- -- Updating the status of all the dependencies atomically is not necessary.
114
- -- Therefore, run one transaction per dep. to avoid contention
115
- atomicallyNamed " builder" $ do
116
- -- Spawn the id if needed
117
- status <- SMap. lookup id databaseValues
118
- val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
119
- Clean r -> pure r
120
- Running _ force val _
121
- | memberStack id stack -> throw $ StackException stack
122
- | otherwise -> do
123
- modifyTVar' toForce (Wait force : )
124
- pure val
125
- Dirty s -> do
126
- let act = run (refresh db stack id s)
127
- (force, val) = splitIO (join act)
128
- SMap. focus (updateStatus $ Running current force val s) id databaseValues
129
- modifyTVar' toForce (Spawn force: )
130
- pure val
131
-
132
- pure (id , val)
133
-
134
- toForceList <- liftIO $ readTVarIO toForce
135
- let waitAll = run $ waitConcurrently_ toForceList
136
- case toForceList of
137
- [] -> return $ Left results
138
- _ -> return $ Right $ do
139
- waitAll
140
- pure results
141
-
142
-
117
+ builder db stack keys = do
118
+ keyWaits <- for keys $ \ k -> builderOne db stack k
119
+ ! res <- for keyWaits $ \ (k, waitR) -> do
120
+ ! v<- liftIO waitR
121
+ return (k, v)
122
+ return res
123
+
124
+ builderOne :: Database -> Stack -> Key -> AIO (Key , IO Result )
125
+ builderOne db@ Database {.. } stack id = UE. mask_ $ do
126
+ current <- liftIO $ readTVarIO databaseStep
127
+ (k, registerWaitResult) <- liftIO $ atomicallyNamed " builder" $ do
128
+ -- Spawn the id if needed
129
+ status <- SMap. lookup id databaseValues
130
+ val <- case viewDirty current $ maybe (Dirty Nothing ) keyStatus status of
131
+ Dirty s -> do
132
+ let act =
133
+ asyncWithCleanUp
134
+ ( refresh db stack id s
135
+ `UE.onException` liftIO (atomicallyNamed " builder - onException" (SMap. focus updateDirty id databaseValues))
136
+ )
137
+ SMap. focus (updateStatus $ Running current s) id databaseValues
138
+ return act
139
+ Clean r -> pure . pure . pure $ r
140
+ -- force here might contains async exceptions from previous runs
141
+ Running _step _s
142
+ | memberStack id stack -> throw $ StackException stack
143
+ | otherwise -> retry
144
+ pure (id , val)
145
+ waitR <- registerWaitResult
146
+ return (k, waitR)
143
147
-- | isDirty
144
148
-- only dirty when it's build time is older than the changed time of one of its dependencies
145
149
isDirty :: Foldable t => Result -> t (a , Result ) -> Bool
@@ -155,41 +159,35 @@ isDirty me = any (\(_,dep) -> resultBuilt me < resultChanged dep)
155
159
refreshDeps :: KeySet -> Database -> Stack -> Key -> Result -> [KeySet ] -> AIO Result
156
160
refreshDeps visited db stack key result = \ case
157
161
-- no more deps to refresh
158
- [] -> liftIO $ compute db stack key RunDependenciesSame (Just result)
162
+ [] -> compute db stack key RunDependenciesSame (Just result)
159
163
(dep: deps) -> do
160
164
let newVisited = dep <> visited
161
165
res <- builder db stack (toListKeySet (dep `differenceKeySet` visited))
162
- case res of
163
- Left res -> if isDirty result res
166
+ if isDirty result res
164
167
-- restart the computation if any of the deps are dirty
165
- then liftIO $ compute db stack key RunDependenciesChanged (Just result)
168
+ then compute db stack key RunDependenciesChanged (Just result)
166
169
-- else kick the rest of the deps
167
170
else refreshDeps newVisited db stack key result deps
168
- Right iores -> do
169
- res <- liftIO iores
170
- if isDirty result res
171
- then liftIO $ compute db stack key RunDependenciesChanged (Just result)
172
- else refreshDeps newVisited db stack key result deps
173
-
174
- -- | Refresh a key:
175
- refresh :: Database -> Stack -> Key -> Maybe Result -> AIO (IO Result )
171
+
172
+
173
+ -- refresh :: Database -> Stack -> Key -> Maybe Result -> IO Result
176
174
-- refresh _ st k _ | traceShow ("refresh", st, k) False = undefined
175
+ refresh :: Database -> Stack -> Key -> Maybe Result -> AIO Result
177
176
refresh db stack key result = case (addStack key stack, result) of
178
177
(Left e, _) -> throw e
179
- (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> asyncWithCleanUp $ refreshDeps mempty db stack key me (reverse deps)
180
- (Right stack, _) ->
181
- asyncWithCleanUp $ liftIO $ compute db stack key RunDependenciesChanged result
178
+ (Right stack, Just me@ Result {resultDeps = ResultDeps deps}) -> refreshDeps mempty db stack key me (reverse deps)
179
+ (Right stack, _) -> compute db stack key RunDependenciesChanged result
182
180
183
181
-- | Compute a key.
184
- compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> IO Result
182
+ compute :: Database -> Stack -> Key -> RunMode -> Maybe Result -> AIO Result
185
183
-- compute _ st k _ _ | traceShow ("compute", st, k) False = undefined
186
184
compute db@ Database {.. } stack key mode result = do
187
185
let act = runRule databaseRules key (fmap resultData result) mode
188
- deps <- newIORef UnknownDeps
186
+ deps <- liftIO $ newIORef UnknownDeps
189
187
(execution, RunResult {.. }) <-
190
- duration $ runReaderT (fromAction act) $ SAction db deps stack
191
- curStep <- readTVarIO databaseStep
192
- deps <- readIORef deps
188
+ liftIO $ duration $ runReaderT (fromAction act) $ SAction db deps stack
189
+ curStep <- liftIO $ readTVarIO databaseStep
190
+ deps <- liftIO $ readIORef deps
193
191
let lastChanged = maybe curStep resultChanged result
194
192
let lastBuild = maybe curStep resultBuilt result
195
193
-- changed time is always older than or equal to build time
@@ -212,12 +210,12 @@ compute db@Database{..} stack key mode result = do
212
210
-- If an async exception strikes before the deps have been recorded,
213
211
-- we won't be able to accurately propagate dirtiness for this key
214
212
-- on the next build.
215
- void $
213
+ liftIO $ void $
216
214
updateReverseDeps key db
217
215
(getResultDepsDefault mempty previousDeps)
218
216
deps
219
217
_ -> pure ()
220
- atomicallyNamed " compute and run hook" $ do
218
+ liftIO $ atomicallyNamed " compute and run hook" $ do
221
219
runHook
222
220
SMap. focus (updateStatus $ Clean res) key databaseValues
223
221
pure res
@@ -247,18 +245,6 @@ getKeysAndVisitAge db = do
247
245
getAge Result {resultVisited = Step s} = curr - s
248
246
return keysWithVisitAge
249
247
--------------------------------------------------------------------------------
250
- -- Lazy IO trick
251
-
252
- data Box a = Box { fromBox :: a }
253
-
254
- -- | Split an IO computation into an unsafe lazy value and a forcing computation
255
- splitIO :: IO a -> (IO () , a )
256
- splitIO act = do
257
- let act2 = Box <$> act
258
- let res = unsafePerformIO act2
259
- (void $ evaluate res, fromBox res)
260
-
261
- --------------------------------------------------------------------------------
262
248
-- Reverse dependencies
263
249
264
250
-- | Update the reverse dependencies of an Id
@@ -301,14 +287,29 @@ transitiveDirtySet database = flip State.execStateT mempty . traverse_ loop
301
287
302
288
-- | A simple monad to implement cancellation on top of 'Async',
303
289
-- generalizing 'withAsync' to monadic scopes.
304
- newtype AIO a = AIO { unAIO :: ReaderT (IORef [Async () ]) IO a }
290
+ newtype AIO a = AIO { unAIO :: ReaderT (TVar [Async () ]) IO a }
305
291
deriving newtype (Applicative , Functor , Monad , MonadIO )
306
292
293
+ data AsyncParentKill = AsyncParentKill ThreadId Step
294
+ deriving (Show , Eq )
295
+
296
+ instance Exception AsyncParentKill where
297
+ toException = asyncExceptionToException
298
+ fromException = asyncExceptionFromException
299
+
307
300
-- | Run the monadic computation, cancelling all the spawned asyncs if an exception arises
308
- runAIO :: AIO a -> IO a
309
- runAIO (AIO act) = do
310
- asyncs <- newIORef []
311
- runReaderT act asyncs `onException` cleanupAsync asyncs
301
+ runAIO :: Step -> AIO a -> IO a
302
+ runAIO s (AIO act) = do
303
+ asyncsRef <- newTVarIO []
304
+ -- Log the exact exception (including async exceptions) before cleanup,
305
+ -- then rethrow to preserve previous semantics.
306
+ runReaderT act asyncsRef `onException` do
307
+ asyncs <- atomically $ do
308
+ r <- readTVar asyncsRef
309
+ modifyTVar' asyncsRef $ const []
310
+ return r
311
+ tid <- myThreadId
312
+ cleanupAsync asyncs tid s
312
313
313
314
-- | Like 'async' but with built-in cancellation.
314
315
-- Returns an IO action to wait on the result.
@@ -319,27 +320,25 @@ asyncWithCleanUp act = do
319
320
-- mask to make sure we keep track of the spawned async
320
321
liftIO $ uninterruptibleMask $ \ restore -> do
321
322
a <- async $ restore io
322
- atomicModifyIORef'_ st (void a : )
323
+ atomically $ modifyTVar' st (void a : )
323
324
return $ wait a
324
325
325
326
unliftAIO :: AIO a -> AIO (IO a )
326
327
unliftAIO act = do
327
328
st <- AIO ask
328
329
return $ runReaderT (unAIO act) st
329
330
330
- newtype RunInIO = RunInIO (forall a . AIO a -> IO a )
331
+ instance MonadUnliftIO AIO where
332
+ withRunInIO k = do
333
+ st <- AIO ask
334
+ liftIO $ k (\ aio -> runReaderT (unAIO aio) st)
331
335
332
- withRunInIO :: (RunInIO -> AIO b ) -> AIO b
333
- withRunInIO k = do
334
- st <- AIO ask
335
- k $ RunInIO (\ aio -> runReaderT (unAIO aio) st)
336
-
337
- cleanupAsync :: IORef [Async a ] -> IO ()
336
+ cleanupAsync :: [Async a ] -> ThreadId -> Step -> IO ()
338
337
-- mask to make sure we interrupt all the asyncs
339
- cleanupAsync ref = uninterruptibleMask $ \ unmask -> do
340
- asyncs <- atomicModifyIORef' ref ([] ,)
338
+ cleanupAsync asyncs tid step = uninterruptibleMask $ \ unmask -> do
341
339
-- interrupt all the asyncs without waiting
342
- mapM_ (\ a -> throwTo (asyncThreadId a) AsyncCancelled ) asyncs
340
+ -- mapM_ (\a -> throwTo (asyncThreadId a) AsyncCancelled) asyncs
341
+ mapM_ (\ a -> throwTo (asyncThreadId a) $ AsyncParentKill tid step) asyncs
343
342
-- Wait until all the asyncs are done
344
343
-- But if it takes more than 10 seconds, log to stderr
345
344
unless (null asyncs) $ do
@@ -348,32 +347,3 @@ cleanupAsync ref = uninterruptibleMask $ \unmask -> do
348
347
traceM " cleanupAsync: waiting for asyncs to finish"
349
348
withAsync warnIfTakingTooLong $ \ _ ->
350
349
mapM_ waitCatch asyncs
351
-
352
- data Wait
353
- = Wait { justWait :: ! (IO () )}
354
- | Spawn { justWait :: ! (IO () )}
355
-
356
- fmapWait :: (IO () -> IO () ) -> Wait -> Wait
357
- fmapWait f (Wait io) = Wait (f io)
358
- fmapWait f (Spawn io) = Spawn (f io)
359
-
360
- waitOrSpawn :: Wait -> IO (Either (IO () ) (Async () ))
361
- waitOrSpawn (Wait io) = pure $ Left io
362
- waitOrSpawn (Spawn io) = Right <$> async io
363
-
364
- waitConcurrently_ :: [Wait ] -> AIO ()
365
- waitConcurrently_ [] = pure ()
366
- waitConcurrently_ [one] = liftIO $ justWait one
367
- waitConcurrently_ many = do
368
- ref <- AIO ask
369
- -- spawn the async computations.
370
- -- mask to make sure we keep track of all the asyncs.
371
- (asyncs, syncs) <- liftIO $ uninterruptibleMask $ \ unmask -> do
372
- waits <- liftIO $ traverse (waitOrSpawn . fmapWait unmask) many
373
- let (syncs, asyncs) = partitionEithers waits
374
- liftIO $ atomicModifyIORef'_ ref (asyncs ++ )
375
- return (asyncs, syncs)
376
- -- work on the sync computations
377
- liftIO $ sequence_ syncs
378
- -- wait for the async computations before returning
379
- liftIO $ traverse_ wait asyncs
0 commit comments