Skip to content

Commit db78d77

Browse files
stolyarolehOleh Stolyar
authored andcommitted
hnix-store-remote: use pipes to stream logs..
..instead of collecting them in memory and then returning everything at once. I did this by changing the `MonadStore` to: ```haskell type MonadStore a = ExceptT Error (Producer Logger (ReaderT Socket IO)) a ``` It's a `Producer` of log messages that can fail with an `Error`: ```haskell data Error = LogError Int LBS.ByteString | ParseError String | ConnError String ``` `runStore` can now accept a `Consumer Logger IO (Either Error a)` parameter to consume logs incrementally. I added a `runStore_` that simply discards all logs. Similarly, I changed the `runOpArgs` and `runOp`: now they accept a `Get a` parser that is used to get the result. `runOp_` and `runOpArgs_` assume that op does not return anything interesting.
1 parent 58837c3 commit db78d77

File tree

5 files changed

+117
-140
lines changed

5 files changed

+117
-140
lines changed

hnix-store-remote/src/System/Nix/Store/Remote.hs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@ type RepairFlag = Bool
2222
type CheckFlag = Bool
2323

2424
syncWithGC :: MonadStore ()
25-
syncWithGC = void $ simpleOp SyncWithGC
25+
syncWithGC = runOp_ SyncWithGC
2626

2727
optimiseStore :: MonadStore ()
28-
optimiseStore = void $ simpleOp OptimiseStore
28+
optimiseStore = runOp_ OptimiseStore
2929

30-
-- returns True on errors
31-
verifyStore :: CheckFlag -> RepairFlag -> MonadStore Bool
32-
verifyStore check repair = simpleOpArgs VerifyStore $ do
30+
verifyStore :: CheckFlag -> RepairFlag -> MonadStore ()
31+
verifyStore check repair = runOpArgs_ VerifyStore $ do
3332
putBool check
3433
putBool repair

hnix-store-remote/src/System/Nix/Store/Remote/Logger.hs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
module System.Nix.Store.Remote.Logger (
22
Logger(..)
33
, Field(..)
4-
, processOutput)
5-
where
4+
, streamLogs
5+
) where
66

7+
import Control.Monad.Except (throwError)
8+
import Control.Monad (replicateM)
79
import Control.Monad.Reader (ask, liftIO)
810
import Data.Binary.Get
9-
1011
import Network.Socket.ByteString (recv)
11-
12+
import Pipes (lift, yield)
1213
import System.Nix.Store.Remote.Types
1314
import System.Nix.Util
1415

@@ -26,30 +27,38 @@ controlParser = do
2627
0x52534c54 -> Result <$> getInt <*> getInt <*> getFields
2728
x -> fail $ "Invalid control message received:" ++ show x
2829

29-
processOutput :: MonadStore [Logger]
30-
processOutput = go decoder
31-
where decoder = runGetIncremental controlParser
32-
go :: Decoder Logger -> MonadStore [Logger]
33-
go (Done _leftover _consumed ctrl) = do
34-
case ctrl of
35-
e@(Error _ _) -> return [e]
36-
Last -> return [Last]
37-
-- we should probably handle Read here as well
38-
x -> do
39-
next <- go decoder
40-
return $ x:next
41-
go (Partial k) = do
42-
soc <- ask
43-
chunk <- liftIO (Just <$> recv soc 8)
44-
go (k chunk)
30+
logger :: Logger -> MonadStore ()
31+
logger = lift . yield
32+
33+
streamLogs :: MonadStore ()
34+
streamLogs = go decoder
35+
where
36+
go :: Decoder Logger -> MonadStore ()
37+
go (Done _leftover _consumed ctrl) = do
38+
case ctrl of
39+
e@(Error status err) -> do
40+
logger e
41+
throwError (LogError status err)
42+
Last ->
43+
logger Last
44+
-- we should probably handle Read here as well
45+
x -> do
46+
logger x
47+
go decoder
48+
go (Partial cont) = do
49+
soc <- ask
50+
chunk <- liftIO (recv soc 8)
51+
go (cont (Just chunk))
52+
go (Fail _leftover _consumed msg) =
53+
throwError (ParseError msg)
4554

46-
go (Fail _leftover _consumed msg) = do
47-
error msg
55+
decoder :: Decoder Logger
56+
decoder = runGetIncremental controlParser
4857

4958
getFields :: Get [Field]
5059
getFields = do
51-
cnt <- getInt
52-
sequence $ replicate cnt getField
60+
count <- getInt
61+
replicateM count getField
5362

5463
getField :: Get Field
5564
getField = do

hnix-store-remote/src/System/Nix/Store/Remote/Protocol.hs

Lines changed: 50 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,25 @@
1+
{-# LANGUAGE TypeApplications #-}
12
module System.Nix.Store.Remote.Protocol (
23
WorkerOp(..)
3-
, simpleOp
4-
, simpleOpArgs
54
, runOp
5+
, runOp_
66
, runOpArgs
7-
, runStore) where
7+
, runOpArgs_
8+
, runStore
9+
, runStore_
10+
) where
811

9-
import Control.Exception (bracket)
10-
import Control.Monad.Except
12+
import Control.Exception (SomeException, bracket, catch, displayException)
13+
import Control.Monad.Except (throwError, runExceptT)
1114
import Control.Monad.Reader
12-
import Control.Monad.State
1315

14-
import Data.Binary.Get
1516
import Data.Binary.Put
16-
import qualified Data.ByteString.Char8 as BSC
17-
import qualified Data.ByteString.Lazy as LBS
17+
import Data.Binary.Get
18+
19+
import Network.Socket
1820

19-
import Network.Socket hiding (send, sendTo, recv, recvFrom)
20-
import Network.Socket.ByteString (recv)
21+
import Pipes
22+
import qualified Pipes.Prelude as Pipes
2123

2224
import System.Nix.Store.Remote.Logger
2325
import System.Nix.Store.Remote.Types
@@ -106,65 +108,57 @@ opNum NarFromPath = 38
106108
opNum AddToStoreNar = 39
107109
opNum QueryMissing = 40
108110

111+
runOp :: WorkerOp -> Get a -> MonadStore a
112+
runOp op result = runOpArgs op mempty result
109113

110-
simpleOp :: WorkerOp -> MonadStore Bool
111-
simpleOp op = do
112-
simpleOpArgs op $ return ()
113-
114-
simpleOpArgs :: WorkerOp -> Put -> MonadStore Bool
115-
simpleOpArgs op args = do
116-
runOpArgs op args
117-
err <- gotError
118-
case err of
119-
True -> do
120-
Error _num msg <- head <$> getError
121-
throwError $ BSC.unpack $ LBS.toStrict msg
122-
False -> do
123-
sockGetBool
124-
125-
runOp :: WorkerOp -> MonadStore ()
126-
runOp op = runOpArgs op $ return ()
127-
128-
runOpArgs :: WorkerOp -> Put -> MonadStore ()
129-
runOpArgs op args = do
130-
131-
-- Temporary hack for printing the messages destined for nix-daemon socket
132-
when False $
133-
liftIO $ LBS.writeFile "mytestfile2" $ runPut $ do
134-
putInt $ opNum op
135-
args
114+
runOp_ :: WorkerOp -> MonadStore ()
115+
runOp_ op = runOp op (skip 8)
136116

117+
runOpArgs :: WorkerOp -> Put -> Get a -> MonadStore a
118+
runOpArgs op args result = do
137119
sockPut $ do
138-
putInt $ opNum op
120+
putInt (opNum op)
139121
args
122+
streamLogs
123+
sockGet result
140124

141-
out <- processOutput
142-
modify (++out)
143-
err <- gotError
144-
when err $ do
145-
Error _num msg <- head <$> getError
146-
throwError $ BSC.unpack $ LBS.toStrict msg
125+
runOpArgs_ :: WorkerOp -> Put -> MonadStore ()
126+
runOpArgs_ op args = runOpArgs op args (skip 8)
147127

148-
runStore :: MonadStore a -> IO (Either String a, [Logger])
149-
runStore code = do
150-
bracket (open sockPath) close run
128+
runStore :: Consumer Logger IO (Either Error a) -> MonadStore a -> IO (Either Error a)
129+
runStore sink code =
130+
bracket (open sockPath) close run `catch` onException
151131
where
152132
open path = do
153-
soc <- socket AF_UNIX Stream 0
154-
connect soc (SockAddrUnix path)
155-
return soc
133+
sock <- socket AF_UNIX Stream 0
134+
connect sock (SockAddrUnix path)
135+
return sock
136+
156137
greet = do
157138
sockPut $ putInt workerMagic1
158-
soc <- ask
159-
vermagic <- liftIO $ recv soc 16
160-
let (magic2, daemonProtoVersion) = flip runGet (LBS.fromStrict vermagic) $ (,) <$> getInt <*> getInt
161-
unless (magic2 == workerMagic2) $ error "Worker magic 2 mismatch"
139+
140+
magic2 <- sockGetInt
141+
_ <- sockGetInt -- daemonVersion
142+
143+
unless (magic2 == workerMagic2) $
144+
throwError (ConnError "Worker magic 2 mismatch")
162145

163146
sockPut $ putInt protoVersion -- clientVersion
164147
sockPut $ putInt (0 :: Int) -- affinity
165148
sockPut $ putInt (0 :: Int) -- obsolete reserveSpace
166149

167-
processOutput
150+
streamLogs -- receive startup error messages, if any
168151

169152
run sock =
170-
flip runReaderT sock $ flip runStateT [] $ runExceptT (greet >> code)
153+
let producer =
154+
runExceptT $ do
155+
greet
156+
code
157+
effect = producer >-> hoist liftIO sink
158+
in runReaderT (runEffect effect) sock
159+
160+
onException :: SomeException -> IO (Either Error a)
161+
onException = return . Left . ConnError . displayException
162+
163+
runStore_ :: MonadStore a -> IO (Either Error a)
164+
runStore_ = runStore Pipes.drain
Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
module System.Nix.Store.Remote.Types (
22
MonadStore
3+
, Error(..)
34
, Logger(..)
45
, Field(..)
5-
, getLog
6-
, flushLog
7-
, gotError
8-
, getError) where
6+
) where
97

10-
11-
import qualified Data.ByteString.Lazy as LBS
12-
import Network.Socket (Socket)
138
import Control.Monad.Except
149
import Control.Monad.Reader
15-
import Control.Monad.State
10+
import qualified Data.ByteString.Lazy as LBS
11+
import Network.Socket (Socket)
12+
import Pipes
13+
14+
data Error =
15+
LogError Int LBS.ByteString
16+
| ParseError String
17+
| ConnError String
18+
deriving (Eq, Show)
1619

17-
type MonadStore a = ExceptT String (StateT [Logger] (ReaderT Socket IO)) a
20+
type MonadStore a = ExceptT Error (Producer Logger (ReaderT Socket IO)) a
1821

1922
type ActivityID = Int
2023
type ActivityParentID = Int
@@ -35,19 +38,3 @@ data Logger =
3538
| StopActivity ActivityID
3639
| Result ActivityID ResultType [Field]
3740
deriving (Eq, Ord, Show)
38-
39-
isError :: Logger -> Bool
40-
isError (Error _ _) = True
41-
isError _ = False
42-
43-
gotError :: MonadStore Bool
44-
gotError = any isError <$> get
45-
46-
getError :: MonadStore [Logger]
47-
getError = filter isError <$> get
48-
49-
getLog :: MonadStore [Logger]
50-
getLog = get
51-
52-
flushLog :: MonadStore ()
53-
flushLog = put []

hnix-store-remote/src/System/Nix/Store/Remote/Util.hs

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,47 @@
11
module System.Nix.Store.Remote.Util where
22

3-
import Control.Monad.Reader
4-
5-
import Data.Maybe
3+
import Control.Monad.Except (throwError)
4+
import Control.Monad.Reader (ask, liftIO)
65
import Data.Binary.Get
76
import Data.Binary.Put
87
import Data.Text (Text)
98
import qualified Data.Text as T
10-
import qualified Data.ByteString as B
119
import qualified Data.ByteString.Char8 as BSC
1210
import qualified Data.ByteString.Lazy as LBS
13-
import qualified Data.HashSet as HashSet
1411

1512
import Network.Socket.ByteString (recv, sendAll)
1613

1714
import System.Nix.Store.Remote.Types
18-
import System.Nix.Hash
1915
import System.Nix.Util
2016

21-
22-
genericIncremental :: (MonadIO m) => m (Maybe B.ByteString) -> Get a -> m a
23-
genericIncremental getsome parser = go decoder
24-
where decoder = runGetIncremental parser
25-
go (Done _leftover _consumed x) = do
26-
return x
27-
go (Partial k) = do
28-
chunk <- getsome
29-
go (k chunk)
30-
go (Fail _leftover _consumed msg) = do
31-
error msg
32-
33-
getSocketIncremental :: Get a -> MonadStore a
34-
getSocketIncremental = genericIncremental sockGet
35-
3617
sockPut :: Put -> MonadStore ()
3718
sockPut p = do
3819
soc <- ask
3920
liftIO $ sendAll soc $ LBS.toStrict $ runPut p
4021

41-
sockGet :: MonadStore (Maybe BSC.ByteString)
42-
sockGet = do
43-
soc <- ask
44-
liftIO $ Just <$> recv soc 8
45-
46-
sockGetInt :: Integral a => MonadStore a
47-
sockGetInt = getSocketIncremental getInt
22+
sockGet :: Get a -> MonadStore a
23+
sockGet = go . runGetIncremental
24+
where
25+
go :: Decoder a -> MonadStore a
26+
go (Done _leftover _consumed x) = return x
27+
go (Partial cont) = do
28+
sock <- ask
29+
chunk <- liftIO (recv sock 8)
30+
go (cont (Just chunk))
31+
go (Fail _leftover _consumed msg) =
32+
throwError (ParseError msg)
33+
34+
sockGetInt :: MonadStore Int
35+
sockGetInt = sockGet getInt
4836

4937
sockGetBool :: MonadStore Bool
5038
sockGetBool = (== (1 :: Int)) <$> sockGetInt
5139

5240
sockGetStr :: MonadStore LBS.ByteString
53-
sockGetStr = getSocketIncremental getByteStringLen
41+
sockGetStr = sockGet getByteStringLen
5442

5543
sockGetStrings :: MonadStore [LBS.ByteString]
56-
sockGetStrings = getSocketIncremental getByteStrings
44+
sockGetStrings = sockGet getByteStrings
5745

5846
lBSToText :: LBS.ByteString -> Text
5947
lBSToText = T.pack . BSC.unpack . LBS.toStrict

0 commit comments

Comments
 (0)