Skip to content

Commit c7074e8

Browse files
authored
Merge pull request #215 from clj-commons/feature/add-window-fns
Add windowing streams
2 parents 5eccf71 + befdf8e commit c7074e8

File tree

2 files changed

+67
-0
lines changed

2 files changed

+67
-0
lines changed

src/manifold/stream.clj

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,6 +1105,60 @@
11051105

11061106
(source-only s'))))
11071107

1108+
(defn dropping-stream
1109+
"Creates a new stream with a buffer of size `n`, which will drop
1110+
incoming items when full.
1111+
1112+
If `source` is supplied, inserts a dropping stream after `source`
1113+
with the provided capacity."
1114+
([n]
1115+
(let [in (stream)
1116+
out (dropping-stream n in)]
1117+
(splice in out)))
1118+
([n source]
1119+
(let [sink (stream n)]
1120+
(connect-via
1121+
source
1122+
(fn [val]
1123+
(d/let-flow [put-result (try-put! sink val 0 :timeout)]
1124+
(case put-result
1125+
true true
1126+
false false
1127+
:timeout true)))
1128+
sink
1129+
{:upstream? true
1130+
:downstream? true})
1131+
sink)))
1132+
1133+
(defn sliding-stream
1134+
"Creates a new stream with a buffer of size `n`, which will drop
1135+
the oldest items when full to make room for new items.
1136+
1137+
If `source` is supplied, inserts a sliding stream after `source`
1138+
with the provided capacity."
1139+
([n]
1140+
(let [in (stream)
1141+
out (sliding-stream n in)]
1142+
(splice in out)))
1143+
([n source]
1144+
(let [sink (stream n)]
1145+
(connect-via
1146+
source
1147+
(fn [val]
1148+
(d/loop []
1149+
(d/chain
1150+
(try-put! sink val 0 :timeout)
1151+
(fn [put-result]
1152+
(case put-result
1153+
true true
1154+
false false
1155+
:timeout (d/chain (take! sink)
1156+
(fn [_] (d/recur))))))))
1157+
sink
1158+
{:upstream? true
1159+
:downstream? true})
1160+
sink)))
1161+
11081162
;;;
11091163

11101164
(alter-meta! #'->Callback assoc :private true)

test/manifold/stream_test.clj

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,19 @@
465465
(is (s/closed? sink))
466466
(is (s/closed? src))))
467467

468+
(deftest test-window-streams
469+
(testing "dropping-stream"
470+
(let [s (s/->source (range 11))
471+
sliding-s (s/dropping-stream 10 s)]
472+
(is (= (range 10)
473+
(s/stream->seq sliding-s)))))
474+
475+
(testing "sliding-stream"
476+
(let [s (s/->source (range 11))
477+
sliding-s (s/sliding-stream 10 s)]
478+
(is (= (range 1 11)
479+
(s/stream->seq sliding-s))))) )
480+
468481
;;;
469482

470483
(deftest ^:stress stress-buffered-stream

0 commit comments

Comments
 (0)