Skip to content

Commit 2634926

Browse files
refactoring sourceProcessWithStreams to avoid having to use IOE
1 parent 9e8d029 commit 2634926

File tree

7 files changed

+198
-50
lines changed

7 files changed

+198
-50
lines changed

app/Main.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ module Main where
2020
import Control.Monad (when)
2121
import Control.Monad.IO.Class (liftIO)
2222
import Data.HashSet (toList)
23-
import Effectful (Eff, IOE, runEff, (:>))
23+
import Effectful (Eff, runEff, (:>))
2424
import Effectful.Environment (runEnvironment)
2525
import Effectful.Exception (finally)
2626
import Effectful.FileSystem (FileSystem, runFileSystem)
@@ -45,7 +45,6 @@ builder ::
4545
, Reader Args :> es
4646
, Time :> es
4747
, Concurrent :> es
48-
, IOE :> es
4948
)
5049
=> Eff es ()
5150
builder = do
Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,81 @@
1-
{-# LANGUAGE FlexibleContexts #-}
2-
{-# LANGUAGE TypeOperators #-}
1+
{-# LANGUAGE AllowAmbiguousTypes #-}
2+
{-# LANGUAGE DataKinds #-}
3+
{-# LANGUAGE FlexibleContexts #-}
4+
{-# LANGUAGE FlexibleInstances #-}
5+
{-# LANGUAGE TypeOperators #-}
6+
{-# LANGUAGE TypeFamilies #-}
37

48
module Data.Conduit.Process.Effectful
59
( sourceProcessWithStreams
610
) where
711

8-
import Data.ByteString (ByteString)
9-
import Data.Conduit (ConduitT, runConduit, (.|))
10-
import Data.Conduit.Process (StreamingProcessHandle,
11-
streamingProcess,
12-
streamingProcessHandleRaw,
13-
waitForStreamingProcess)
14-
import Data.Void (Void)
15-
import Effectful (Eff, IOE, Limit (Unlimited),
16-
Persistence (Ephemeral),
17-
UnliftStrategy (ConcUnlift),
18-
liftIO, withEffToIO, (:>))
19-
import Effectful.Concurrent.Async (Concurrent, Concurrently (..),
20-
runConcurrently)
21-
import Effectful.Exception (finally, onException)
22-
import Effectful.Process (CreateProcess, Process,
23-
terminateProcess)
24-
import System.Exit (ExitCode)
12+
import Control.Monad.Trans (lift)
13+
import Data.ByteString (ByteString)
14+
import qualified Data.ByteString as BS (null)
15+
import Data.ByteString.Builder.Extra (defaultChunkSize)
16+
import Data.Conduit (ConduitT, awaitForever,
17+
runConduit, yield, (.|))
18+
import Data.Functor (($>))
19+
import Data.Streaming.Process.Effectful (InputSource, OutputSink,
20+
StreamingProcessHandle,
21+
isStdStream, osStdStream,
22+
streamingProcess,
23+
streamingProcessHandleRaw,
24+
waitForStreamingProcess)
25+
import Data.Void (Void)
26+
import Effectful (Eff,
27+
Limit (Unlimited),
28+
Persistence (Ephemeral),
29+
UnliftStrategy (ConcUnlift),
30+
liftIO, withEffToIO, (:>))
31+
import Effectful.Concurrent (Concurrent)
32+
import Effectful.Concurrent.Async (Concurrently (..),
33+
runConcurrently)
34+
import Effectful.Exception (finally, onException)
35+
import Effectful.FileSystem (FileSystem)
36+
import Effectful.FileSystem.IO (BufferMode (NoBuffering),
37+
Handle, hClose,
38+
hSetBuffering)
39+
import Effectful.FileSystem.IO.ByteString (hGetSome, hPut)
40+
import Effectful.Process (CreateProcess, Process,
41+
StdStream (CreatePipe),
42+
terminateProcess)
43+
import System.Exit (ExitCode)
44+
45+
instance (FileSystem :> es) => InputSource (ConduitT ByteString o (Eff es) ()) where
46+
isStdStream =
47+
(\(Just h) -> hSetBuffering h NoBuffering $> sinkHandle h, Just CreatePipe)
48+
49+
instance (FileSystem :> es, r ~ (), r' ~ ()) =>
50+
InputSource (ConduitT ByteString o (Eff es) r, Eff es r') where
51+
isStdStream =
52+
( \(Just h) -> hSetBuffering h NoBuffering $> (sinkHandle h, hClose h)
53+
, Just CreatePipe)
54+
55+
instance (FileSystem :> es) => OutputSink (ConduitT i ByteString (Eff es) ()) where
56+
osStdStream =
57+
( \(Just h) -> hSetBuffering h NoBuffering $> sourceHandle h
58+
, Just CreatePipe)
59+
60+
instance (FileSystem :> es, r ~ (), r' ~ ()) =>
61+
OutputSink (ConduitT i ByteString (Eff es) r, Eff es r') where
62+
osStdStream =
63+
( \(Just h) -> hSetBuffering h NoBuffering $> (sourceHandle h, hClose h)
64+
, Just CreatePipe)
65+
66+
sinkHandle :: (FileSystem :> es) => Handle -> ConduitT ByteString o (Eff es) ()
67+
sinkHandle h = awaitForever (lift . hPut h)
68+
69+
sourceHandle ::
70+
(FileSystem :> es) => Handle -> ConduitT i ByteString (Eff es) ()
71+
sourceHandle h = do
72+
bs <- lift $ hGetSome h defaultChunkSize
73+
if BS.null bs
74+
then pure ()
75+
else yield bs >> sourceHandle h
2576

2677
sourceProcessWithStreams ::
27-
(Process :> es, Concurrent :> es, IOE :> es)
78+
(Process :> es, Concurrent :> es, FileSystem :> es)
2879
=> CreateProcess
2980
-> ConduitT () ByteString (Eff es) ()
3081
-> ConduitT ByteString Void (Eff es) a
@@ -47,5 +98,5 @@ sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr = do
4798
return (ec, resStdout, resStderr)
4899

49100
terminateStreamingProcess ::
50-
(Process :> es) => StreamingProcessHandle -> Eff es ()
101+
(Process :> es) => StreamingProcessHandle es -> Eff es ()
51102
terminateStreamingProcess = terminateProcess . streamingProcessHandleRaw
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE TypeOperators #-}
3+
4+
module Data.Streaming.Process.Effectful
5+
( StreamingProcessHandle
6+
, streamingProcess
7+
, streamingProcessHandleRaw
8+
, waitForStreamingProcess
9+
, InputSource
10+
, isStdStream
11+
, OutputSink
12+
, osStdStream
13+
) where
14+
15+
import Control.Exception (throw)
16+
import Data.Maybe (fromMaybe)
17+
import Effectful (Eff, (:>))
18+
import Effectful.Concurrent (Concurrent, forkIOWithUnmask)
19+
import Effectful.Concurrent.STM (STM, TMVar, atomically,
20+
newEmptyTMVarIO, putTMVar, readTMVar)
21+
import Effectful.Exception (SomeException, finally, try)
22+
import Effectful.FileSystem.IO (FileSystem, Handle, hClose)
23+
import Effectful.Process (CreateProcess (std_err, std_in, std_out),
24+
Process, ProcessHandle,
25+
StdStream (CreatePipe),
26+
createProcess_, waitForProcess)
27+
import System.Exit (ExitCode)
28+
29+
class InputSource a where
30+
isStdStream ::
31+
(FileSystem :> es) => (Maybe Handle -> Eff es a, Maybe StdStream)
32+
33+
instance InputSource Handle where
34+
isStdStream = (\(Just h) -> pure h, Just CreatePipe)
35+
36+
class OutputSink a where
37+
osStdStream :: (FileSystem :> es) => (Maybe Handle -> Eff es a, Maybe StdStream)
38+
39+
instance OutputSink Handle where
40+
osStdStream = (\(Just h) -> pure h, Just CreatePipe)
41+
42+
data StreamingProcessHandle es =
43+
StreamingProcessHandle ProcessHandle (TMVar ExitCode) (Eff es ())
44+
45+
streamingProcess ::
46+
( InputSource stdin
47+
, OutputSink stdout
48+
, OutputSink stderr
49+
, Process :> es
50+
, Concurrent :> es
51+
, FileSystem :> es
52+
)
53+
=> CreateProcess
54+
-> Eff es (stdin, stdout, stderr, StreamingProcessHandle es)
55+
streamingProcess cp = do
56+
let (getStdin, stdinStream) = isStdStream
57+
(getStdout, stdoutStream) = osStdStream
58+
(getStderr, stderrStream) = osStdStream
59+
(stdinH, stdoutH, stderrH, ph) <-
60+
createProcess_
61+
"streamingProcess"
62+
cp
63+
{ std_in = fromMaybe (std_in cp) stdinStream
64+
, std_out = fromMaybe (std_out cp) stdoutStream
65+
, std_err = fromMaybe (std_err cp) stderrStream
66+
}
67+
ec <- newEmptyTMVarIO
68+
-- Apparently waitForProcess can throw an exception itself when
69+
-- delegate_ctlc is True, so to avoid this TMVar from being left empty, we
70+
-- capture any exceptions and store them as an impure exception in the
71+
-- TMVar
72+
_ <-
73+
forkIOWithUnmask $ \_unmask ->
74+
try (waitForProcess ph)
75+
>>= atomically . putTMVar ec . either (throw :: SomeException -> a) id
76+
let close = mclose stdinH `finally` mclose stdoutH `finally` mclose stderrH
77+
where
78+
mclose = maybe (return ()) hClose
79+
(,,,)
80+
<$> getStdin stdinH
81+
<*> getStdout stdoutH
82+
<*> getStderr stderrH
83+
<*> return (StreamingProcessHandle ph ec close)
84+
85+
streamingProcessHandleRaw :: StreamingProcessHandle es -> ProcessHandle
86+
streamingProcessHandleRaw (StreamingProcessHandle ph _ _) = ph
87+
88+
-- Since 0.1.4
89+
waitForStreamingProcess ::
90+
(Concurrent :> es) => StreamingProcessHandle es -> Eff es ExitCode
91+
waitForStreamingProcess = atomically . waitForStreamingProcessSTM
92+
93+
-- | STM version of @waitForStreamingProcess@.
94+
--
95+
-- Since 0.1.4
96+
waitForStreamingProcessSTM :: StreamingProcessHandle es -> STM ExitCode
97+
waitForStreamingProcessSTM = readTMVar . streamingProcessHandleTMVar
98+
99+
streamingProcessHandleTMVar :: StreamingProcessHandle es -> TMVar ExitCode
100+
streamingProcessHandleTMVar (StreamingProcessHandle _ var _) = var

random-build.cabal

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,24 @@ common shared-properties
6262
, unordered-containers ^>=0.2.20
6363

6464
library conduit-effectful
65-
exposed-modules: Data.Conduit.Process.Effectful
65+
exposed-modules:
66+
Data.Conduit.Process.Effectful
67+
Data.Streaming.Process.Effectful
68+
6669
build-depends:
67-
, async ^>=2.2.5
68-
, base >=4.15.1.0 && <4.22
69-
, bytestring >=0.10.12.1 && <0.13
70-
, conduit ^>=1.3.6
71-
, conduit-extra ^>=1.3.7
72-
, effectful ^>=2.5.1.0
73-
, effectful-core ^>=2.5.0
74-
, process >=1.6.13.2 && <1.7
75-
, unliftio-core ^>=0.2.1.0
76-
77-
hs-source-dirs: eff
70+
, async ^>=2.2.5
71+
, base >=4.15.1.0 && <4.22
72+
, bytestring >=0.10.12.1 && <0.13
73+
, conduit ^>=1.3.6
74+
, conduit-extra ^>=1.3.7
75+
, effectful ^>=2.5.1.0
76+
, effectful-core ^>=2.5.0
77+
, mtl >=2.2.2 && <2.4
78+
, process >=1.6.13.2 && <1.7
79+
, streaming-commons ^>=0.2.3.0
80+
, unliftio-core ^>=0.2.1.0
81+
82+
hs-source-dirs: eff
7883
default-language: Haskell2010
7984

8085
library

src/GHRB/Core.hs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ module GHRB.Core
2525
) where
2626

2727
import Control.Applicative (many, optional, (<|>))
28-
import Control.Monad (void, unless)
28+
import Control.Monad (void)
2929
import qualified Data.ByteString as BS (ByteString)
3030
import qualified Data.ByteString.Char8 as BS (pack)
3131
import qualified Data.HashSet as Set (insert, member)
@@ -43,7 +43,6 @@ import Distribution.Portage.Types (Category (Category),
4343
unwrapPkgName)
4444
import FlatParse.Basic (Parser, Result (OK), char, eof,
4545
runParser, satisfy, string, anyChar)
46-
import qualified FlatParse.Basic as FP (failed)
4746
import GHRB.Core.Types (PackageSet, St (St), completed,
4847
downgrade, failed,
4948
tried, unresolved, untried, installed)

src/GHRB/IO.hs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import Data.List (uncons)
1515
import qualified Data.Text as T (unpack)
1616
import Data.Time.Clock (UTCTime)
1717
import Distribution.Portage.Types (Package)
18-
import Effectful (Eff, IOE, (:>))
18+
import Effectful (Eff, (:>))
1919
import Effectful.Concurrent (Concurrent)
2020
import Effectful.FileSystem (FileSystem)
2121
import Effectful.Process (Process,
@@ -87,8 +87,8 @@ runEmerge args pkg =
8787
""
8888

8989
runHaskellUpdater ::
90-
( IOE :> es
91-
, FileSystem :> es
90+
(
91+
FileSystem :> es
9292
, Process :> es
9393
, Reader Args :> es
9494
, Concurrent :> es
@@ -166,8 +166,8 @@ processIfNotDowngrade output = do
166166
else pure (PrelimEmergeSuccess, output)
167167

168168
install ::
169-
( IOE :> es
170-
, FileSystem :> es
169+
(
170+
FileSystem :> es
171171
, State St :> es
172172
, Reader Args :> es
173173
, Process :> es
@@ -292,7 +292,6 @@ randomBuild ::
292292
, Process :> es
293293
, Time :> es
294294
, Concurrent :> es
295-
, IOE :> es
296295
)
297296
=> Eff es Running
298297
randomBuild = do

src/GHRB/IO/Cmd.hs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import qualified Data.ByteString.Lazy as BL (ByteString)
1616
import Data.Conduit (ConduitT, (.|))
1717
import Data.Conduit.Process.Effectful (sourceProcessWithStreams)
1818
import Data.Void (Void)
19-
import Effectful (Eff, IOE, (:>))
19+
import Effectful (Eff, (:>))
2020
import Effectful.Concurrent (Concurrent)
2121
import Effectful.FileSystem (FileSystem)
2222
import Effectful.FileSystem.IO.ByteString as BS (hPut)
@@ -67,12 +67,7 @@ installedArgs = ["-I"]
6767
-- | Run a command and dump stdout to @stdout@, stderr to @stderr@, also
6868
-- capturing both streams.
6969
runTransparent ::
70-
( IOE :> es
71-
, FileSystem :> es
72-
, Reader Args :> es
73-
, Concurrent :> es
74-
, Process :> es
75-
)
70+
(FileSystem :> es, Reader Args :> es, Concurrent :> es, Process :> es)
7671
=> FilePath -- ^ executable path
7772
-> [String] -- ^ arguments
7873
-- | Exit code, stdout, stderr

0 commit comments

Comments
 (0)