@@ -5,45 +5,47 @@ module Data.Conduit.Process.Effectful
55 ( sourceProcessWithStreams
66 ) where
77
8- import Control.Concurrent.Async (Concurrently (.. ), runConcurrently )
9- import Control.Exception (finally , onException )
10- import Data.ByteString (ByteString )
11- import Data.Conduit (ConduitT , runConduit , (.|) )
12- import Data.Conduit.Process (StreamingProcessHandle ,
13- streamingProcess ,
14- streamingProcessHandleRaw ,
15- waitForStreamingProcess )
16- import Data.Void (Void )
17- import Effectful (Eff , IOE , Limit (Unlimited ),
18- Persistence (Ephemeral ),
19- UnliftStrategy (ConcUnlift ),
20- withEffToIO , (:>) )
21- import System.Exit (ExitCode )
22- import System.Process (CreateProcess , terminateProcess )
8+ import Data.ByteString (ByteString )
9+ import Data.Conduit (ConduitT , runConduit , (.|) )
10+ import Data.Conduit.Process (StreamingProcessHandle ,
11+ streamingProcess ,
12+ streamingProcessHandleRaw ,
13+ waitForStreamingProcess )
14+ import Data.Void (Void )
15+ import Effectful (Eff , IOE , Limit (Unlimited ),
16+ Persistence (Ephemeral ),
17+ UnliftStrategy (ConcUnlift ),
18+ liftIO , withEffToIO , (:>) )
19+ import Effectful.Concurrent.Async (Concurrent , Concurrently (.. ),
20+ runConcurrently )
21+ import Effectful.Exception (finally , onException )
22+ import Effectful.Process (CreateProcess , Process ,
23+ terminateProcess )
24+ import System.Exit (ExitCode )
2325
2426sourceProcessWithStreams ::
25- (IOE :> es )
27+ (Process :> es , Concurrent :> es , IOE :> es )
2628 => CreateProcess
2729 -> ConduitT () ByteString (Eff es ) ()
2830 -> ConduitT ByteString Void (Eff es ) a
2931 -> ConduitT ByteString Void (Eff es ) b
3032 -> Eff es (ExitCode , a , b )
31- sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
32- withEffToIO (ConcUnlift Ephemeral Unlimited ) $ \ u -> do
33- ((sinkStdin, closeStdin), (sourceStdout, closeStdout), (sourceStderr, closeStderr), sph) <-
34- streamingProcess cp
35- (_, resStdout, resStderr) <-
36- runConcurrently
37- ((,,)
38- <$> Concurrently
39- (u (runConduit $ producerStdin .| sinkStdin)
40- `finally` closeStdin)
41- <*> Concurrently (u $ runConduit $ sourceStdout .| consumerStdout)
42- <*> Concurrently (u $ runConduit $ sourceStderr .| consumerStderr))
43- `finally` (closeStdout >> closeStderr)
44- `onException` terminateStreamingProcess sph
45- ec <- waitForStreamingProcess sph
46- return (ec, resStdout, resStderr)
33+ sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr = do
34+ ((sinkStdin, closeStdin), (sourceStdout, closeStdout), (sourceStderr, closeStderr), sph) <-
35+ streamingProcess cp
36+ (_, resStdout, resStderr) <-
37+ runConcurrently
38+ ((,,)
39+ <$> Concurrently
40+ (runConduit (producerStdin .| sinkStdin) `finally` closeStdin)
41+ <*> Concurrently (runConduit (sourceStdout .| consumerStdout))
42+ <*> Concurrently
43+ (runConduit (sourceStderr .| consumerStderr)
44+ `finally` (closeStdout >> closeStderr)))
45+ `onException` terminateStreamingProcess sph
46+ ec <- waitForStreamingProcess sph
47+ return (ec, resStdout, resStderr)
4748
48- terminateStreamingProcess :: StreamingProcessHandle -> IO ()
49+ terminateStreamingProcess ::
50+ (Process :> es ) => StreamingProcessHandle -> Eff es ()
4951terminateStreamingProcess = terminateProcess . streamingProcessHandleRaw
0 commit comments