Skip to content

Commit ae05ef5

Browse files
authored
Merge pull request #988 from phadej/list-is-pure-stream
Add toStreamGenerator (NonEmpty a) instance
2 parents 77ea599 + dcc67f3 commit ae05ef5

File tree

9 files changed

+153
-45
lines changed

9 files changed

+153
-45
lines changed

servant-client-core/servant-client-core.cabal

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ library
5252
, bytestring >= 0.10.4.0 && < 0.11
5353
, containers >= 0.5.5.1 && < 0.6
5454
, text >= 1.2.3.0 && < 1.3
55+
, transformers >= 0.3.0.0 && < 0.6
5556

5657
if !impl(ghc >= 8.0)
5758
build-depends:

servant-client-core/src/Servant/Client/Core/Internal/HasClient.hs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import Prelude.Compat
2121
import Control.Concurrent (newMVar, modifyMVar)
2222
import Data.Foldable (toList)
2323
import qualified Data.ByteString.Lazy as BL
24+
import Control.Monad.IO.Class (MonadIO (..))
2425
import Data.List (foldl')
2526
import Data.Proxy (Proxy (Proxy))
2627
import Data.Semigroup ((<>))
@@ -33,7 +34,7 @@ import Servant.API ((:<|>) ((:<|>)), (:>),
3334
AuthProtect, BasicAuth,
3435
BasicAuthData,
3536
BuildHeadersTo (..),
36-
BuildFromStream (..),
37+
FromResultStream (..),
3738
ByteStringParser (..),
3839
Capture', CaptureAll,
3940
Description, EmptyAPI,
@@ -283,18 +284,18 @@ instance OVERLAPPING_
283284
hoistClientMonad _ _ f ma = f ma
284285

285286
instance OVERLAPPABLE_
286-
( RunClient m, MimeUnrender ct a, ReflectMethod method,
287-
FramingUnrender framing a, BuildFromStream a (f a)
288-
) => HasClient m (Stream method status framing ct (f a)) where
287+
( RunClient m, MonadIO m, MimeUnrender ct a, ReflectMethod method,
288+
FramingUnrender framing a, FromResultStream a b
289+
) => HasClient m (Stream method status framing ct b) where
289290

290-
type Client m (Stream method status framing ct (f a)) = m (f a)
291+
type Client m (Stream method status framing ct b) = m b
291292

292293
clientWithRoute _pm Proxy req = do
293294
sresp <- streamingRequest req
294295
{ requestAccept = fromList [contentType (Proxy :: Proxy ct)]
295296
, requestMethod = reflectMethod (Proxy :: Proxy method)
296297
}
297-
return . buildFromStream $ ResultStream $ \k ->
298+
liftIO $ fromResultStream $ ResultStream $ \k ->
298299
runStreamingResponse sresp $ \gres -> do
299300
let reader = responseBody gres
300301
let unrender = unrenderFrames (Proxy :: Proxy framing) (Proxy :: Proxy a)

servant-client/test/Servant/StreamSpec.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ manager' = unsafePerformIO $ C.newManager C.defaultManagerSettings
107107
runClient :: ClientM a -> BaseUrl -> IO (Either ServantError a)
108108
runClient x baseUrl' = runClientM x (mkClientEnv manager' baseUrl')
109109

110-
runResultStream :: ResultStream a
110+
testRunResultStream :: ResultStream a
111111
-> IO ( Maybe (Either String a)
112112
, Maybe (Either String a)
113113
, Maybe (Either String a)
114114
, Maybe (Either String a))
115-
runResultStream (ResultStream k)
115+
testRunResultStream (ResultStream k)
116116
= k $ \act -> (,,,) <$> act <*> act <*> act <*> act
117117

118118
streamSpec :: Spec
@@ -122,13 +122,13 @@ streamSpec = beforeAll (CS.startWaiApp server) $ afterAll CS.endWaiApp $ do
122122
Right res <- runClient getGetNL baseUrl
123123
let jra = Just (Right alice)
124124
jrb = Just (Right bob)
125-
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
125+
testRunResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
126126

127127
it "works with Servant.API.StreamGet.Netstring" $ \(_, baseUrl) -> do
128128
Right res <- runClient getGetNS baseUrl
129129
let jra = Just (Right alice)
130130
jrb = Just (Right bob)
131-
runResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
131+
testRunResultStream res `shouldReturn` (jra, jrb, jra, Nothing)
132132

133133
it "streams in constant memory" $ \(_, baseUrl) -> do
134134
Right (ResultStream res) <- runClient getGetALot baseUrl

servant-docs/src/Servant/Docs/Internal.hs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,24 @@ instance OVERLAPPABLE_
840840
status = fromInteger $ natVal (Proxy :: Proxy status)
841841
p = Proxy :: Proxy a
842842

843+
-- | TODO: mention the endpoint is streaming, its framing strategy
844+
--
845+
-- Also there are no samples.
846+
instance OVERLAPPABLE_
847+
(MimeRender ct a, KnownNat status
848+
, ReflectMethod method)
849+
=> HasDocs (Stream method status framing ct a) where
850+
docsFor Proxy (endpoint, action) DocOptions{..} =
851+
single endpoint' action'
852+
853+
where endpoint' = endpoint & method .~ method'
854+
action' = action & response.respTypes .~ allMime t
855+
& response.respStatus .~ status
856+
t = Proxy :: Proxy '[ct]
857+
method' = reflectMethod (Proxy :: Proxy method)
858+
status = fromInteger $ natVal (Proxy :: Proxy status)
859+
p = Proxy :: Proxy a
860+
843861
instance OVERLAPPING_
844862
(ToSample a, AllMimeRender (ct ': cts) a, KnownNat status
845863
, ReflectMethod method, AllHeaderSamples ls, GetHeaders (HList ls))

servant-foreign/src/Servant/Foreign/Internal.hs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,20 @@ instance (Elem JSON list, HasForeignType lang ftype a, ReflectMethod method)
238238
method = reflectMethod (Proxy :: Proxy method)
239239
methodLC = toLower $ decodeUtf8 method
240240

241+
-- | TODO: doesn't taking framing into account.
242+
instance (ct ~ JSON, HasForeignType lang ftype a, ReflectMethod method)
243+
=> HasForeign lang ftype (Stream method status framing ct a) where
244+
type Foreign ftype (Stream method status framing ct a) = Req ftype
245+
246+
foreignFor lang Proxy Proxy req =
247+
req & reqFuncName . _FunctionName %~ (methodLC :)
248+
& reqMethod .~ method
249+
& reqReturnType .~ Just retType
250+
where
251+
retType = typeFor lang (Proxy :: Proxy ftype) (Proxy :: Proxy a)
252+
method = reflectMethod (Proxy :: Proxy method)
253+
methodLC = toLower $ decodeUtf8 method
254+
241255
instance (KnownSymbol sym, HasForeignType lang ftype (RequiredArgument mods a), HasForeign lang ftype api)
242256
=> HasForeign lang ftype (Header' mods sym a :> api) where
243257
type Foreign ftype (Header' mods sym a :> api) = Foreign ftype api

servant/src/Servant/API.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ import Servant.API.ResponseHeaders
114114
ResponseHeader (..), addHeader, getHeadersHList, getResponse,
115115
noHeader)
116116
import Servant.API.Stream
117-
(BoundaryStrategy (..), BuildFromStream (..),
117+
(BoundaryStrategy (..), FromResultStream (..),
118118
ByteStringParser (..), FramingRender (..),
119119
FramingUnrender (..), NetstringFraming, NewlineFraming,
120120
NoFraming, ResultStream (..), Stream, StreamGenerator (..),

servant/src/Servant/API/Internal/Test/ComprehensiveAPI.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
module Servant.API.Internal.Test.ComprehensiveAPI where
88

99
import Data.Proxy
10+
(Proxy (..))
1011
import Servant.API
1112

1213
type GET = Get '[JSON] NoContent
@@ -38,6 +39,7 @@ type ComprehensiveAPIWithoutRaw =
3839
Vault :> GET :<|>
3940
Verb 'POST 204 '[JSON] NoContent :<|>
4041
Verb 'POST 204 '[JSON] Int :<|>
42+
Stream 'GET 200 NetstringFraming JSON [Int] :<|>
4143
WithNamedContext "foo" '[] GET :<|>
4244
CaptureAll "foo" Int :> GET :<|>
4345
Summary "foo" :> GET :<|>

servant/src/Servant/API/Stream.hs

Lines changed: 104 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,48 @@
1-
{-# LANGUAGE DataKinds #-}
2-
{-# LANGUAGE DeriveDataTypeable #-}
3-
{-# LANGUAGE DeriveGeneric #-}
4-
{-# LANGUAGE FlexibleInstances #-}
1+
{-# LANGUAGE DataKinds #-}
2+
{-# LANGUAGE DeriveDataTypeable #-}
3+
{-# LANGUAGE DeriveGeneric #-}
4+
{-# LANGUAGE FlexibleInstances #-}
55
{-# LANGUAGE FunctionalDependencies #-}
6-
{-# LANGUAGE KindSignatures #-}
7-
{-# LANGUAGE MultiParamTypeClasses #-}
8-
{-# LANGUAGE OverloadedStrings #-}
9-
{-# LANGUAGE PolyKinds #-}
10-
{-# LANGUAGE RankNTypes #-}
11-
{-# LANGUAGE TupleSections #-}
6+
{-# LANGUAGE KindSignatures #-}
7+
{-# LANGUAGE MultiParamTypeClasses #-}
8+
{-# LANGUAGE OverloadedStrings #-}
9+
{-# LANGUAGE PolyKinds #-}
10+
{-# LANGUAGE RankNTypes #-}
11+
{-# LANGUAGE TupleSections #-}
1212
{-# OPTIONS_HADDOCK not-home #-}
1313

14-
module Servant.API.Stream where
14+
module Servant.API.Stream (
15+
Stream,
16+
StreamGet,
17+
StreamPost,
18+
-- * Sources
19+
--
20+
-- | Both 'StreamGenerator' and 'ResultStream' are equivalent
21+
-- to some *source* in streaming libraries.
22+
StreamGenerator (..),
23+
ToStreamGenerator (..),
24+
ResultStream (..),
25+
FromResultStream (..),
26+
-- * Framing
27+
FramingRender (..),
28+
FramingUnrender (..),
29+
BoundaryStrategy (..),
30+
ByteStringParser (..),
31+
-- ** Strategies
32+
NoFraming,
33+
NewlineFraming,
34+
NetstringFraming,
35+
) where
1536

1637
import Control.Arrow
1738
(first)
1839
import Data.ByteString.Lazy
1940
(ByteString, empty)
2041
import qualified Data.ByteString.Lazy.Char8 as LB
42+
import Data.Foldable
43+
(traverse_)
44+
import Data.List.NonEmpty
45+
(NonEmpty (..))
2146
import Data.Monoid
2247
((<>))
2348
import Data.Proxy
@@ -30,35 +55,82 @@ import GHC.TypeLits
3055
(Nat)
3156
import Network.HTTP.Types.Method
3257
(StdMethod (..))
58+
import System.IO.Unsafe
59+
(unsafeInterleaveIO)
3360
import Text.Read
3461
(readMaybe)
3562

36-
-- | A Stream endpoint for a given method emits a stream of encoded values at a given Content-Type, delimited by a framing strategy. Stream endpoints always return response code 200 on success. Type synonyms are provided for standard methods.
63+
-- | A Stream endpoint for a given method emits a stream of encoded values at a
64+
-- given Content-Type, delimited by a framing strategy. Stream endpoints always
65+
-- return response code 200 on success. Type synonyms are provided for standard
66+
-- methods.
3767
data Stream (method :: k1) (status :: Nat) (framing :: *) (contentType :: *) (a :: *)
3868
deriving (Typeable, Generic)
3969

4070
type StreamGet = Stream 'GET 200
4171
type StreamPost = Stream 'POST 200
4272

43-
-- | Stream endpoints may be implemented as producing a @StreamGenerator@ -- a function that itself takes two emit functions -- the first to be used on the first value the stream emits, and the second to be used on all subsequent values (to allow interspersed framing strategies such as comma separation).
44-
newtype StreamGenerator a = StreamGenerator {getStreamGenerator :: (a -> IO ()) -> (a -> IO ()) -> IO ()}
73+
-- | Stream endpoints may be implemented as producing a @StreamGenerator@ a
74+
-- function that itself takes two emit functions the first to be used on the
75+
-- first value the stream emits, and the second to be used on all subsequent
76+
-- values (to allow interspersed framing strategies such as comma separation).
77+
newtype StreamGenerator a = StreamGenerator { getStreamGenerator :: (a -> IO ()) -> (a -> IO ()) -> IO () }
4578

4679
-- | ToStreamGenerator is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly as endpoints.
4780
class ToStreamGenerator a b | a -> b where
48-
toStreamGenerator :: a -> StreamGenerator b
49-
50-
instance ToStreamGenerator (StreamGenerator a) a
51-
where toStreamGenerator x = x
52-
53-
-- | Clients reading from streaming endpoints can be implemented as producing a @ResultStream@ that captures the setup, takedown, and incremental logic for a read, being an IO continuation that takes a producer of Just either values or errors that terminates with a Nothing.
54-
newtype ResultStream a = ResultStream (forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b)
55-
56-
-- | BuildFromStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints.
57-
class BuildFromStream a b where
58-
buildFromStream :: ResultStream a -> b
59-
60-
instance BuildFromStream a (ResultStream a)
61-
where buildFromStream x = x
81+
toStreamGenerator :: a -> StreamGenerator b
82+
83+
instance ToStreamGenerator (StreamGenerator a) a where
84+
toStreamGenerator x = x
85+
86+
instance ToStreamGenerator (NonEmpty a) a where
87+
toStreamGenerator (x :| xs) = StreamGenerator $ \f g -> f x >> traverse_ g xs
88+
89+
instance ToStreamGenerator [a] a where
90+
toStreamGenerator [] = StreamGenerator $ \_ _ -> return ()
91+
toStreamGenerator (x : xs) = StreamGenerator $ \f g -> f x >> traverse_ g xs
92+
93+
-- | Clients reading from streaming endpoints can be implemented as producing a
94+
-- @ResultStream@ that captures the setup, takedown, and incremental logic for
95+
-- a read, being an IO continuation that takes a producer of Just either values
96+
-- or errors that terminates with a Nothing.
97+
newtype ResultStream a = ResultStream { runResultStream :: forall b. (IO (Maybe (Either String a)) -> IO b) -> IO b }
98+
99+
-- | FromResultStream is intended to be implemented for types such as Conduit, Pipe, etc. By implementing this class, all such streaming abstractions can be used directly on the client side for talking to streaming endpoints.
100+
class FromResultStream a b | b -> a where
101+
fromResultStream :: ResultStream a -> IO b
102+
103+
instance FromResultStream a (ResultStream a) where
104+
fromResultStream = return
105+
106+
-- | Uses 'unsafeInterleaveIO'
107+
instance FromResultStream a [a] where
108+
fromResultStream x = runResultStream x lazyRead
109+
110+
-- | Uses 'unsafeInterleaveIO'
111+
instance FromResultStream a (NonEmpty a) where
112+
fromResultStream x = runResultStream x $ \r -> do
113+
e <- r
114+
case e of
115+
Nothing -> fail "Empty stream"
116+
Just (Left er) -> fail er
117+
Just (Right y) -> do
118+
ys <- lazyRead r
119+
return (y :| ys)
120+
121+
lazyRead :: IO (Maybe (Either String a)) -> IO [a]
122+
lazyRead r = go
123+
where
124+
go = unsafeInterleaveIO loop
125+
126+
loop = do
127+
e <- r
128+
case e of
129+
Nothing -> return []
130+
Just (Left er) -> fail er
131+
Just (Right y) -> do
132+
ys <- go
133+
return (y : ys)
62134

63135
-- | The FramingRender class provides the logic for emitting a framing strategy. The strategy emits a header, followed by boundary-delimited data, and finally a termination character. For many strategies, some of these will just be empty bytestrings.
64136
class FramingRender strategy a where
@@ -74,10 +146,10 @@ data BoundaryStrategy = BoundaryStrategyBracket (ByteString -> (ByteString,ByteS
74146
| BoundaryStrategyGeneral (ByteString -> ByteString)
75147

76148
-- | A type of parser that can never fail, and has different parsing strategies (incremental, or EOF) depending if more input can be sent. The incremental parser should return `Nothing` if it would like to be sent a longer ByteString. If it returns a value, it also returns the remainder following that value.
77-
data ByteStringParser a = ByteStringParser {
78-
parseIncremental :: ByteString -> Maybe (a, ByteString),
79-
parseEOF :: ByteString -> (a, ByteString)
80-
}
149+
data ByteStringParser a = ByteStringParser
150+
{ parseIncremental :: ByteString -> Maybe (a, ByteString)
151+
, parseEOF :: ByteString -> (a, ByteString)
152+
}
81153

82154
-- | The FramingUnrender class provides the logic for parsing a framing strategy. The outer @ByteStringParser@ strips the header from a stream of bytes, and yields a parser that can handle the remainder, stepwise. Each frame may be a ByteString, or a String indicating the error state for that frame. Such states are per-frame, so that protocols that can resume after errors are able to do so. Eventually this returns an empty ByteString to indicate termination.
83155
class FramingUnrender strategy a where

servant/src/Servant/Utils/Links.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,8 +460,8 @@ instance HasLink Raw where
460460
type MkLink Raw a = a
461461
toLink toA _ = toA
462462

463-
instance HasLink (Stream m fr ct a) where
464-
type MkLink (Stream m fr ct a) r = r
463+
instance HasLink (Stream m status fr ct a) where
464+
type MkLink (Stream m status fr ct a) r = r
465465
toLink toA _ = toA
466466

467467
-- AuthProtext instances

0 commit comments

Comments
 (0)