Skip to content

Commit 21d27d5

Browse files
committed
Expose read and on('readable')
1 parent 1233c9b commit 21d27d5

File tree

3 files changed

+129
-32
lines changed

3 files changed

+129
-32
lines changed

src/Node/Stream.js

Lines changed: 51 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,30 @@ exports.setEncodingImpl = function(s) {
1212
};
1313
};
1414

15-
exports.onDataEitherImpl = function(left){
16-
return function(right){
17-
return function(s) {
18-
return function(f) {
19-
return function() {
20-
s.on('data', function(chunk) {
21-
if (chunk instanceof Buffer) {
22-
f(right(chunk))();
23-
}
24-
else if (typeof chunk === "string") {
25-
f(left(chunk))();
26-
}
27-
else {
28-
throw new Error(
29-
"Node.Stream.onDataEitherImpl: Unrecognised" +
30-
"chunk type; expected String or Buffer, got:" +
31-
chunk);
32-
}
33-
});
34-
};
15+
exports.readChunkImpl = function(Left) {
16+
return function(Right) {
17+
return function(chunk) {
18+
if (chunk instanceof Buffer) {
19+
return Right(chunk);
20+
} else if (typeof chunk === 'string') {
21+
return Left(chunk);
22+
} else {
23+
throw new Error(
24+
"Node.Stream.readChunkImpl: Unrecognised " +
25+
"chunk type; expected String or Buffer, got: " +
26+
chunk);
27+
}
28+
};
29+
};
30+
};
31+
32+
exports.onDataEitherImpl = function(readChunk) {
33+
return function(r) {
34+
return function(f) {
35+
return function() {
36+
r.on('data', function(data) {
37+
f(readChunk(data))();
38+
});
3539
};
3640
};
3741
};
@@ -40,9 +44,15 @@ exports.onDataEitherImpl = function(left){
4044
exports.onEnd = function(s) {
4145
return function(f) {
4246
return function() {
43-
s.on('end', function() {
44-
f();
45-
});
47+
s.on('end', f);
48+
};
49+
};
50+
};
51+
52+
exports.onReadable = function(s) {
53+
return function(f) {
54+
return function() {
55+
s.on('readable', f);
4656
};
4757
};
4858
};
@@ -60,9 +70,7 @@ exports.onError = function(s) {
6070
exports.onClose = function(s) {
6171
return function(f) {
6272
return function() {
63-
s.on('close', function() {
64-
f();
65-
});
73+
s.on('close', f);
6674
};
6775
};
6876
};
@@ -93,6 +101,23 @@ exports.pipe = function(r) {
93101
};
94102
};
95103

104+
exports.readImpl = function(readChunk) {
105+
return function(Nothing) {
106+
return function(Just) {
107+
return function(r) {
108+
return function() {
109+
const v = r.read();
110+
if (v === null) {
111+
return Nothing;
112+
} else {
113+
return Just(readChunk(v));
114+
}
115+
};
116+
};
117+
};
118+
};
119+
};
120+
96121
exports.write = function(w) {
97122
return function(chunk) {
98123
return function(done) {

src/Node/Stream.purs

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@ module Node.Stream
1111
, onDataString
1212
, onDataEither
1313
, setEncoding
14+
, onReadable
1415
, onEnd
1516
, onClose
1617
, onError
1718
, resume
1819
, pause
1920
, isPaused
2021
, pipe
22+
, read
23+
, readString
24+
, readEither
2125
, write
2226
, writeString
2327
, cork
@@ -29,6 +33,7 @@ module Node.Stream
2933
import Prelude
3034

3135
import Control.Bind ((<=<))
36+
import Data.Maybe (Maybe(..), maybe)
3237
import Data.Either (Either(..))
3338
import Node.Encoding
3439
import Node.Buffer (Buffer())
@@ -61,6 +66,11 @@ type Writable r = Stream (write :: Write | r)
6166
-- | A duplex (readable _and_ writable stream)
6267
type Duplex = Stream (read :: Read, write :: Write)
6368

69+
foreign import data Chunk :: *
70+
readChunk :: Chunk -> Either String Buffer
71+
readChunk = readChunkImpl Left Right
72+
foreign import readChunkImpl :: (forall l r. l -> Either l r) -> (forall l r. r -> Either l r) -> Chunk -> Either String Buffer
73+
6474
-- | Listen for `data` events, returning data in a Buffer. Note that this will fail
6575
-- | if `setEncoding` has been called on the stream.
6676
onData :: forall w eff. Readable w (err :: EXCEPTION | eff) -> (Buffer -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit
@@ -70,23 +80,44 @@ onData r cb =
7080
fromEither x =
7181
case x of
7282
Left _ ->
73-
throw "Node.Stream.onData: Stream encoding should not be set"
83+
throw "Stream encoding should not be set"
7484
Right buf ->
7585
pure buf
7686

87+
read :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Eff (err :: EXCEPTION | eff) (Maybe Buffer)
88+
read r = do
89+
v <- readEither r
90+
case v of
91+
Nothing -> pure Nothing
92+
Just (Left _) -> throw "Stream encoding should not be set"
93+
Just (Right b) -> pure (Just b)
94+
95+
readString :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Encoding -> Eff (err :: EXCEPTION | eff) (Maybe String)
96+
readString r enc = do
97+
v <- readEither r
98+
case v of
99+
Nothing -> pure Nothing
100+
Just (Left _) -> throw "Stream encoding should not be set"
101+
Just (Right buf) -> Just <$> (unsafeInterleaveEff $ Buffer.toString enc buf)
102+
103+
readEither :: forall w eff. Readable w eff -> Eff eff (Maybe (Either String Buffer))
104+
readEither = readImpl readChunk Nothing Just
105+
106+
foreign import readImpl :: forall r eff. (Chunk -> Either String Buffer) -> (forall a. Maybe a) -> (forall a. a -> Maybe a) -> Readable r eff -> Eff eff (Maybe (Either String Buffer))
107+
77108
-- | Listen for `data` events, returning data in a String, which will be
78109
-- | decoded using the given encoding. Note that this will fail if `setEncoding`
79110
-- | has been called on the stream.
80111
onDataString :: forall w eff. Readable w (err :: EXCEPTION | eff) -> Encoding -> (String -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit
81112
onDataString r enc cb = onData r (cb <=< unsafeInterleaveEff <<< Buffer.toString enc)
82113

83-
foreign import onDataEitherImpl :: forall w eff. (forall l r. l -> Either l r) -> (forall l r. r -> Either l r) -> Readable w eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit
84-
85114
-- | Listen for `data` events, returning data in an `Either String Buffer`. This
86115
-- | function is provided for the (hopefully rare) case that `setEncoding` has
87116
-- | been called on the stream.
88-
onDataEither :: forall w eff. Readable w eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit
89-
onDataEither = onDataEitherImpl Left Right
117+
onDataEither :: forall r eff. Readable r (err :: EXCEPTION | eff) -> (Either String Buffer -> Eff (err :: EXCEPTION | eff) Unit) -> Eff (err :: EXCEPTION | eff) Unit
118+
onDataEither r cb = onDataEitherImpl readChunk r cb
119+
120+
foreign import onDataEitherImpl :: forall r eff. (Chunk -> Either String Buffer) -> Readable r eff -> (Either String Buffer -> Eff eff Unit) -> Eff eff Unit
90121

91122
foreign import setEncodingImpl :: forall w eff. Readable w eff -> String -> Eff eff Unit
92123

@@ -99,6 +130,9 @@ foreign import setEncodingImpl :: forall w eff. Readable w eff -> String -> Eff
99130
setEncoding :: forall w eff. Readable w eff -> Encoding -> Eff eff Unit
100131
setEncoding r enc = setEncodingImpl r (show enc)
101132

133+
-- | Listen for `readable` events.
134+
foreign import onReadable :: forall w eff. Readable w eff -> Eff eff Unit -> Eff eff Unit
135+
102136
-- | Listen for `end` events.
103137
foreign import onEnd :: forall w eff. Readable w eff -> Eff eff Unit -> Eff eff Unit
104138

test/Main.purs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ module Test.Main where
22

33
import Prelude
44

5+
import Data.Maybe (Maybe(..), isNothing, isJust)
6+
import Data.Maybe.Unsafe (fromJust)
57
import Data.Either (Either(..))
68
import Node.Buffer as Buffer
79
import Node.Encoding
810
import Node.Stream
9-
import Node.Stream.StdIO
1011

1112
import Control.Monad.Eff
1213
import Control.Monad.Eff.Console
@@ -40,9 +41,46 @@ main = do
4041
log "test pipe"
4142
testPipe
4243

44+
log "test manual reads"
45+
testReads
46+
4347
testString :: String
4448
testString = "üöß💡"
4549

50+
testReads = do
51+
testReadString
52+
testReadBuf
53+
54+
where
55+
testReadString = do
56+
sIn <- passThrough
57+
v <- readString sIn UTF8
58+
assert (isNothing v)
59+
60+
onReadable sIn do
61+
str <- readString sIn UTF8
62+
assert (isJust str)
63+
assertEqual (fromJust str) testString
64+
return unit
65+
66+
writeString sIn UTF8 testString do
67+
return unit
68+
69+
testReadBuf = do
70+
sIn <- passThrough
71+
v <- read sIn
72+
assert (isNothing v)
73+
74+
onReadable sIn do
75+
buf <- read sIn
76+
assert (isJust buf)
77+
assertEqual <$> (Buffer.toString UTF8 (fromJust buf))
78+
<*> pure testString
79+
return unit
80+
81+
writeString sIn UTF8 testString do
82+
return unit
83+
4684
testSetDefaultEncoding = do
4785
w1 <- writableStreamBuffer
4886
check w1

0 commit comments

Comments
 (0)