Skip to content

Commit 324a4a2

Browse files
committed
Add parallel reducers
Wildly incomplete.
1 parent 9610fbf commit 324a4a2

18 files changed

+729
-0
lines changed

streaming/reducer2/base.rkt

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#lang racket
2+
3+
4+
(provide
5+
(contract-out
6+
#:unprotected-submodule no-contract
7+
[reducer? (-> any/c boolean?)]
8+
[make-reducer
9+
(->* (#:starter (-> any/c)
10+
#:accumulator (-> any/c any/c any/c))
11+
(#:finisher (-> any/c any/c)
12+
#:termination-checker (or/c (-> any/c boolean?) #false)
13+
#:cloner (or/c (-> any/c any/c) #false)
14+
#:merger (or/c (-> any/c any/c any/c) #false)
15+
#:ordered? boolean?
16+
#:concurrent? boolean?)
17+
reducer?)]
18+
[reducer-starter (-> reducer? (-> any/c))]
19+
[reducer-accumulator (-> reducer? (-> any/c any/c any/c))]
20+
[reducer-finisher (-> reducer? (-> any/c any/c))]
21+
[reducer-termination-checker (-> reducer? (or/c (-> any/c boolean?) #false))]
22+
[reducer-cloner (-> reducer? (or/c (-> any/c any/c) #false))]
23+
[reducer-merger (-> reducer? (or/c (-> any/c any/c any/c) #false))]
24+
[reducer-ordered? (-> reducer? boolean?)]
25+
[reducer-concurrent? (-> reducer? boolean?)]))
26+
27+
28+
29+
(struct reducer
30+
(starter ; (-> S)
31+
accumulator ; (-> S E S)
32+
finisher ; (-> S R)
33+
termination-checker ; (or/c (-> S boolean?) #false)
34+
cloner ; (or/c (-> S S) #false)
35+
merger ; (or/c (-> S S S) #false)
36+
ordered? ; boolean?
37+
concurrent?)) ; boolean?
38+
39+
40+
(define (make-reducer #:starter starter
41+
#:accumulator accumulator
42+
#:finisher [finisher values]
43+
#:termination-checker [termination-checker #false]
44+
#:cloner [cloner #false]
45+
#:merger [merger #false]
46+
#:ordered? [ordered? #true]
47+
#:concurrent? [concurrent? #false])
48+
(reducer starter
49+
accumulator
50+
finisher
51+
termination-checker
52+
cloner
53+
merger
54+
ordered?
55+
concurrent?))
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#lang racket
2+
3+
4+
(define nondeterminism-prompt-tag (make-continuation-prompt-tag))
5+
6+
7+
(define-syntax-rule (all-possible-results body ...)
8+
(call-with-nondeterminism (λ () body ...)))
9+
10+
11+
(define (call-with-nondeterminism proc)
12+
(call-set-producer-with-nondeterminism
13+
(λ () (set (proc)))))
14+
15+
16+
(define (call-set-producer-with-nondeterminism set-producer)
17+
(call-with-continuation-prompt
18+
set-producer
19+
nondeterminism-prompt-tag
20+
(λ (k)
21+
(set-union
22+
(call-set-producer-with-nondeterminism (λ () (k #true)))
23+
(call-set-producer-with-nondeterminism (λ () (k #false)))))))
24+
25+
26+
(define (flip)
27+
(call-with-composable-continuation
28+
(λ (k) (abort-current-continuation nondeterminism-prompt-tag k))
29+
nondeterminism-prompt-tag))
30+
31+
32+
#;(all-possible-results
33+
(list (if (flip) 'a 'b)
34+
(if (flip) 'a 'b)))
35+
36+
37+
(define (dynamic-log proc)
38+
(dynamic-wind
39+
(λ () (displayln "going in"))
40+
proc
41+
(λ () (displayln "coming out"))))
42+
43+
44+
#;(all-possible-results
45+
(dynamic-log
46+
(λ ()
47+
(for/sum ([_ (in-range 3)])
48+
(if (flip) 1 2)))))
49+
50+
51+
(define (flip-sum n)
52+
(define total (box 0))
53+
(for ([_ (in-range n)])
54+
(set-box! total (+ (if (flip) 1 2) (unbox total))))
55+
(unbox total))
56+
57+
58+
#;(all-possible-results
59+
(dynamic-log
60+
(λ ()
61+
(flip-sum 3))))
62+
63+
64+
(module+ main
65+
(all-possible-results
66+
(define total #f)
67+
(define stack (list 0))
68+
(define done? #false)
69+
(dynamic-wind
70+
(λ ()
71+
(set! total (first stack))
72+
(set! done? #false))
73+
(λ ()
74+
(for ([_ (in-range 3)])
75+
(define next (if (flip) 1 2))
76+
(set! total (+ next total)))
77+
(set! done? #true)
78+
(set! stack (rest stack))
79+
total)
80+
(λ ()
81+
(unless done?
82+
(set! stack (cons total stack)))))))
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#lang racket/base
2+
3+
4+
(require racket/contract/base)
5+
6+
7+
(provide
8+
(contract-out
9+
#:unprotected-submodule unchecked
10+
[into-any-match? (-> (-> any/c boolean?) reducer?)]))
11+
12+
13+
(require (submod rebellion/streaming/reducer2/base no-contract)
14+
rebellion/streaming/reducer2/reducer-map
15+
rebellion/streaming/reducer2/into-binary-fold)
16+
17+
18+
;@----------------------------------------------------------------------------------------------------
19+
20+
21+
(define (into-any-match? predicate)
22+
(reducer-map (into-binary-fold (λ (a b) (or a b)) #false
23+
#:termination-checker values
24+
#:ordered? #false)
25+
#:domain predicate))
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#lang racket/base
2+
3+
4+
(require (submod rebellion/streaming/reducer2/base no-contract))
5+
6+
7+
(provide into-binary-fold)
8+
9+
10+
(define (into-binary-fold f x
11+
#:finisher [finisher values]
12+
#:ordered? [ordered? #true]
13+
#:termination-checker [termination-checker #false])
14+
(make-reducer #:starter (λ () x)
15+
#:accumulator (λ (s e) (f e s))
16+
#:finisher finisher
17+
#:termination-checker termination-checker
18+
#:merger f
19+
#:cloner values
20+
#:ordered? ordered?
21+
#:concurrent? #true))

streaming/reducer2/into-list.rkt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#lang racket
2+
3+
4+
(require rebellion/streaming/reducer2/base)
5+
6+
7+
(define into-list
8+
(make-reducer #:starter (λ () '())
9+
#:accumulator (λ (s e) (cons e s))
10+
#:finisher reverse
11+
#:cloner values
12+
#:merger (λ (left right) (append right left))
13+
#:concurrent? #true))
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#lang racket/base
2+
3+
4+
(require rebellion/streaming/reducer2/base
5+
rebellion/streaming/reducer2/into-binary-fold)
6+
7+
8+
(provide into-product)
9+
10+
11+
(define into-product (into-binary-fold * 1 #:ordered? #false))

streaming/reducer2/into-sum.rkt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#lang racket/base
2+
3+
4+
(require racket/unsafe/ops
5+
rebellion/streaming/reducer2/into-binary-fold)
6+
7+
8+
(provide into-sum)
9+
10+
11+
(define into-sum (into-binary-fold unsafe-fx+ 0 #:ordered? #false))
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#lang racket/base
2+
3+
(require racket/unsafe/ops
4+
rebellion/streaming/reducer2/into-sum
5+
rebellion/streaming/reducer2/vector-reduce)
6+
7+
(let ([vec (build-vector 100000000 values)])
8+
(time (vector-parallel-reduce vec into-sum)))
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#lang racket/base
2+
3+
4+
(require rebellion/streaming/reducer2/parallel-reduce
5+
rebellion/streaming/reducer2/into-sum)
6+
7+
8+
(collect-garbage)
9+
(collect-garbage)
10+
(collect-garbage)
11+
12+
13+
(let ([vec (build-vector 100000000 values)])
14+
(time (parallel-reduce vec into-sum)))
15+
16+
17+
(collect-garbage)
18+
(collect-garbage)
19+
(collect-garbage)
20+
21+
22+
(let ([vec (build-vector 100000000 values)])
23+
(time (for/sum ([x (in-vector vec)]) x)))
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#lang racket
2+
3+
4+
5+
(require rebellion/streaming/reducer2/parallel-reduce
6+
rebellion/streaming/reducer2/sequential-reduce
7+
(submod rebellion/streaming/reducer2/into-any-match unchecked))
8+
9+
10+
11+
(define size 1000000000)
12+
13+
(collect-garbage)
14+
(collect-garbage)
15+
(collect-garbage)
16+
17+
(let ([vec (build-vector size values)])
18+
(time (parallel-reduce vec (into-any-match? (λ (x) (> x (/ size 2)))))))
19+
20+
(collect-garbage)
21+
(collect-garbage)
22+
(collect-garbage)
23+
24+
(let ([vec (build-vector size values)])
25+
(time (for/or ([x (in-vector vec)]) (> x (/ size 2)))))
26+
27+
28+
(collect-garbage)
29+
(collect-garbage)
30+
(collect-garbage)
31+
32+
(let ([vec (build-vector size values)])
33+
(time (sequential-reduce vec (into-any-match? (λ (x) (> x (/ size 2)))))))

0 commit comments

Comments
 (0)