Skip to content

Commit 14e0f71

Browse files
Move parseBreak from StreamK module to ParserDrivers
1 parent dffa256 commit 14e0f71

File tree

4 files changed

+141
-119
lines changed

4 files changed

+141
-119
lines changed

core/src/Streamly/Internal/Data/ParserDrivers.h

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#ifndef PARSER_WITH_POS
22
#define PARSE_BREAK parseBreak
3+
#define PARSE_BREAK_STREAMK parseBreakStreamK
34
#define PARSE_BREAK_CHUNKS parseBreakChunks
45
#define PARSE_BREAK_CHUNKS_GENERIC parseBreakChunksGeneric
56
#define PARSE_MANY parseMany
@@ -9,6 +10,8 @@
910
#else
1011
#undef PARSE_BREAK
1112
#define PARSE_BREAK parseBreakPos
13+
#undef PARSE_BREAK_STREAMK
14+
#define PARSE_BREAK_STREAMK parseBreakStreamKPos
1215
#undef PARSE_BREAK_CHUNKS
1316
#define PARSE_BREAK_CHUNKS parseBreakChunksPos
1417
#undef PARSE_BREAK_CHUNKS_GENERIC
@@ -757,6 +760,126 @@ PARSE_BREAK (PRD.Parser pstep initial extract) stream@(Stream step state) = do
757760
let src = Prelude.reverse $ getList buf
758761
return (Left (ParseError DEFAULT(i) err), fromList src)
759762

763+
{-# INLINE_NORMAL PARSE_BREAK_STREAMK #-}
764+
PARSE_BREAK_STREAMK
765+
:: forall m a b. Monad m
766+
=> ParserK.ParserK a m b
767+
-> StreamK m a
768+
-> m (Either ParseError b, StreamK m a)
769+
PARSE_BREAK_STREAMK parser input = do
770+
let parserk = ParserK.runParser parser ParserK.parserDone 0 0
771+
in go OPTIONAL(0) [] parserk input
772+
773+
where
774+
775+
{-# INLINE backtrck #-}
776+
-- backtrck :: Int -> [a] -> StreamK m a -> (StreamK m a, [a])
777+
backtrck n xs stream =
778+
let (pre, post) = Stream.splitAt "Data.StreamK.parseBreak" n xs
779+
in (StreamK.append (StreamK.fromList (Prelude.reverse pre)) stream, post)
780+
781+
{-# INLINE goStop #-}
782+
{-
783+
goStop
784+
:: OPTIONAL(Int ->)
785+
[a]
786+
-> (ParserK.Input a -> m (ParserK.Step a m b))
787+
-> m (Either ParseError b, StreamK m a)
788+
-}
789+
goStop OPTIONAL(pos) backBuf parserk = do
790+
pRes <- parserk ParserK.None
791+
case pRes of
792+
-- If we stop in an alternative, it will try calling the next
793+
-- parser, the next parser may call initial returning Partial and
794+
-- then immediately we have to call extract on it.
795+
ParserK.Partial 0 cont1 ->
796+
go OPTIONAL(pos) [] cont1 StreamK.nil
797+
ParserK.Partial n cont1 -> do
798+
let n1 = negate n
799+
assertM(n1 >= 0 && n1 <= length backBuf)
800+
let (s1, backBuf1) = backtrck n1 backBuf StreamK.nil
801+
in go OPTIONAL(pos + n) backBuf1 cont1 s1
802+
ParserK.Continue 0 cont1 ->
803+
go OPTIONAL(pos) backBuf cont1 StreamK.nil
804+
ParserK.Continue n cont1 -> do
805+
let n1 = negate n
806+
assertM(n1 >= 0 && n1 <= length backBuf)
807+
let (s1, backBuf1) = backtrck n1 backBuf StreamK.nil
808+
in go OPTIONAL(pos + n) backBuf1 cont1 s1
809+
ParserK.Done 0 b ->
810+
return (Right b, StreamK.nil)
811+
ParserK.Done n b -> do
812+
let n1 = negate n
813+
assertM(n1 >= 0 && n1 <= length backBuf)
814+
let (s1, _) = backtrck n1 backBuf StreamK.nil
815+
in return (Right b, s1)
816+
ParserK.Error n err ->
817+
let strm = StreamK.fromList (Prelude.reverse backBuf)
818+
in return (Left (ParseError (DEFAULT(pos) + n) err), strm)
819+
820+
{-
821+
yieldk
822+
:: OPTIONAL(Int ->)
823+
[a]
824+
-> (ParserK.Input a -> m (ParserK.Step a m b))
825+
-> a
826+
-> StreamK m a
827+
-> m (Either ParseError b, StreamK m a)
828+
-}
829+
yieldk OPTIONAL(pos) backBuf parserk element stream = do
830+
pRes <- parserk (ParserK.Chunk element)
831+
-- NOTE: factoring out "StreamK.cons element stream" in a let statement here
832+
-- cause big alloc regression.
833+
case pRes of
834+
ParserK.Partial 1 cont1 -> go OPTIONAL(pos + 1) [] cont1 stream
835+
ParserK.Partial 0 cont1 -> go OPTIONAL(pos) [] cont1 (StreamK.cons element stream)
836+
ParserK.Partial n cont1 -> do -- n < 0 case
837+
let n1 = negate n
838+
bufLen = length backBuf
839+
s = StreamK.cons element stream
840+
assertM(n1 >= 0 && n1 <= bufLen)
841+
let (s1, _) = backtrck n1 backBuf s
842+
go OPTIONAL(pos + n) [] cont1 s1
843+
ParserK.Continue 1 cont1 -> go OPTIONAL(pos + 1) (element:backBuf) cont1 stream
844+
ParserK.Continue 0 cont1 ->
845+
go OPTIONAL(pos) backBuf cont1 (StreamK.cons element stream)
846+
ParserK.Continue n cont1 -> do
847+
let n1 = negate n
848+
bufLen = length backBuf
849+
s = StreamK.cons element stream
850+
assertM(n1 >= 0 && n1 <= bufLen)
851+
let (s1, backBuf1) = backtrck n1 backBuf s
852+
go OPTIONAL(pos + n) backBuf1 cont1 s1
853+
ParserK.Done 1 b -> pure (Right b, stream)
854+
ParserK.Done 0 b -> pure (Right b, StreamK.cons element stream)
855+
ParserK.Done n b -> do
856+
let n1 = negate n
857+
bufLen = length backBuf
858+
s = StreamK.cons element stream
859+
assertM(n1 >= 0 && n1 <= bufLen)
860+
let (s1, _) = backtrck n1 backBuf s
861+
pure (Right b, s1)
862+
ParserK.Error n err ->
863+
let strm =
864+
StreamK.append
865+
(StreamK.fromList (Prelude.reverse backBuf))
866+
(StreamK.cons element stream)
867+
in return (Left (ParseError (DEFAULT(pos) + n + 1) err), strm)
868+
869+
{-
870+
go
871+
:: OPTIONAL(Int ->)
872+
[a]
873+
-> (ParserK.Input a -> m (ParserK.Step a m b))
874+
-> StreamK m a
875+
-> m (Either ParseError b, StreamK m a)
876+
-}
877+
go OPTIONAL(pos) backBuf parserk stream = do
878+
let stop = goStop OPTIONAL(pos) backBuf parserk
879+
single a = yieldk OPTIONAL(pos) backBuf parserk a StreamK.nil
880+
in StreamK.foldStream
881+
defState (yieldk OPTIONAL(pos) backBuf parserk) single stop stream
882+
760883
{-# INLINE_NORMAL PARSE_BREAK_CHUNKS #-}
761884
PARSE_BREAK_CHUNKS
762885
:: (Monad m, Unbox a)

core/src/Streamly/Internal/Data/ParserDrivers.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ module Streamly.Internal.Data.ParserDrivers
1212
-- * Running a Parser
1313
parseBreak
1414
, parseBreakPos
15+
, parseBreakStreamK
16+
, parseBreakStreamKPos
1517
, parseBreakChunks
1618
, parseBreakChunksPos
1719
, parseBreakChunksGeneric

core/src/Streamly/Internal/Data/Stream/Nesting.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1601,7 +1601,7 @@ parseIterate
16011601
-> Stream m (Either ParseError b)
16021602
parseIterate = Drivers.parseIterate
16031603

1604-
-- | Like 'parseMany' but includes stream position information in the error
1604+
-- | Like 'parseIterate' but includes stream position information in the error
16051605
-- messages.
16061606
--
16071607
{-# INLINE parseIteratePos #-}

core/src/Streamly/Internal/Data/StreamK.hs

Lines changed: 15 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ module Streamly.Internal.Data.StreamK
4343
, parseDBreak
4444
, parseD
4545
, parseBreak
46+
, parseBreakPos
4647
, parse
4748

4849
-- ** Specialized Folds
@@ -153,6 +154,7 @@ import qualified Streamly.Internal.Data.Array as Array
153154
import qualified Streamly.Internal.Data.Array.Generic as GenArr
154155
import qualified Streamly.Internal.Data.Fold.Type as FL
155156
import qualified Streamly.Internal.Data.Parser as Parser
157+
import qualified Streamly.Internal.Data.ParserDrivers as Drivers
156158
import qualified Streamly.Internal.Data.Parser.Type as PR
157159
import qualified Streamly.Internal.Data.ParserK.Type as ParserK
158160
import qualified Streamly.Internal.Data.Stream as Stream
@@ -1295,131 +1297,26 @@ parseChunks = Array.parse
12951297
-- ParserK Singular
12961298
-------------------------------------------------------------------------------
12971299

1298-
#ifndef PARSER_WITH_POS
1299-
#define PARSE_BREAK parseBreak
1300-
#define OPTIONAL(x)
1301-
#define DEFAULT(x) 0
1302-
#else
1303-
#define PARSE_BREAK parseBreakPos
1304-
#define OPTIONAL(x) (x)
1305-
#define DEFAULT(x) (x)
1306-
#endif
1307-
13081300
-- | Similar to 'parseBreak' but works on singular elements.
13091301
--
1310-
{-# INLINE_NORMAL PARSE_BREAK #-}
1311-
PARSE_BREAK
1302+
{-# INLINE parseBreak #-}
1303+
parseBreak
13121304
:: forall m a b. Monad m
13131305
=> ParserK.ParserK a m b
13141306
-> StreamK m a
13151307
-> m (Either ParseError b, StreamK m a)
1316-
PARSE_BREAK parser input = do
1317-
let parserk = ParserK.runParser parser ParserK.parserDone 0 0
1318-
in go OPTIONAL(0) [] parserk input
1319-
1320-
where
1308+
parseBreak = Drivers.parseBreakStreamK
13211309

1322-
{-# INLINE backtrack #-}
1323-
-- backtrack :: Int -> [a] -> StreamK m a -> (StreamK m a, [a])
1324-
backtrack n xs stream =
1325-
let (pre, post) = Stream.splitAt "Data.StreamK.parseBreak" n xs
1326-
in (append (fromList (Prelude.reverse pre)) stream, post)
1327-
1328-
{-# INLINE goStop #-}
1329-
goStop
1330-
:: OPTIONAL(Int ->)
1331-
[a]
1332-
-> (ParserK.Input a -> m (ParserK.Step a m b))
1333-
-> m (Either ParseError b, StreamK m a)
1334-
goStop OPTIONAL(pos) backBuf parserk = do
1335-
pRes <- parserk ParserK.None
1336-
case pRes of
1337-
-- If we stop in an alternative, it will try calling the next
1338-
-- parser, the next parser may call initial returning Partial and
1339-
-- then immediately we have to call extract on it.
1340-
ParserK.Partial 0 cont1 ->
1341-
go OPTIONAL(pos) [] cont1 nil
1342-
ParserK.Partial n cont1 -> do
1343-
let n1 = negate n
1344-
assertM(n1 >= 0 && n1 <= length backBuf)
1345-
let (s1, backBuf1) = backtrack n1 backBuf nil
1346-
in go OPTIONAL(pos + n) backBuf1 cont1 s1
1347-
ParserK.Continue 0 cont1 ->
1348-
go OPTIONAL(pos) backBuf cont1 nil
1349-
ParserK.Continue n cont1 -> do
1350-
let n1 = negate n
1351-
assertM(n1 >= 0 && n1 <= length backBuf)
1352-
let (s1, backBuf1) = backtrack n1 backBuf nil
1353-
in go OPTIONAL(pos + n) backBuf1 cont1 s1
1354-
ParserK.Done 0 b ->
1355-
return (Right b, nil)
1356-
ParserK.Done n b -> do
1357-
let n1 = negate n
1358-
assertM(n1 >= 0 && n1 <= length backBuf)
1359-
let (s1, _) = backtrack n1 backBuf nil
1360-
in return (Right b, s1)
1361-
ParserK.Error n err ->
1362-
let strm = fromList (Prelude.reverse backBuf)
1363-
in return (Left (ParseError (DEFAULT(pos) + n) err), strm)
1364-
1365-
yieldk
1366-
:: OPTIONAL(Int ->)
1367-
[a]
1368-
-> (ParserK.Input a -> m (ParserK.Step a m b))
1369-
-> a
1370-
-> StreamK m a
1371-
-> m (Either ParseError b, StreamK m a)
1372-
yieldk OPTIONAL(pos) backBuf parserk element stream = do
1373-
pRes <- parserk (ParserK.Chunk element)
1374-
-- NOTE: factoring out "cons element stream" in a let statement here
1375-
-- cause big alloc regression.
1376-
case pRes of
1377-
ParserK.Partial 1 cont1 -> go OPTIONAL(pos + 1) [] cont1 stream
1378-
ParserK.Partial 0 cont1 -> go OPTIONAL(pos) [] cont1 (cons element stream)
1379-
ParserK.Partial n cont1 -> do -- n < 0 case
1380-
let n1 = negate n
1381-
bufLen = length backBuf
1382-
s = cons element stream
1383-
assertM(n1 >= 0 && n1 <= bufLen)
1384-
let (s1, _) = backtrack n1 backBuf s
1385-
go OPTIONAL(pos + n) [] cont1 s1
1386-
ParserK.Continue 1 cont1 -> go OPTIONAL(pos + 1) (element:backBuf) cont1 stream
1387-
ParserK.Continue 0 cont1 ->
1388-
go OPTIONAL(pos) backBuf cont1 (cons element stream)
1389-
ParserK.Continue n cont1 -> do
1390-
let n1 = negate n
1391-
bufLen = length backBuf
1392-
s = cons element stream
1393-
assertM(n1 >= 0 && n1 <= bufLen)
1394-
let (s1, backBuf1) = backtrack n1 backBuf s
1395-
go OPTIONAL(pos + n) backBuf1 cont1 s1
1396-
ParserK.Done 1 b -> pure (Right b, stream)
1397-
ParserK.Done 0 b -> pure (Right b, cons element stream)
1398-
ParserK.Done n b -> do
1399-
let n1 = negate n
1400-
bufLen = length backBuf
1401-
s = cons element stream
1402-
assertM(n1 >= 0 && n1 <= bufLen)
1403-
let (s1, _) = backtrack n1 backBuf s
1404-
pure (Right b, s1)
1405-
ParserK.Error n err ->
1406-
let strm =
1407-
append
1408-
(fromList (Prelude.reverse backBuf))
1409-
(cons element stream)
1410-
in return (Left (ParseError (DEFAULT(pos) + n + 1) err), strm)
1411-
1412-
go
1413-
:: OPTIONAL(Int ->)
1414-
[a]
1415-
-> (ParserK.Input a -> m (ParserK.Step a m b))
1416-
-> StreamK m a
1417-
-> m (Either ParseError b, StreamK m a)
1418-
go OPTIONAL(pos) backBuf parserk stream = do
1419-
let stop = goStop OPTIONAL(pos) backBuf parserk
1420-
single a = yieldk OPTIONAL(pos) backBuf parserk a nil
1421-
in foldStream
1422-
defState (yieldk OPTIONAL(pos) backBuf parserk) single stop stream
1310+
-- | Like 'parseBreak' but includes stream position information in the error
1311+
-- messages.
1312+
--
1313+
{-# INLINE parseBreakPos #-}
1314+
parseBreakPos
1315+
:: forall m a b. Monad m
1316+
=> ParserK.ParserK a m b
1317+
-> StreamK m a
1318+
-> m (Either ParseError b, StreamK m a)
1319+
parseBreakPos = Drivers.parseBreakStreamKPos
14231320

14241321
-- | Run a 'ParserK' over a 'StreamK'. Please use 'parseChunks' where possible,
14251322
-- for better performance.

0 commit comments

Comments
 (0)