Skip to content

Commit 5cc9ed7

Browse files
committed
Add watch as separate package
1 parent 38f64fc commit 5cc9ed7

File tree

4 files changed

+182
-0
lines changed

4 files changed

+182
-0
lines changed

kubernetes-watch/README.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# kubernetes-watch-client
2+
3+
Client for streaming events from watch enabled endpoints.
4+
5+
## Example
6+
Following is a simple example which
7+
just streams to stdout. First some setup - this assumes kubernetes is accessible
8+
at http://localhost:8001, e.g. after running `kubectl proxy`:
9+
10+
```haskell
11+
> import qualified Data.ByteString.Streaming.Char8 as Q
12+
13+
> manager <- newManager defaultManagerSettings
14+
> defaultConfig <- newConfig
15+
> config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
16+
> request = listEndpointsForAllNamespaces (Accept MimeJSON)
17+
```
18+
19+
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
20+
21+
```haskell
22+
> dispatchWatch manager config request Q.stdout
23+
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
24+
```
25+
26+
A more complex example involving some ggprocessing of the stream, the following
27+
prints out the event types of each event. First, define functions to allow us apply
28+
a parser to a stream:
29+
30+
31+
```haskell
32+
import Data.Aeson
33+
import qualified Data.ByteString.Streaming.Char8 as Q
34+
import Data.JsonStream.Parser
35+
import qualified Streaming.Prelude as S
36+
37+
-- | Parse the stream using the given parser.
38+
streamParse ::
39+
FromJSON a =>
40+
Parser a
41+
-> Q.ByteString IO r
42+
-> Stream (Of [a]) IO r
43+
streamParse parser byteStream = do
44+
byteStream & Q.lines & parseEvent parser
45+
46+
-- | Parse a single event from the stream.
47+
parseEvent ::
48+
(FromJSON a, Monad m) =>
49+
Parser a
50+
-> Stream (Q.ByteString m) m r
51+
-> Stream (Of [a]) m r
52+
parseEvent parser byteStream = S.map (parseByteString parser) (S.mapped Q.toStrict byteStream)
53+
```
54+
55+
Next, define the parser and apply it to the stream:
56+
57+
```haskell
58+
> eventParser = value :: Parser (WatchEvent V1Endpoints)
59+
> withResponseBody body = streamParse eventParser body & S.map (map eventType)
60+
> dispatchWatch manager config request (S.print . withResponseBody)
61+
[\"ADDED\"]
62+
[\"ADDED\"]
63+
[\"MODIFIED\"]
64+
...
65+
```
66+
67+
Packages in this example:
68+
* Data.Aeson -- from [aeson](https://hackage.haskell.org/package/aeson)
69+
* Data.ByteString.Streaming.Char8 from [streaming-bytestring](https://hackage.haskell.org/package/streaming-bytestring-0.1.5/docs/Data-ByteString-Streaming-Char8.html)
70+
* Data.JsonStream.Parser from [json-stream](https://hackage.haskell.org/package/json-stream-0.4.1.5/docs/Data-JsonStream-Parser.html)
71+
* Streaming.Prelude from [streaming](https://hackage.haskell.org/package/streaming-0.2.0.0/docs/Streaming-Prelude.html)

kubernetes-watch/package.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
name: kubernetes-watch
2+
version: 0.1.0.0
3+
library:
4+
source-dirs: src
5+
dependencies:
6+
- base >=4.7 && <5.0
7+
- aeson >=1.0 && <2.0
8+
- bytestring >=0.10.0 && <0.11
9+
- http-client >=0.5 && <0.6
10+
- mtl >=2.2.1
11+
- streaming-bytestring >= 0.1.5 && < 0.2.0
12+
- text >=0.11 && <1.3
13+
- kubernetes == 0.1.0.0
14+
15+
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
{-# LANGUAGE FlexibleContexts #-}
2+
{-# LANGUAGE OverloadedStrings #-}
3+
module Kubernetes.Watch.Client
4+
( WatchEvent
5+
, eventType
6+
, eventObject
7+
, dispatchWatch
8+
) where
9+
10+
import Control.Monad
11+
import Control.Monad.Trans (lift)
12+
import Data.Aeson
13+
import qualified Data.ByteString as B
14+
import qualified Data.ByteString.Streaming.Char8 as Q
15+
import qualified Data.Text as T
16+
import Kubernetes.Core
17+
import Kubernetes.Client
18+
import Kubernetes.MimeTypes
19+
import Kubernetes.Model (Watch(..))
20+
import Network.HTTP.Client
21+
22+
data WatchEvent a = WatchEvent
23+
{ _eventType :: T.Text
24+
, _eventObject :: a
25+
} deriving (Eq, Show)
26+
27+
instance FromJSON a => FromJSON (WatchEvent a) where
28+
parseJSON (Object x) = WatchEvent <$> x .: "type" <*> x .: "object"
29+
parseJSON _ = fail "Expected an object"
30+
31+
instance ToJSON a => ToJSON (WatchEvent a) where
32+
toJSON x = object
33+
[ "type" .= _eventType x
34+
, "object" .= _eventObject x
35+
]
36+
37+
-- | Type of the 'WatchEvent'.
38+
eventType :: WatchEvent a -> T.Text
39+
eventType = _eventType
40+
41+
-- | Object within the 'WatchEvent'.
42+
eventObject :: WatchEvent a -> a
43+
eventObject = _eventObject
44+
45+
{-| Dispatch a request setting watch to true. Takes a consumer function
46+
which consumes the 'Q.ByteString' stream. Following is a simple example which
47+
just streams to stdout. First some setup - this assumes kubernetes is accessible
48+
at http://localhost:8001, e.g. after running /kubectl proxy/:
49+
50+
@
51+
import qualified Data.ByteString.Streaming.Char8 as Q -- from <https://hackage.haskell.org/package/streaming-bytestring-0.1.5/docs/Data-ByteString-Streaming-Char8.html streaming-bytestring>
52+
53+
manager <- newManager defaultManagerSettings
54+
defaultConfig <- newConfig
55+
config = defaultConfig { configHost = "http://localhost:8001", configValidateAuthMethods = False }
56+
request = listEndpointsForAllNamespaces (Accept MimeJSON)
57+
@
58+
59+
Launching 'dispatchWatch' with the above we get a stream of endpoints data:
60+
61+
@
62+
> dispatchWatch manager config request Q.stdout
63+
{"type":\"ADDED\","object":{"kind":\"Endpoints\","apiVersion":"v1","metadata":{"name":"heapster" ....
64+
@
65+
-}
66+
dispatchWatch ::
67+
(HasOptionalParam req Watch, MimeType accept, MimeType contentType) =>
68+
Manager
69+
-> KubernetesConfig
70+
-> KubernetesRequest req contentType resp accept
71+
-> (Q.ByteString IO () -> IO a)
72+
-> IO a
73+
dispatchWatch manager config request apply = do
74+
let watchRequest = applyOptionalParam request (Watch True)
75+
(InitRequest req) <- _toInitRequest config watchRequest
76+
withHTTP req manager $ \resp -> apply $ responseBody resp
77+
78+
withHTTP ::
79+
Request
80+
-> Manager
81+
-> (Response (Q.ByteString IO ()) -> IO a)
82+
-> IO a
83+
withHTTP response manager f = withResponse response manager f'
84+
where
85+
f' resp = do
86+
let p = (from . brRead . responseBody) resp
87+
f (resp {responseBody = p})
88+
from :: IO B.ByteString -> Q.ByteString IO ()
89+
from io = go
90+
where
91+
go = do
92+
bs <- lift io
93+
unless (B.null bs) $ do
94+
Q.chunk bs
95+
go

stack.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ extra-deps:
33
packages:
44
- kubernetes
55
- kubernetes-client-helper
6+
- kubernetes-watch

0 commit comments

Comments
 (0)