diff --git a/changelog.d/1317-server-sent-events b/changelog.d/1317-server-sent-events new file mode 100644 index 000000000..1a0a608ec --- /dev/null +++ b/changelog.d/1317-server-sent-events @@ -0,0 +1,11 @@ +synopsis: Server-sent events (SSE) for client-side +prs: #1317 +issues: #1317 + +description: { + +Implement Server-sent events (SSE) for the Servant client using a new +combinator "ServerSentEvents". The raw event messages, accumulated events and +JSON-processed events can be exposed. + +} diff --git a/servant-client-core/servant-client-core.cabal b/servant-client-core/servant-client-core.cabal index b2965ea16..59ae9e172 100644 --- a/servant-client-core/servant-client-core.cabal +++ b/servant-client-core/servant-client-core.cabal @@ -41,6 +41,7 @@ library Servant.Client.Core.Request Servant.Client.Core.Response Servant.Client.Core.RunClient + Servant.Client.Core.ServerSentEvents other-modules: Servant.Client.Core.Internal @@ -50,7 +51,8 @@ library -- -- note: mtl lower bound is so low because of GHC-7.8 build-depends: - base >= 4.9 && < 4.16 + attoparsec >= 0.13.2.2 && < 0.15 + , base >= 4.9 && < 4.16 , bytestring >= 0.10.8.1 && < 0.12 , constraints >= 0.2 && < 0.14 , containers >= 0.5.7.1 && < 0.7 @@ -94,11 +96,15 @@ test-suite spec other-modules: Servant.Client.Core.Internal.BaseUrlSpec Servant.Client.Core.RequestSpec + Servant.Client.Core.ServerSentEventsSpec -- Dependencies inherited from the library. No need to specify bounds. build-depends: base , base-compat + , bytestring + , transformers + , servant , servant-client-core -- Additional dependencies diff --git a/servant-client-core/src/Servant/Client/Core/HasClient.hs b/servant-client-core/src/Servant/Client/Core/HasClient.hs index 9b26c089c..29da53ae4 100644 --- a/servant-client-core/src/Servant/Client/Core/HasClient.hs +++ b/servant-client-core/src/Servant/Client/Core/HasClient.hs @@ -85,7 +85,7 @@ import Servant.API.Generic (GenericMode(..), ToServant, ToServantApi , GenericServant, toServant, fromServant) import Servant.API.ContentTypes - (contentTypes, AllMime (allMime), AllMimeUnrender (allMimeUnrender)) + (contentTypes, AllMime (allMime), AllMimeUnrender (allMimeUnrender), EventStream) import Servant.API.Status (statusFromNat) import Servant.API.TypeLevel (FragmentUnique, AtLeastOneFragment) @@ -94,6 +94,10 @@ import Servant.API.Modifiers import Servant.API.TypeErrors import Servant.API.UVerb (HasStatus, HasStatuses (Statuses, statuses), UVerb, Union, Unique, inject, statusOf, foldMapUnion, matchUnion) +import Servant.API.ServerSentEvents + (EventKind (JsonEvent, RawEvent), ServerSentEvents') +import Servant.API.Stream + (NoFraming) import Servant.Client.Core.Auth import Servant.Client.Core.BasicAuth @@ -101,6 +105,7 @@ import Servant.Client.Core.ClientError import Servant.Client.Core.Request import Servant.Client.Core.Response import Servant.Client.Core.RunClient +import Servant.Client.Core.ServerSentEvents -- * Accessing APIs as a Client @@ -462,6 +467,63 @@ instance {-# OVERLAPPING #-} , requestMethod = reflectMethod (Proxy :: Proxy method) } +type SseClientDelegate method status = + Stream method status NoFraming EventStream + +instance + ( RunClient m + , HasClient m (SseClientDelegate method status (EventMessageStreamT IO)) + ) + => HasClient m (ServerSentEvents' method status 'RawEvent EventMessage) where + type Client m (ServerSentEvents' method status 'RawEvent EventMessage) = + Client m (SseClientDelegate method status (EventMessageStreamT IO)) + + hoistClientMonad p _ = + hoistClientMonad + p + (Proxy :: Proxy (SseClientDelegate method status (EventMessageStreamT IO))) + + clientWithRoute p _ = + clientWithRoute + p + (Proxy :: Proxy (SseClientDelegate method status (EventMessageStreamT IO))) + +instance + ( RunClient m + , HasClient m (SseClientDelegate method status (EventStreamT IO)) + ) + => HasClient m (ServerSentEvents' method status 'RawEvent (Event a)) where + type Client m (ServerSentEvents' method status 'RawEvent (Event a)) = + Client m (SseClientDelegate method status (EventStreamT IO)) + + hoistClientMonad p _ = + hoistClientMonad + p + (Proxy :: Proxy (SseClientDelegate method status (EventStreamT IO))) + + clientWithRoute p _ = + clientWithRoute + p + (Proxy :: Proxy (SseClientDelegate method status (EventStreamT IO))) + +instance + ( RunClient m + , HasClient m (SseClientDelegate method status (JsonEventStreamT IO a)) + ) + => HasClient m (ServerSentEvents' method status 'JsonEvent a) where + type Client m (ServerSentEvents' method status 'JsonEvent a) = + Client m (SseClientDelegate method status (JsonEventStreamT IO a)) + + hoistClientMonad p _ = + hoistClientMonad + p + (Proxy :: Proxy (SseClientDelegate method status (JsonEventStreamT IO a))) + + clientWithRoute p _ = + clientWithRoute + p + (Proxy :: Proxy (SseClientDelegate method status (JsonEventStreamT IO a))) + -- | If you use a 'Header' in one of your endpoints in your API, -- the corresponding querying function will automatically take -- an additional argument of the type specified by your 'Header', diff --git a/servant-client-core/src/Servant/Client/Core/ServerSentEvents.hs b/servant-client-core/src/Servant/Client/Core/ServerSentEvents.hs new file mode 100644 index 000000000..fa6b61e93 --- /dev/null +++ b/servant-client-core/src/Servant/Client/Core/ServerSentEvents.hs @@ -0,0 +1,310 @@ +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + +-- | Server-sent events +-- +-- See for more details +-- on server-sent events (SSE). +-- +module Servant.Client.Core.ServerSentEvents ( + EventMessage (..), + EventIgnoreReason (..), + Event (..), + EventStreamT (..), + JsonEventStreamT (..), + EventMessageStreamT (..) +) where + +import Control.Applicative + (Alternative ((<|>))) +import Control.Monad.IO.Class + (MonadIO) +import qualified Data.Aeson as Aeson +import qualified Data.Attoparsec.ByteString as Attoparsec +import qualified Data.ByteString as ByteString +import qualified Data.ByteString.Char8 as ByteString.Char8 +import qualified Data.ByteString.Lazy as ByteString.Lazy +import Data.Char + (chr) +import Data.Coerce + (coerce) +import Data.Foldable + (traverse_) +import Data.Functor + (void) +import qualified Data.Text as Text +import Data.Text.Encoding + (encodeUtf8) +import GHC.Generics + (Generic) +import Numeric.Natural + (Natural) +import Servant.API.ContentTypes + (EventStreamChunk (..)) +import Servant.API.Stream + (FromSourceIO (..)) +import Servant.Types.SourceT + (SourceT, StepT (..), foreachYieldStep, mapStepT, + transformStepWithAtto) + +-- For compatibility with GHC <= 8.2 +import Data.Semigroup + (Semigroup (..)) + +-- | Line (or frame) of an event stream +newtype EventStreamLine = EventStreamLine + { unEventStreamLine :: ByteString.ByteString } + deriving Show + +-- | Consume chunks to produce event stream lines. +eventLinesFromRawChunks + :: Monad m + => StepT m ByteString.ByteString + -> StepT m EventStreamLine +eventLinesFromRawChunks = + transformStepWithAtto eventLine + +-- | Consume event stream chunks to produce event stream lines. +eventLinesFromChunks + :: Monad m + => StepT m EventStreamChunk + -> StepT m EventStreamLine +eventLinesFromChunks = + -- 'coerce' efficiently unpacks the 'EventStreamChunk' + eventLinesFromRawChunks . fmap (coerce ByteString.Lazy.toStrict) + +-- | Apply a 'Attoparsec.Parser' to each line of the event stream individually. +parseEventLines + :: Monad m + => Attoparsec.Parser a + -> StepT m EventStreamLine + -> StepT m a +parseEventLines parser = + foreachYieldStep $ \(EventStreamLine line) next -> + case Attoparsec.parseOnly parser line of + Left err -> Error err + Right value -> Yield value next + +-- | A line of an event stream +eventLine :: Attoparsec.Parser EventStreamLine +eventLine = do + Attoparsec.option () byteOrderMark -- A line may be prefixed with a byte order mark + EventStreamLine <$> untilLineEnd <* lineEnd + +-- | Byte order mark (U+FEFF) in UTF-8 representation +byteOrderMark :: Attoparsec.Parser () +byteOrderMark = + traverse_ Attoparsec.word8 + $ ByteString.unpack + $ encodeUtf8 + $ Text.singleton + $ chr 0xFEFF + +-- | Event stream line ending +lineEnd :: Attoparsec.Parser () +lineEnd = + (cr >> lf) <|> cr <|> lf <|> Attoparsec.endOfInput + where + cr = void (Attoparsec.word8 0x0D) + lf = void (Attoparsec.word8 0x0A) + +-- | Consume all contents until the end of the line. +untilLineEnd :: Attoparsec.Parser ByteString.ByteString +untilLineEnd = Attoparsec.takeWhile (\w8 -> w8 /= 0x0D && w8 /= 0x0A) + +-- | Structured variant of an event line of an event stream +data EventMessage + = EventDispatch + -- ^ Dispatch on the accumulated event. + | EventSetName ByteString.ByteString + -- ^ Set the name of the current event. + | EventSetLastId ByteString.ByteString + -- ^ Set the last event identifier. + | EventData ByteString.ByteString + -- ^ Append data to the event's data buffer. + | EventRetry Natural + -- ^ Set the event stream's reconnection time. + | EventIgnore EventIgnoreReason + -- ^ Ignored + deriving (Show, Eq, Ord) + +-- | Reason why a event line can be ignored +data EventIgnoreReason + = EventFieldNameUnknown ByteString.ByteString + | EventRetryNonNumeric ByteString.ByteString + | EventComment ByteString.ByteString + deriving (Show, Eq, Ord) + +-- | Parse the event stream lines into more structured messages. +eventMessagesFromLines + :: Monad m + => StepT m EventStreamLine + -> StepT m EventMessage +eventMessagesFromLines = + ensureLastDispatch False . parseEventLines eventMessage + where + -- | Make sure the last event message is a dispatch. + ensureLastDispatch didDispatch step = case step of + Stop -> + if not didDispatch then Yield EventDispatch Stop else Stop + Yield other next -> + Yield other $ ensureLastDispatch (other == EventDispatch) next + Skip next -> + Skip $ ensureLastDispatch didDispatch next + Effect eff -> + Effect $ ensureLastDispatch didDispatch <$> eff + err@Error{} -> + err + +-- | Event line parser for an event message. +eventMessage :: Attoparsec.Parser EventMessage +eventMessage = + ignore <|> field <|> dispatch + where + ignore = do + _ <- Attoparsec.word8 0x3A -- ':' + EventIgnore . EventComment <$> Attoparsec.takeByteString + + dispatch = do + Attoparsec.endOfInput + pure EventDispatch + + field = do + name <- Attoparsec.takeWhile1 (/= 0x3A) -- Up to ':' or the end + + value <- Attoparsec.option ByteString.empty $ do + _ <- Attoparsec.word8 0x3A -- ':' + _ <- Attoparsec.option 0x20 $ Attoparsec.word8 0x20 -- Optional ' ' + Attoparsec.takeByteString + + pure $ case name of + "event" -> EventSetName value + + "data" -> EventData value + + "id" -> EventSetLastId value + + "retry" -> + -- The retry value consist of digits. + if ByteString.all (\w8 -> 0x30 <= w8 && w8 <= 0x39) value then + EventRetry (read (ByteString.Char8.unpack value)) + else + EventIgnore (EventRetryNonNumeric value) + + _ -> EventIgnore (EventFieldNameUnknown name) + +-- | Event sent by the remote +data Event a = Event + { eventName :: Maybe ByteString.ByteString + , eventData :: a + } + deriving (Show, Eq, Ord, Functor, Generic) + +-- | Accumulate event messages to build individual 'Event's. +eventsFromMessages + :: Functor m + => StepT m EventMessage + -> StepT m (Event ByteString.ByteString) +eventsFromMessages = + initGo + where + initGo = go Nothing ByteString.Lazy.empty + + combineData dataBuffer newData = + if ByteString.Lazy.null dataBuffer then + ByteString.Lazy.fromStrict newData + else + ByteString.Lazy.concat + [ dataBuffer + , ByteString.Lazy.singleton 0x0A -- Line feed + , ByteString.Lazy.fromStrict newData + ] + + go name dataBuffer step = case step of + Stop -> + Stop + Skip next -> + go name dataBuffer next + Effect eff -> + Effect (go name dataBuffer <$> eff) + Error err -> + Error err + Yield message next -> case message of + EventSetName newName -> + go (Just newName) dataBuffer next + EventData newData -> + go name (combineData dataBuffer newData) next + EventDispatch -> + Yield + (Event name (ByteString.Lazy.toStrict dataBuffer)) + (initGo next) + _ -> + -- We ignore other message because they don't fit into + -- the 'Event' type. If a user needs more fine grained + -- control, the 'EventMessage' interface is better suited. + go name dataBuffer next + +-- | Server-sent event stream (SSE) +-- +-- See for more details. +-- +newtype EventMessageStreamT m = EventMessageStreamT + { unEventMessageStreamT :: SourceT m EventMessage } + deriving (Show, Semigroup, Monoid) + +-- | Server-sent event messages +-- +-- 'EventMessage' gives you more control over the communication with the server +-- than 'Event'. +-- +instance MonadIO m => FromSourceIO EventStreamChunk (EventMessageStreamT m) where + fromSourceIO = + EventMessageStreamT + . mapStepT (eventMessagesFromLines . eventLinesFromChunks) + . fromSourceIO + +-- | Server-sent event stream (SSE) +-- +-- See for more details. +-- +newtype EventStreamT m = EventStreamT + { unEventStreamT :: SourceT m (Event ByteString.ByteString) } + deriving (Show, Semigroup, Monoid) + +-- | Server-sent events +instance MonadIO m => FromSourceIO EventStreamChunk (EventStreamT m) where + fromSourceIO input = + -- 'coerce' is used in place of unpacking and repacking 'EventStreamT' + coerce + (mapStepT eventsFromMessages) + (fromSourceIO input :: EventMessageStreamT m) + +-- | Try to parse event data to JSON. +jsonEventsFromEvents + :: (Functor m, Aeson.FromJSON a) + => StepT m (Event ByteString.ByteString) + -> StepT m (Event a) +jsonEventsFromEvents = + foreachYieldStep $ \(Event name datas) next -> + either + Error + (\value -> Yield (Event name value) next) + (Aeson.eitherDecode (ByteString.Lazy.fromStrict datas)) + +-- | Server-sent event stream (SSE) for JSON values +newtype JsonEventStreamT m a = JsonEventStreamT + { unJsonEventStreamT :: SourceT m (Event a) } + deriving (Show, Functor, Semigroup, Monoid) + +-- | Server-sent JSON event stream +instance (MonadIO m, Aeson.FromJSON a) => FromSourceIO EventStreamChunk (JsonEventStreamT m a) where + fromSourceIO input = + -- The 'coerce' efficiently unwraps the 'EventStreamT' and wraps the + -- JsonEventStreamT. + coerce + (mapStepT jsonEventsFromEvents) + (fromSourceIO input :: EventStreamT m) diff --git a/servant-client-core/test/Servant/Client/Core/ServerSentEventsSpec.hs b/servant-client-core/test/Servant/Client/Core/ServerSentEventsSpec.hs new file mode 100644 index 000000000..709ab8583 --- /dev/null +++ b/servant-client-core/test/Servant/Client/Core/ServerSentEventsSpec.hs @@ -0,0 +1,102 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Servant.Client.Core.ServerSentEventsSpec (spec) where + +import Control.Monad.Trans.Except + (runExceptT) +import qualified Data.ByteString.Lazy as ByteString +import Data.Foldable + (for_) +import Data.Int + (Int64) +import Servant.API.ContentTypes + (EventStreamChunk (..)) +import Servant.API.Stream + (FromSourceIO (fromSourceIO)) +import Servant.Client.Core.ServerSentEvents + (Event (..), EventIgnoreReason (EventComment), + EventMessage (..), unEventMessageStreamT, unEventStreamT) +import Servant.Types.SourceT + (runSourceT, source) +import Test.Hspec + (Spec, describe, it, shouldBe) + +spec :: Spec +spec = describe "Servant.Client.Core.ServerSentEvent" $ do + describe "EventMessageStreamT" $ do + it "processes chunks correctly" $ do + let allMessages = ByteString.intercalate "\n" + [ "retry: 30" + , "data: Hello World" + , "id: 1" + , "" + , "event: my_event" + , "data" + , "id: 2" + , ":Just a comment" + , "" + , "data: Bye" + ] + + for_ [1, 10, 100] $ \chunkSize -> do + result <- + runExceptT + $ runSourceT + $ unEventMessageStreamT + $ fromSourceIO + $ source + $ map EventStreamChunk + $ chunkify chunkSize allMessages + + result `shouldBe` Right + [ EventRetry 30 + , EventData "Hello World" + , EventSetLastId "1" + , EventDispatch + , EventSetName "my_event" + , EventData "" + , EventSetLastId "2" + , EventIgnore (EventComment "Just a comment") + , EventDispatch + , EventData "Bye" + , EventDispatch + ] + + describe "EventStreamT" $ do + it "processes chunks correctly" $ do + let allMessages = ByteString.intercalate "\n" + [ "retry: 30" + , "data: Hello World" + , "id: 1" + , "" + , "event: my_event" + , "data" + , "id: 2" + , ":Just a comment" + , "" + , "data: Bye" + ] + + for_ [1, 10, 100] $ \chunkSize -> do + result <- + runExceptT + $ runSourceT + $ unEventStreamT + $ fromSourceIO + $ source + $ map EventStreamChunk + $ chunkify chunkSize allMessages + + result `shouldBe` Right + [ Event Nothing "Hello World" + , Event (Just "my_event") "" + , Event Nothing "Bye" + ] + +chunkify :: Int64 -> ByteString.ByteString -> [ByteString.ByteString] +chunkify chunkSize input = + if ByteString.null input then + [] + else + let (h, t) = ByteString.splitAt chunkSize input + in h : chunkify chunkSize t diff --git a/servant/servant.cabal b/servant/servant.cabal index 7a82a9ee0..a233a4b55 100644 --- a/servant/servant.cabal +++ b/servant/servant.cabal @@ -52,6 +52,7 @@ library Servant.API.RemoteHost Servant.API.ReqBody Servant.API.ResponseHeaders + Servant.API.ServerSentEvents Servant.API.Status Servant.API.Stream Servant.API.Sub diff --git a/servant/src/Servant/API.hs b/servant/src/Servant/API.hs index de4b805cc..41ba91363 100644 --- a/servant/src/Servant/API.hs +++ b/servant/src/Servant/API.hs @@ -42,6 +42,9 @@ module Servant.API ( -- * Streaming endpoints, distinguished by HTTP method module Servant.API.Stream, + -- * Server-sent events (SSE) + module Servant.API.ServerSentEvents, + -- * Authentication module Servant.API.BasicAuth, @@ -121,6 +124,8 @@ import Servant.API.ResponseHeaders GetHeaders (getHeaders), HList (..), HasResponseHeader, Headers (..), ResponseHeader (..), addHeader, getHeadersHList, getResponse, lookupResponseHeader, noHeader) +import Servant.API.ServerSentEvents + (EventKind (..), ServerSentEvents, ServerSentEvents') import Servant.API.Stream (FramingRender (..), FramingUnrender (..), FromSourceIO (..), NetstringFraming, NewlineFraming, NoFraming, SourceIO, Stream, diff --git a/servant/src/Servant/API/ContentTypes.hs b/servant/src/Servant/API/ContentTypes.hs index 10e8d8966..2cd238d26 100644 --- a/servant/src/Servant/API/ContentTypes.hs +++ b/servant/src/Servant/API/ContentTypes.hs @@ -49,6 +49,7 @@ module Servant.API.ContentTypes , PlainText , FormUrlEncoded , OctetStream + , EventStream -- * Building your own Content-Type , Accept(..) @@ -67,6 +68,7 @@ module Servant.API.ContentTypes , AllMimeUnrender(..) , eitherDecodeLenient , canHandleAcceptH + , EventStreamChunk(..) ) where import Control.Arrow @@ -110,6 +112,7 @@ data JSON deriving Typeable data PlainText deriving Typeable data FormUrlEncoded deriving Typeable data OctetStream deriving Typeable +data EventStream deriving Typeable -- * Accept class @@ -153,6 +156,10 @@ instance Accept PlainText where instance Accept OctetStream where contentType _ = "application" M.// "octet-stream" +-- | @text/event-stream@ +instance Accept EventStream where + contentType _ = "text" M.// "event-stream" + newtype AcceptHeader = AcceptHeader BS.ByteString deriving (Eq, Show, Read, Typeable, Generic) @@ -419,6 +426,12 @@ instance MimeUnrender OctetStream ByteString where instance MimeUnrender OctetStream BS.ByteString where mimeUnrender _ = Right . toStrict +-- | Chunk of an event stream +newtype EventStreamChunk = EventStreamChunk + { unEventStreamChunk :: ByteString } + +instance MimeUnrender EventStream EventStreamChunk where + mimeUnrender _ = Right . EventStreamChunk -- $setup -- >>> :set -XFlexibleInstances diff --git a/servant/src/Servant/API/ServerSentEvents.hs b/servant/src/Servant/API/ServerSentEvents.hs new file mode 100644 index 000000000..cc887fba5 --- /dev/null +++ b/servant/src/Servant/API/ServerSentEvents.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE PolyKinds #-} + +-- | Server-sent events +-- +-- See . +-- +module Servant.API.ServerSentEvents + ( ServerSentEvents' + , ServerSentEvents + , EventKind (..) + ) +where + +import Data.Typeable + (Typeable) +import GHC.Generics + (Generic) +import GHC.TypeLits + (Nat) +import Network.HTTP.Types + (StdMethod (GET)) + +-- | Determines the shape of events you may receive (i.e. the @a@ in +-- 'ServerSentEvents\'') +data EventKind + = RawEvent + -- ^ 'EventMessage' or 'Event' 'ByteString' + | JsonEvent + -- ^ Anything that implements 'FromJSON' + +-- | Server-sent events (SSE) +-- +-- See . +-- +data ServerSentEvents' (method :: k) (status :: Nat) (kind :: EventKind) (a :: *) + deriving (Typeable, Generic) + +type ServerSentEvents = ServerSentEvents' 'GET 200 diff --git a/servant/src/Servant/Types/SourceT.hs b/servant/src/Servant/Types/SourceT.hs index 84cb4b6a8..a29d80005 100644 --- a/servant/src/Servant/Types/SourceT.hs +++ b/servant/src/Servant/Types/SourceT.hs @@ -296,6 +296,22 @@ foreachStep f g = go where go (Error err) = f err go (Effect ms) = ms >>= go +-- | Traverse the 'StepT' and call the given function for each 'Yield'. +foreachYieldStep + :: Functor m + => (a -> StepT m b -> StepT m b) + -> StepT m a + -> StepT m b +foreachYieldStep f = + go + where + go step = case step of + Error msg -> Error msg + Stop -> Stop + Skip next -> Skip (go next) + Yield val next -> f val (go next) + Effect eff -> Effect (go <$> eff) + ------------------------------------------------------------------------------- -- Monadic -------------------------------------------------------------------------------