Skip to content

Commit 529485d

Browse files
committed
Add slidingWindowSum
`slidingWindow` typically only helps write functions with complexity around `O(n*k)`, where `n` is the number of elements in the stream and `k` is the size of the window. In many cases, this can be reduced to `O(n)` by looking not at the window itself but at the sum of that window in some `Semigroup`. This can be used, for example, to implement moving averages such as arithmetic, geometric, or harmonic means.
1 parent d2685a5 commit 529485d

File tree

4 files changed

+245
-5
lines changed

4 files changed

+245
-5
lines changed

src/Data/AnnotatedQueue.hs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
{-# language FunctionalDependencies, ScopedTypeVariables, FlexibleInstances,
2+
BangPatterns, UndecidableInstances #-}
3+
4+
-- | An implementation of Okasaki's implicit queues holding elements of some
5+
-- semigroup. We track the sum of them all.
6+
module Data.AnnotatedQueue
7+
( Queue
8+
, ViewL (..)
9+
, empty
10+
, viewl
11+
, drop1
12+
, singleton
13+
, snoc
14+
, measure
15+
) where
16+
17+
import Data.Semigroup (Semigroup (..))
18+
19+
data FDigit a = FOne !a | FTwo !a !a
20+
data RDigit a = RZero | ROne !a
21+
data Node s a = Node !s !a !a
22+
23+
newtype Queue s = Queue (Tree s (Elem s))
24+
instance Semigroup s => Semigroup (Queue s) where
25+
(!t) <> u = case viewl u of
26+
EmptyL -> t
27+
ViewL x xs -> (t `snoc` x) <> xs
28+
instance Semigroup s => Monoid (Queue s) where
29+
mempty = empty
30+
mappend = (<>)
31+
32+
newtype Elem a = Elem a
33+
34+
-- Debit invariant (Okasaki): the middle tree of
35+
-- a Deep node is allowed |pr| - |sf| debits, where
36+
-- pr is the prefix and sf is the suffix.
37+
data Tree s a
38+
= Zero
39+
| One !a
40+
| Two !a !a
41+
| Deep !s !(FDigit a) (Tree s (Node s a)) !(RDigit a)
42+
43+
empty :: Queue s
44+
empty = Queue Zero
45+
46+
singleton :: s -> Queue s
47+
singleton = Queue . One . Elem
48+
49+
snoc :: Semigroup s => Queue s -> s -> Queue s
50+
snoc (Queue t) s = Queue (snocTree t (Elem s))
51+
{-# INLINABLE snoc #-}
52+
53+
measure :: Semigroup s => Queue s -> Maybe s
54+
measure (Queue q) = case q of
55+
Zero -> Nothing
56+
One a -> Just (measure_ a)
57+
Two a b -> Just (measure_ a <> measure_ b)
58+
Deep s _ _ _ -> Just s
59+
{-# INLINABLE measure #-}
60+
61+
class Measurable s a | a -> s where
62+
measure_ :: a -> s
63+
instance Measurable s (Elem s) where
64+
measure_ (Elem x) = x
65+
instance Measurable s (Node s a) where
66+
measure_ (Node s _ _) = s
67+
instance (Semigroup s, Measurable s a) => Measurable s (FDigit a) where
68+
measure_ (FOne a) = measure_ a
69+
measure_ (FTwo a b) = measure_ a <> measure_ b
70+
class SemiMeasurable s a | a -> s where
71+
semimeasure :: s -> a -> s
72+
instance (Semigroup s, Measurable s a) => SemiMeasurable s (RDigit a) where
73+
semimeasure s RZero = s
74+
semimeasure s (ROne a) = s <> measure_ a
75+
instance (Semigroup s, Measurable s a)
76+
=> SemiMeasurable s (Tree s a) where
77+
semimeasure s Zero = s
78+
semimeasure s (One a) = s <> measure_ a
79+
semimeasure s (Two a b) = s <> measure_ a <> measure_ b
80+
semimeasure s (Deep t _ _ _) = s <> t
81+
82+
node
83+
:: (Semigroup s, Measurable s a)
84+
=> a -> a -> Node s a
85+
node a b = Node (measure_ a <> measure_ b) a b
86+
{-# INLINABLE node #-}
87+
88+
deep :: (Semigroup s, Measurable s a) => FDigit a -> Tree s (Node s a) -> RDigit a -> Tree s a
89+
deep pr m sf = Deep (measure_ pr `semimeasure` m `semimeasure` sf) pr m sf
90+
{-# INLINABLE deep #-}
91+
92+
snocTree :: (Measurable s a, Semigroup s) => Tree s a -> a -> Tree s a
93+
-- Note: in the last case we depart slightly from Okasaki. Following Hinze
94+
-- and Paterson, we force the *old* middle immediately to prevent a chain of
95+
-- thunks from accumulating in case of multiple sequential snocs.
96+
snocTree Zero a = One a
97+
snocTree (One a) b = Two a b
98+
snocTree (Two a b) c = Deep (measure_ a <> measure_ b <> measure_ c) (FTwo a b) Zero (ROne c)
99+
snocTree (Deep s pr m RZero) q = Deep (s <> measure_ q) pr m (ROne q)
100+
snocTree (Deep s pr !m (ROne p)) !q
101+
= Deep (s <> measure_ q) pr (snocTree m (node p q)) RZero
102+
{-# INLINABLE snocTree #-}
103+
104+
{-
105+
Theorem: snocTree is O(1) and preserves the debit invariant.
106+
107+
Proof:
108+
109+
We show that snocTree costs at most 2 units of work.
110+
111+
Reminder: The debit invariant allows the middle tree of a Deep
112+
node |pr| - |sf| debits.
113+
114+
The first three cases are trivial: none of them involves a suspended
115+
middle tree (the third builds a Deep whose middle is the evaluated `Zero`).
116+
117+
In the fourth case (empty suffix), the debit allowance on `m`
118+
drops by 1. We do 1 unit of unshared work and pay off one debit
119+
on `m`, for a total of 2 units of work.
120+
121+
In the last case, we have two possibilities, depending on the prefix:
122+
123+
1. The prefix has one element. Then the debit allowance on `m` is 0. We force `m`
124+
(for free). We do 1 unit of unshared work. We create a suspension for the
125+
recursive call and place 2 debits on it to pay for that. Since the debit
126+
allowance for the suspension only allows 1 debit, we pay one of them off now.
127+
So the amortized cost is 2.
128+
129+
2. The prefix has two elements. Then the debit allowance on `m` is 1. We pay off
130+
that debit and force `m`. We do 1 unit of unshared work. We create a suspension
131+
for the recursive call and place 2 debits on it. This is within the debit allowance
132+
for the result. So the amortized cost is 2.
133+
-}
134+
135+
data ViewL s = EmptyL | ViewL !s (Queue s)
136+
data ViewLTree s a = EmptyLTree | ViewLTree !a (Tree s a)
137+
138+
viewl :: Semigroup s => Queue s -> ViewL s
139+
viewl (Queue q) = case viewlTree q of
140+
EmptyLTree -> EmptyL
141+
ViewLTree (Elem s) q' -> ViewL s (Queue q')
142+
{-# INLINABLE viewl #-}
143+
144+
viewlTree :: (Semigroup s, Measurable s a) => Tree s a -> ViewLTree s a
145+
viewlTree Zero = EmptyLTree
146+
viewlTree (One a) = ViewLTree a Zero
147+
viewlTree (Two a b) = ViewLTree a (One b)
148+
viewlTree (Deep _ (FTwo a b) m sf) = ViewLTree a (deep (FOne b) m sf)
149+
viewlTree (Deep _ (FOne a) m sf) = ViewLTree a $ case viewlTree m of
150+
EmptyLTree -> case sf of
151+
RZero -> Zero
152+
ROne b -> One b
153+
ViewLTree (Node p b c) m' -> Deep (p `semimeasure` m' `semimeasure` sf) (FTwo b c) m' sf
154+
{-# INLINABLE viewlTree #-}
155+
156+
-- See Okasaki PFDS Theorem 11.1 for the proof that `viewlTree` takes O(1)
157+
-- amortized time.
158+
159+
drop1 :: Semigroup s => Queue s -> Queue s
160+
drop1 q = case viewl q of
161+
EmptyL -> empty
162+
ViewL _ q' -> q'
163+
{-
164+
-- We could expand out the upper layer to avoid an unnecessary view allocation.
165+
-- Is that worth the extra code size?
166+
drop1 (Queue q) = Queue $ case q of
167+
Zero -> Zero
168+
One _ -> Zero
169+
Two _ b -> One b
170+
Deep _ (FTwo _ b) m sf -> deep (FOne b) m sf
171+
Deep _ (FOne _) m sf -> case viewlTree m of
172+
EmptyLTree -> case sf of
173+
RZero -> Zero
174+
ROne b -> One b
175+
ViewLTree (Node p b c) m' -> Deep (p `semimeasure` m' `semimeasure` sf) (FTwo b c) m' sf
176+
-}
177+
{-# INLINABLE drop1 #-}

src/Streaming/Prelude.hs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ module Streaming.Prelude (
134134
, show
135135
, cons
136136
, slidingWindow
137+
, slidingWindowSum
137138
, slidingWindowMin
138139
, slidingWindowMinBy
139140
, slidingWindowMinOn
@@ -272,8 +273,10 @@ import Data.Functor.Of
272273
import Data.Functor.Sum
273274
import Data.Monoid (Monoid (mappend, mempty))
274275
import Data.Ord (Ordering (..), comparing)
276+
import Data.Semigroup (Semigroup (..))
275277
import Foreign.C.Error (Errno(Errno), ePIPE)
276278
import Text.Read (readMaybe)
279+
import qualified Data.AnnotatedQueue as AQ
277280
import qualified Data.Foldable as Foldable
278281
import qualified Data.IntSet as IntSet
279282
import qualified Data.Sequence as Seq
@@ -2846,7 +2849,7 @@ mapMaybe phi = loop where
28462849
{-# INLINABLE mapMaybe #-}
28472850

28482851
{-| 'slidingWindow' accumulates the first @n@ elements of a stream,
2849-
update thereafter to form a sliding window of length @n@.
2852+
updating thereafter to form a sliding window of length @n@.
28502853
It follows the behavior of the slidingWindow function in
28512854
<https://hackage.haskell.org/package/conduit-combinators-1.0.4/docs/Data-Conduit-Combinators.html#v:slidingWindow conduit-combinators>.
28522855
@@ -2880,6 +2883,33 @@ slidingWindow n = setup (max 1 n :: Int) mempty
28802883
Right (x,rest) -> setup (m-1) (sequ Seq.|> x) rest
28812884
{-# INLINABLE slidingWindow #-}
28822885

2886+
{-| 'slidingWindowSum' yields the 'Semigroup' sum ('<>') of each sliding window
2887+
of @n@ consecutive elements of a stream, combined in stream order. A non-empty
2888+
stream shorter than @n@ yields one sum of all its elements; @n < 1@ acts as 1.
2889+
-}
2890+
slidingWindowSum :: (Monad m, Semigroup a)
2891+
=> Int
2892+
-> Stream (Of a) m b
2893+
-> Stream (Of a) m b
2894+
slidingWindowSum n = setup (max 1 n) AQ.empty
2895+
where
2896+
window !qu str = do
2897+
case AQ.measure qu of
2898+
Just s -> yield s
2899+
Nothing -> pure ()
2900+
e <- lift (next str)
2901+
case e of
2902+
Left r -> return r
2903+
Right (a,rest) ->
2904+
window (AQ.drop1 $ qu `AQ.snoc` a) rest
2905+
setup 0 !qu str = window qu str
2906+
setup m !qu str = do
2907+
e <- lift $ next str
2908+
case e of
2909+
Left r -> window qu (return r)
2910+
Right (x,rest) -> setup (m-1) (qu `AQ.snoc` x) rest
2911+
{-# INLINABLE slidingWindowSum #-}
2912+
28832913
-- | 'slidingWindowMin' finds the minimum in every sliding window of @n@
28842914
-- elements of a stream. If within a window there are multiple elements that are
28852915
-- the least, it prefers the first occurrence (if you prefer to have the last

streaming.cabal

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ library
204204
, Streaming.Prelude
205205
, Streaming.Internal
206206
, Data.Functor.Of
207+
other-modules:
208+
Data.AnnotatedQueue
207209
build-depends:
208210
base >=4.8 && <5
209211
, mtl >=2.1 && <2.3

test/test.hs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,43 @@
1+
{-# LANGUAGE ScopedTypeVariables #-}
12
module Main where
23

34
import qualified Data.Foldable as Foldable
45
import Data.Functor.Identity
56
import Data.Ord
7+
import qualified Data.Semigroup as DS
68
import qualified Streaming.Prelude as S
79
import Test.Hspec
810
import Test.QuickCheck
911

1012
toL :: S.Stream (S.Of a) Identity b -> [a]
1113
toL = runIdentity . S.toList_
1214

13-
main :: IO ()
14-
main =
15-
hspec $ do
15+
slidingWindowMin_spec :: SpecWith ()
16+
slidingWindowMin_spec =
1617
describe "slidingWindowMin" $ do
1718
it "works with a few simple cases" $ do
1819
toL (S.slidingWindowMin 2 (S.each [1, 3, 9, 4, 6, 4])) `shouldBe` [1, 3, 4, 4, 4]
1920
toL (S.slidingWindowMin 3 (S.each [1, 3, 2, 6, 3, 7, 8, 9])) `shouldBe` [1, 2, 2, 3, 3, 7]
2021
it "produces no results with empty streams" $
2122
property $ \k -> toL (S.slidingWindowMin k (mempty :: S.Stream (S.Of Int) Identity ())) `shouldBe` []
22-
it "behaves like a (S.map Foldable.minimum) (slidingWindow) for non-empty streams" $
23+
it "behaves like (S.map Foldable.minimum . slidingWindow k) for non-empty streams" $
2324
property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.minimum crashes on empty lists
2425
->
2526
toL (S.slidingWindowMin k (S.each xs)) ===
2627
toL (S.map Foldable.minimum (S.slidingWindow k (S.each (xs :: [Int]))))
28+
it "behaves like (S.map getMin . slidingWindowSum . S.map Min)" $
29+
property $ \(xs :: [Int]) k
30+
->
31+
toL (S.slidingWindowMin k (S.each xs)) ===
32+
toL (S.map DS.getMin $ S.slidingWindowSum k $ S.map DS.Min $ S.each xs)
2733
it "behaves like identity when window size is 1" $
2834
property $ \xs -> toL (S.slidingWindowMin 1 (S.each (xs :: [Int]))) === xs
2935
it "produces a prefix when the stream elements are sorted" $
3036
property $ \(Sorted xs) k ->
3137
(length xs >= k) ==> (toL (S.slidingWindowMin k (S.each (xs :: [Int]))) === take (length xs - (k - 1)) xs)
38+
39+
slidingWindowMinBy_spec :: SpecWith ()
40+
slidingWindowMinBy_spec =
3241
describe "slidingWindowMinBy" $ do
3342
it "prefers earlier elements when several elements compare equal" $ do
3443
toL (S.slidingWindowMinBy (comparing fst) 2 (S.each [(1, 1), (2, 2), (2, 3), (2, 4)])) `shouldBe`
@@ -38,6 +47,9 @@ main =
3847
->
3948
toL (S.slidingWindowMinBy (comparing fst) k (S.each xs)) ===
4049
toL (S.map (Foldable.minimumBy (comparing fst)) (S.slidingWindow k (S.each (xs :: [(Int, Int)]))))
50+
51+
slidingWindowMinOn_spec :: SpecWith ()
52+
slidingWindowMinOn_spec =
4153
describe "slidingWindowMinOn" $ do
4254
it "behaves like a (S.map (Foldable.minimumBy (comparing p))) (slidingWindow) for non-empty streams" $ do
4355
property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.minimumBy crashes on empty lists
@@ -49,6 +61,9 @@ main =
4961
(length xs >= k) ==>
5062
(toL (S.slidingWindowMinOn (const (undefined :: UnitWithLazyEq)) k (S.each (xs :: [Int]))) ===
5163
take (length xs - (k - 1)) xs)
64+
65+
slidingWindowMax_spec :: SpecWith ()
66+
slidingWindowMax_spec =
5267
describe "slidingWindowMax" $ do
5368
it "produces no results with empty streams" $
5469
property $ \k -> toL (S.slidingWindowMax k (mempty :: S.Stream (S.Of Int) Identity ())) `shouldBe` []
@@ -62,6 +77,9 @@ main =
6277
it "produces a suffix when the stream elements are sorted" $
6378
property $ \(Sorted xs) k ->
6479
(length xs >= k) ==> (toL (S.slidingWindowMax k (S.each (xs :: [Int]))) === drop (k - 1) xs)
80+
81+
slidingWindowMaxBy_spec :: SpecWith ()
82+
slidingWindowMaxBy_spec =
6583
describe "slidingWindowMaxBy" $ do
6684
it "prefers later elements when several elements compare equal" $ do
6785
toL (S.slidingWindowMaxBy (comparing fst) 2 (S.each [(1, 1), (2, 2), (2, 3), (2, -900)])) `shouldBe`
@@ -71,6 +89,9 @@ main =
7189
->
7290
toL (S.slidingWindowMaxBy (comparing fst) k (S.each xs)) ===
7391
toL (S.map (Foldable.maximumBy (comparing fst)) (S.slidingWindow k (S.each (xs :: [(Int, Int)]))))
92+
93+
slidingWindowMaxOn_spec :: SpecWith ()
94+
slidingWindowMaxOn_spec =
7495
describe "slidingWindowMaxOn" $ do
7596
it "behaves like a (S.map (Foldable.maximumBy (comparing p))) (slidingWindow) for non-empty streams" $ do
7697
property $ \(NonEmpty xs) k -- we use NonEmpty because Foldable.maximumBy crashes on empty lists
@@ -82,6 +103,16 @@ main =
82103
(length xs >= k) ==>
83104
(toL (S.slidingWindowMaxOn (const (undefined :: UnitWithLazyEq)) k (S.each (xs :: [Int]))) === drop (k - 1) xs)
84105

106+
main :: IO ()
107+
main =
108+
hspec $ do
109+
slidingWindowMin_spec
110+
slidingWindowMinBy_spec
111+
slidingWindowMinOn_spec
112+
slidingWindowMax_spec
113+
slidingWindowMaxBy_spec
114+
slidingWindowMaxOn_spec
115+
85116
data UnitWithLazyEq = UnitWithLazyEq
86117

87118
instance Eq UnitWithLazyEq where

0 commit comments

Comments
 (0)