Skip to content

Commit f905ff7

Browse files
committed
Make the fetch function return a Maybe type in demux-like functions
1 parent 37f9565 commit f905ff7

File tree

4 files changed

+73
-55
lines changed

4 files changed

+73
-55
lines changed

benchmark/Streamly/Benchmark/Data/Fold.hs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
{-# LANGUAGE FlexibleInstances #-}
1111
{-# LANGUAGE RankNTypes #-}
1212
{-# LANGUAGE ScopedTypeVariables #-}
13-
{-# OPTIONS_GHC -fno-warn-warnings-deprecations #-}
1413
{-# OPTIONS_GHC -Wno-orphans #-}
1514

1615
#undef FUSION_CHECK
@@ -48,6 +47,7 @@ import Streamly.Internal.Data.MutArray (MutArray)
4847

4948
import qualified Streamly.Internal.Data.Array as Array
5049
import qualified Streamly.Internal.Data.Fold as FL
50+
import qualified Streamly.Internal.Data.Scanl as Scanl
5151
import qualified Streamly.Internal.Data.Fold as Fold
5252
import qualified Streamly.Internal.Data.Parser as Parser
5353
import qualified Streamly.Internal.Data.Pipe as Pipe
@@ -130,14 +130,14 @@ filter _ = Stream.fold (FL.filter even FL.drain)
130130

131131
{-# INLINE scanMaybe #-}
132132
scanMaybe :: Monad m => Int -> Stream m Int -> m ()
133-
scanMaybe _ = Stream.fold (FL.scanMaybe (FL.filtering even) FL.drain)
133+
scanMaybe _ = Stream.fold (FL.postscanlMaybe (Scanl.filtering even) FL.drain)
134134

135135
{-# INLINE scanMaybe2 #-}
136136
scanMaybe2 :: Monad m => Int -> Stream m Int -> m ()
137137
scanMaybe2 _ =
138138
Stream.fold
139-
$ FL.scanMaybe (FL.filtering even)
140-
$ FL.scanMaybe (FL.filtering odd) FL.drain
139+
$ FL.postscanlMaybe (Scanl.filtering even)
140+
$ FL.postscanlMaybe (Scanl.filtering odd) FL.drain
141141

142142
-------------------------------------------------------------------------------
143143
-- Splitting in two
@@ -423,18 +423,18 @@ partitionByMinM =
423423

424424
{-# INLINE demuxToMap #-}
425425
demuxToMap :: (Monad m, Ord k) =>
426-
(a -> k) -> (a -> m (Fold m a b)) -> Stream m a -> m (Map k b)
427-
demuxToMap f g = Stream.fold (FL.demuxToContainer f g)
426+
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Stream m a -> m (Map k b)
427+
demuxToMap f g = Stream.fold (FL.demuxerToContainer f g)
428428

429429
{-# INLINE demuxToIntMap #-}
430430
demuxToIntMap :: Monad m =>
431-
(a -> Int) -> (a -> m (Fold m a b)) -> Stream m a -> m (IntMap b)
432-
demuxToIntMap f g = Stream.fold (FL.demuxToContainer f g)
431+
(a -> Int) -> (Int -> m (Maybe (Fold m a b))) -> Stream m a -> m (IntMap b)
432+
demuxToIntMap f g = Stream.fold (FL.demuxerToContainer f g)
433433

434434
{-# INLINE demuxToMapIO #-}
435435
demuxToMapIO :: (MonadIO m, Ord k) =>
436-
(a -> k) -> (a -> m (Fold m a b)) -> Stream m a -> m (Map k b)
437-
demuxToMapIO f g = Stream.fold (FL.demuxToContainerIO f g)
436+
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Stream m a -> m (Map k b)
437+
demuxToMapIO f g = Stream.fold (FL.demuxerToContainerIO f g)
438438

439439
{-# INLINE toMap #-}
440440
toMap ::
@@ -506,9 +506,9 @@ o_1_space_serial_elimination :: Int -> [Benchmark]
506506
o_1_space_serial_elimination value =
507507
[ bgroup "elimination"
508508
[ benchIOSink value "drain" (Stream.fold FL.drain)
509-
, benchIOSink value "drainBy" (Stream.fold (FL.drainBy return))
509+
, benchIOSink value "drainBy" (Stream.fold (FL.drainMapM return))
510510
, benchIOSink value "drainN" (Stream.fold (FL.drainN value))
511-
, benchIOSink value "last" (Stream.fold FL.last)
511+
, benchIOSink value "last" (Stream.fold FL.latest)
512512
, benchIOSink value "length" (Stream.fold FL.length)
513513
, benchIOSink value "top" (Stream.fold $ FL.top 10)
514514
, benchIOSink value "bottom" (Stream.fold $ FL.bottom 10)
@@ -523,6 +523,11 @@ o_1_space_serial_elimination value =
523523
value
524524
"mean"
525525
(Stream.fold FL.mean . fmap (fromIntegral :: Int -> Double))
526+
{-
527+
-- These are already benchmarked in streamly-statistics package. If we
528+
-- still want to keep these tests here, perhaps we should move them to a
529+
-- different module so we can remove -fno-warn-warnings-deprecations.
530+
526531
, benchIOSink
527532
value
528533
"variance"
@@ -531,6 +536,7 @@ o_1_space_serial_elimination value =
531536
value
532537
"stdDev"
533538
(Stream.fold FL.stdDev . fmap (fromIntegral :: Int -> Double))
539+
-}
534540
, benchIOSink
535541
value
536542
"mconcat"
@@ -601,15 +607,15 @@ o_1_space_serial_transformation value =
601607
, benchIOSink
602608
value
603609
"fold-scan"
604-
(Stream.fold $ FL.scan FL.sum FL.drain)
610+
(Stream.fold $ FL.scanl Scanl.sum FL.drain)
605611
, benchIOSink
606612
value
607613
"fold-scanMany"
608-
(Stream.fold $ FL.scanMany (FL.take 2 FL.drain) FL.drain)
614+
(Stream.fold $ FL.scanlMany (Scanl.take 2 Scanl.drain) FL.drain)
609615
, benchIOSink
610616
value
611617
"fold-postscan"
612-
(Stream.fold $ FL.postscan FL.sum FL.drain)
618+
(Stream.fold $ FL.postscanl Scanl.sum FL.drain)
613619
]
614620
]
615621

@@ -665,11 +671,11 @@ o_n_heap_serial value =
665671
, bgroup "key-value"
666672
[
667673
benchIOSink value "demuxToMap (64 buckets) [sum, length]"
668-
$ demuxToMap (getKey 64) (getFold . getKey 64)
674+
$ demuxToMap (getKey 64) getFold
669675
, benchIOSink value "demuxToIntMap (64 buckets) [sum, length]"
670-
$ demuxToIntMap (getKey 64) (getFold . getKey 64)
676+
$ demuxToIntMap (getKey 64) getFold
671677
, benchIOSink value "demuxToMapIO (64 buckets) [sum, length]"
672-
$ demuxToMapIO (getKey 64) (getFold . getKey 64)
678+
$ demuxToMapIO (getKey 64) getFold
673679

674680
-- classify: immutable
675681
, benchIOSink value "toMap (64 buckets) sum"
@@ -694,7 +700,7 @@ o_n_heap_serial value =
694700
getKey buckets = (`mod` buckets)
695701

696702
getFold k =
697-
return $ case k of
703+
return $ Just $ case k of
698704
0 -> FL.sum
699705
1 -> FL.length
700706
_ -> FL.length

core/src/Streamly/Internal/Data/Fold/Container.hs

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ demuxGeneric getKey getFold =
354354
{-# INLINE demuxerToContainer #-}
355355
demuxerToContainer :: (Monad m, IsMap f, Traversable f) =>
356356
(a -> Key f)
357-
-> (Key f -> m (Fold m a b))
357+
-> (Key f -> m (Maybe (Fold m a b)))
358358
-> Fold m a (f b)
359359
demuxerToContainer getKey getFold =
360360
Fold (\s a -> Partial <$> step s a) (Partial <$> initial) undefined final
@@ -388,8 +388,10 @@ demuxerToContainer getKey getFold =
388388
let k = getKey a
389389
case IsMap.mapLookup k kv of
390390
Nothing -> do
391-
fld <- getFold k
392-
runFold kv kv1 fld (k, a)
391+
mfld <- getFold k
392+
case mfld of
393+
Nothing -> pure $ Tuple' kv kv1
394+
Just fld -> runFold kv kv1 fld (k, a)
393395
Just f -> runFold kv kv1 f (k, a)
394396

395397
final (Tuple' kv kv1) = do
@@ -408,7 +410,7 @@ demuxerToContainer getKey getFold =
408410
{-# INLINE demuxScanGeneric #-}
409411
demuxScanGeneric :: (Monad m, IsMap f, Traversable f) =>
410412
(a -> Key f)
411-
-> (Key f -> m (Fold m a b))
413+
-> (Key f -> m (Maybe (Fold m a b)))
412414
-> Scanl m a (m (f b), Maybe (Key f, b))
413415
demuxScanGeneric getKey getFold =
414416
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
@@ -439,8 +441,10 @@ demuxScanGeneric getKey getFold =
439441
let k = getKey a
440442
case IsMap.mapLookup k kv of
441443
Nothing -> do
442-
fld <- getFold k
443-
runFold kv fld (k, a)
444+
mfld <- getFold k
445+
case mfld of
446+
Nothing -> pure $ Tuple' kv Nothing
447+
Just fld -> runFold kv fld (k, a)
444448
Just f -> runFold kv f (k, a)
445449

446450
extract (Tuple' kv x) = return (Prelude.mapM f kv, x)
@@ -500,7 +504,7 @@ demux = demuxGeneric
500504
{-# INLINE demuxUsingMap #-}
501505
demuxUsingMap :: (Monad m, Ord k) =>
502506
(a -> k)
503-
-> (k -> m (Fold m a b))
507+
-> (k -> m (Maybe (Fold m a b)))
504508
-> Scanl m a (m (Map k b), Maybe (k, b))
505509
demuxUsingMap = demuxScanGeneric
506510

@@ -512,7 +516,7 @@ demuxUsingMap = demuxScanGeneric
512516
{-# INLINE demuxScan #-}
513517
demuxScan :: (Monad m, Ord k) =>
514518
(a -> k)
515-
-> (k -> m (Fold m a b))
519+
-> (k -> m (Maybe (Fold m a b)))
516520
-> Scanl m a (Maybe (k, b))
517521
demuxScan getKey = fmap snd . demuxUsingMap getKey
518522

@@ -601,7 +605,7 @@ demuxGenericIO getKey getFold =
601605
{-# INLINE demuxerToContainerIO #-}
602606
demuxerToContainerIO :: (MonadIO m, IsMap f, Traversable f) =>
603607
(a -> Key f)
604-
-> (Key f -> m (Fold m a b))
608+
-> (Key f -> m (Maybe (Fold m a b)))
605609
-> Fold m a (f b)
606610
demuxerToContainerIO getKey getFold =
607611
Fold (\s a -> Partial <$> step s a) (Partial <$> initial) undefined final
@@ -647,8 +651,10 @@ demuxerToContainerIO getKey getFold =
647651
let k = getKey a
648652
case IsMap.mapLookup k kv of
649653
Nothing -> do
650-
f <- getFold k
651-
initFold kv kv1 f (k, a)
654+
res <- getFold k
655+
case res of
656+
Nothing -> pure $ Tuple' kv kv1
657+
Just f -> initFold kv kv1 f (k, a)
652658
Just ref -> do
653659
f <- liftIO $ readIORef ref
654660
runFold kv kv1 ref f (k, a)
@@ -675,7 +681,7 @@ demuxerToContainerIO getKey getFold =
675681
{-# INLINE demuxScanGenericIO #-}
676682
demuxScanGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
677683
(a -> Key f)
678-
-> (Key f -> m (Fold m a b))
684+
-> (Key f -> m (Maybe (Fold m a b)))
679685
-> Scanl m a (m (f b), Maybe (Key f, b))
680686
demuxScanGenericIO getKey getFold =
681687
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
@@ -721,8 +727,10 @@ demuxScanGenericIO getKey getFold =
721727
let k = getKey a
722728
case IsMap.mapLookup k kv of
723729
Nothing -> do
724-
f <- getFold k
725-
initFold kv f (k, a)
730+
res <- getFold k
731+
case res of
732+
Nothing -> pure $ Tuple' kv Nothing
733+
Just f -> initFold kv f (k, a)
726734
Just ref -> do
727735
f <- liftIO $ readIORef ref
728736
runFold kv ref f (k, a)
@@ -766,7 +774,7 @@ demuxIO = demuxGenericIO
766774
{-# INLINE demuxUsingMapIO #-}
767775
demuxUsingMapIO :: (MonadIO m, Ord k) =>
768776
(a -> k)
769-
-> (k -> m (Fold m a b))
777+
-> (k -> m (Maybe (Fold m a b)))
770778
-> Scanl m a (m (Map k b), Maybe (k, b))
771779
demuxUsingMapIO = demuxScanGenericIO
772780

@@ -779,7 +787,7 @@ demuxUsingMapIO = demuxScanGenericIO
779787
{-# INLINE demuxScanIO #-}
780788
demuxScanIO :: (MonadIO m, Ord k) =>
781789
(a -> k)
782-
-> (k -> m (Fold m a b))
790+
-> (k -> m (Maybe (Fold m a b)))
783791
-> Scanl m a (Maybe (k, b))
784792
demuxScanIO getKey = fmap snd . demuxUsingMapIO getKey
785793

@@ -839,7 +847,7 @@ demuxToMap = demuxToContainer
839847
--
840848
{-# INLINE demuxerToMap #-}
841849
demuxerToMap :: (Monad m, Ord k) =>
842-
(a -> k) -> (k -> m (Fold m a b)) -> Fold m a (Map k b)
850+
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Fold m a (Map k b)
843851
demuxerToMap = demuxerToContainer
844852

845853
{-# DEPRECATED demuxToContainerIO "Use demuxerToContainerIO instead" #-}
@@ -869,13 +877,13 @@ demuxToMapIO = demuxToContainerIO
869877
--
870878
{-# INLINE demuxerToMapIO #-}
871879
demuxerToMapIO :: (MonadIO m, Ord k) =>
872-
(a -> k) -> (k -> m (Fold m a b)) -> Fold m a (Map k b)
880+
(a -> k) -> (k -> m (Maybe (Fold m a b))) -> Fold m a (Map k b)
873881
demuxerToMapIO = demuxerToContainerIO
874882

875883
{-# INLINE demuxKvToContainer #-}
876884
demuxKvToContainer :: (Monad m, IsMap f, Traversable f) =>
877-
(Key f -> m (Fold m a b)) -> Fold m (Key f, a) (f b)
878-
demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f)
885+
(Key f -> m (Maybe (Fold m a b))) -> Fold m (Key f, a) (f b)
886+
demuxKvToContainer f = demuxerToContainer fst (fmap (fmap (lmap snd)) . f)
879887

880888
-- | Fold a stream of key value pairs using a function that maps keys to folds.
881889
--
@@ -887,8 +895,8 @@ demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f)
887895
--
888896
-- >>> import Data.Map (Map)
889897
-- >>> :{
890-
-- let f "SUM" = return Fold.sum
891-
-- f _ = return Fold.product
898+
-- let f "SUM" = return (Just Fold.sum)
899+
-- f _ = return (Just Fold.product)
892900
-- input = Stream.fromList [("SUM",1),("PRODUCT",2),("SUM",3),("PRODUCT",4)]
893901
-- in Stream.fold (Fold.demuxKvToMap f) input :: IO (Map String Int)
894902
-- :}
@@ -897,7 +905,7 @@ demuxKvToContainer f = demuxerToContainer fst (fmap (lmap snd) . f)
897905
-- /Pre-release/
898906
{-# INLINE demuxKvToMap #-}
899907
demuxKvToMap :: (Monad m, Ord k) =>
900-
(k -> m (Fold m a b)) -> Fold m (k, a) (Map k b)
908+
(k -> m (Maybe (Fold m a b))) -> Fold m (k, a) (Map k b)
901909
demuxKvToMap = demuxKvToContainer
902910

903911
------------------------------------------------------------------------------

core/src/Streamly/Internal/Data/Scanl/Container.hs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ countDistinctInt = fmap (\(Tuple' _ n) -> n) $ foldl' step initial
274274
{-# INLINE demuxGeneric #-}
275275
demuxGeneric :: (Monad m, IsMap f, Traversable f) =>
276276
(a -> Key f)
277-
-> (Key f -> m (Scanl m a b))
277+
-> (Key f -> m (Maybe (Scanl m a b)))
278278
-> Scanl m a (m (f b), Maybe (Key f, b))
279279
demuxGeneric getKey getFold =
280280
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
@@ -309,8 +309,10 @@ demuxGeneric getKey getFold =
309309
let k = getKey a
310310
case IsMap.mapLookup k kv of
311311
Nothing -> do
312-
fld <- getFold k
313-
runFold kv fld (k, a)
312+
mfld <- getFold k
313+
case mfld of
314+
Nothing -> pure $ Tuple' kv Nothing
315+
Just fld -> runFold kv fld (k, a)
314316
Just f -> runFold kv f (k, a)
315317

316318
extract (Tuple' kv x) = return (Prelude.mapM f kv, x)
@@ -336,7 +338,7 @@ demuxGeneric getKey getFold =
336338
{-# INLINE demuxUsingMap #-}
337339
demuxUsingMap :: (Monad m, Ord k) =>
338340
(a -> k)
339-
-> (k -> m (Scanl m a b))
341+
-> (k -> m (Maybe (Scanl m a b)))
340342
-> Scanl m a (m (Map k b), Maybe (k, b))
341343
demuxUsingMap = demuxGeneric
342344

@@ -365,7 +367,7 @@ demuxUsingMap = demuxGeneric
365367
{-# INLINE demux #-}
366368
demux :: (Monad m, Ord k) =>
367369
(a -> k)
368-
-> (k -> m (Scanl m a b))
370+
-> (k -> m (Maybe (Scanl m a b)))
369371
-> Scanl m a (Maybe (k, b))
370372
demux getKey = fmap snd . demuxUsingMap getKey
371373

@@ -380,7 +382,7 @@ demux getKey = fmap snd . demuxUsingMap getKey
380382
{-# INLINE demuxGenericIO #-}
381383
demuxGenericIO :: (MonadIO m, IsMap f, Traversable f) =>
382384
(a -> Key f)
383-
-> (Key f -> m (Scanl m a b))
385+
-> (Key f -> m (Maybe (Scanl m a b)))
384386
-> Scanl m a (m (f b), Maybe (Key f, b))
385387
demuxGenericIO getKey getFold =
386388
Scanl (\s a -> Partial <$> step s a) (Partial <$> initial) extract final
@@ -429,8 +431,10 @@ demuxGenericIO getKey getFold =
429431
let k = getKey a
430432
case IsMap.mapLookup k kv of
431433
Nothing -> do
432-
f <- getFold k
433-
initFold kv f (k, a)
434+
res <- getFold k
435+
case res of
436+
Nothing -> pure $ Tuple' kv Nothing
437+
Just f -> initFold kv f (k, a)
434438
Just ref -> do
435439
f <- liftIO $ readIORef ref
436440
runFold kv ref f (k, a)
@@ -460,7 +464,7 @@ demuxGenericIO getKey getFold =
460464
{-# INLINE demuxUsingMapIO #-}
461465
demuxUsingMapIO :: (MonadIO m, Ord k) =>
462466
(a -> k)
463-
-> (k -> m (Scanl m a b))
467+
-> (k -> m (Maybe (Scanl m a b)))
464468
-> Scanl m a (m (Map k b), Maybe (k, b))
465469
demuxUsingMapIO = demuxGenericIO
466470

@@ -470,7 +474,7 @@ demuxUsingMapIO = demuxGenericIO
470474
{-# INLINE demuxIO #-}
471475
demuxIO :: (MonadIO m, Ord k) =>
472476
(a -> k)
473-
-> (k -> m (Scanl m a b))
477+
-> (k -> m (Maybe (Scanl m a b)))
474478
-> Scanl m a (Maybe (k, b))
475479
demuxIO getKey = fmap snd . demuxUsingMapIO getKey
476480

test/Streamly/Test/Data/Fold.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -596,9 +596,9 @@ foldBreak ls = monadicIO $ do
596596

597597
demux :: Expectation
598598
demux =
599-
let table "SUM" = return Fold.sum
600-
table "PRODUCT" = return Fold.product
601-
table _ = return Fold.length
599+
let table "SUM" = return $ Just Fold.sum
600+
table "PRODUCT" = return $ Just Fold.product
601+
table _ = return $ Just Fold.length
602602
input = Stream.fromList (
603603
[ ("SUM", 1)
604604
, ("abc", 1)

0 commit comments

Comments
 (0)