Skip to content

Commit 0078198

Browse files
committed
Merge pull request #243 from pixie-lang/csp
CSP anyone?
2 parents fed04da + 3614471 commit 0078198

File tree

8 files changed

+560
-2
lines changed

8 files changed

+560
-2
lines changed

pixie/buffers.pxi

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
(ns pixie.buffers)
2+
3+
(defn acopy [src src-start dest dest-start len]
4+
(loop [cnt 0]
5+
(when (< cnt len)
6+
(aset dest
7+
(+ dest-start cnt)
8+
(aget src (+ src-start cnt)))
9+
(recur (inc cnt)))))
10+
11+
12+
(defprotocol IMutableBuffer
13+
(remove! [this])
14+
(add! [this])
15+
(full? [this]))
16+
17+
(defprotocol IResizableMutableBuffer
18+
(add-unbounded! [this val])
19+
(resize! [this new-size]))
20+
21+
(deftype RingBuffer [head tail length arr]
22+
IMutableBuffer
23+
(remove! [this]
24+
(when-not (zero? length)
25+
(let [x (aget arr tail)]
26+
(aset arr tail nil)
27+
(set-field! this :tail (int (rem (inc tail) (alength arr))))
28+
(set-field! this :length (dec length))
29+
x)))
30+
(add! [this x]
31+
(assert (< length (alength arr)))
32+
(aset arr head x)
33+
(set-field! this :head (int (rem (inc head) (alength arr))))
34+
(set-field! this :length (inc length))
35+
nil)
36+
37+
(full? [this]
38+
(= length (alength arr)))
39+
40+
41+
IResizableMutableBuffer
42+
(resize! [this new-size]
43+
(let [new-arr (make-array new-size)]
44+
(cond
45+
(< tail head)
46+
(do (acopy arr tail new-arr 0 length)
47+
(set-field! this :tail 0)
48+
(set-field! this :head length)
49+
(set-field! this :arr new-arr))
50+
51+
(> tail head)
52+
(do (acopy arr tail new-arr 0 (- (alength arr) tail))
53+
(acopy arr 0 new-arr (- (alength arr) tail) head)
54+
(set-field! this :tail 0)
55+
(set-field! this :head length)
56+
(set-field! this :arr new-arr))
57+
58+
59+
(full? this)
60+
(do (acopy arr tail new-arr 0 length)
61+
(set-field! this :tail 0)
62+
(set-field! this :head length)
63+
(set-field! this :arr new-arr))
64+
65+
66+
:else
67+
(do (set-field! this :tail 0)
68+
(set-field! this :head 0)
69+
(set-field! this :arr new-arr)))))
70+
71+
(add-unbounded! [this val]
72+
(when (full? this)
73+
(resize! this (* 2 length)))
74+
(add! this val))
75+
76+
ICounted
77+
(-count [this]
78+
length))
79+
80+
81+
(defn ring-buffer [size]
82+
(assert (> size 0) "Can't create a ring buffer of size <= 0")
83+
(->RingBuffer 0 0 0 (make-array size)))
84+
85+
86+
(defn fixed-buffer [size]
87+
(ring-buffer size))
88+
89+
90+
(deftype DroppingBuffer [buf]
91+
IMutableBuffer
92+
(full? [this]
93+
false)
94+
(remove! [this]
95+
(remove! buf))
96+
(add! [this val]
97+
(when-not (full? buf)
98+
(add! buf val)))
99+
100+
ICounted
101+
(-count [this]
102+
(count buf)))
103+
104+
(defn dropping-buffer [size]
105+
(->DroppingBuffer (ring-buffer size)))
106+
107+
108+
(deftype SlidingBuffer [buf]
109+
IMutableBuffer
110+
(full? [this]
111+
false)
112+
(remove! [this]
113+
(remove! buf))
114+
(add! [this val]
115+
(when (full? buf)
116+
(remove! buf))
117+
(add! buf val))
118+
119+
ICounted
120+
(-count [this]
121+
(count buf)))
122+
123+
(defn sliding-buffer [size]
124+
(->SlidingBuffer (ring-buffer size)))
125+
126+
(defn empty-buffer? [buf]
127+
(= (count buf) 0))
128+
129+
130+
(deftype NullBuffer []
131+
IMutableBuffer
132+
(full? [this]
133+
true)
134+
ICounted
135+
(-count [this] 0))
136+
137+
(def null-buffer (->NullBuffer))
138+
139+
(extend -reduce IMutableBuffer
140+
(fn [buf f acc]
141+
(loop [acc acc]
142+
(if (reduced? acc)
143+
@acc
144+
(if (pos? (count buf))
145+
(recur (f acc (remove! buf)))
146+
acc)))))

pixie/channels.pxi

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
(ns pixie.channels
2+
(require pixie.stacklets :as st)
3+
(require pixie.buffers :as b))
4+
5+
(defprotocol ICancelable
6+
(-canceled? [this] "Determines if a request (such as a callback) that can be canceled")
7+
(-commit! [this]))
8+
9+
(defprotocol IReadPort
10+
(-take! [this cfn] "Take a value from this port passing it to a cancellable function"))
11+
12+
(defprotocol IWritePort
13+
(-put! [this itm cfn] "Write a value to this port passing true if the write succeeds and the
14+
callback isn't canceled"))
15+
16+
(defprotocol ICloseable
17+
(-close! [this] "Closes the channel, future writes will be rejected, future reads will
18+
drain the channel before returning nil."))
19+
20+
(deftype OpCell [val cfn]
21+
IIndexed
22+
(-nth [this idx]
23+
(cond
24+
(= idx 0) val
25+
(= idx 1) cfn
26+
:else (throw "Index out of range")))
27+
(-nth-not-found [this idx not-found]
28+
(cond
29+
(= idx 0) val
30+
(= idx 1) cfn
31+
:else not-found))
32+
ICounted
33+
(-count [this]
34+
2)
35+
ICancelable
36+
(-canceled? [this]
37+
(canceled? cfn)))
38+
39+
(defn canceled? [this]
40+
(-canceled? this))
41+
42+
43+
(defn -move-puts-to-buffer [puts buffer]
44+
(loop []
45+
(if (or (b/full? buffer)
46+
(b/empty-buffer? puts))
47+
nil
48+
(let [[val cfn] (b/remove! puts)]
49+
(if (cancelled? cfn)
50+
(recur)
51+
(do (st/-run-later (partial cfn true))
52+
(b/add! buffer val)
53+
(recur)))))))
54+
55+
(defn -get-non-canceled! [buffer]
56+
(loop []
57+
(if (b/empty-buffer? buffer)
58+
nil
59+
(let [v (b/remove! buffer)]
60+
(if (canceled? v)
61+
(recur)
62+
v)))))
63+
64+
65+
(deftype MultiReaderWriterChannel [puts takes buffer closed? ops-since-last-clean]
66+
IReadPort
67+
(-take! [this cfn]
68+
(if (canceled? cfn)
69+
false
70+
(if (and closed?
71+
(b/empty-buffer? buffer)
72+
(b/empty-buffer? puts))
73+
(do (-commit! cfn)
74+
(st/-run-later (partial cfn nil))
75+
false)
76+
(if (not (b/empty-buffer? buffer))
77+
(do (-commit! cfn)
78+
(st/-run-later (partial cfn (b/remove! buffer)))
79+
(-move-puts-to-buffer puts buffer))
80+
81+
(if-let [[v pcfn] (-get-non-canceled! puts)]
82+
(do (-commit! pcfn)
83+
(-commit! cfn)
84+
(st/-run-later (partial pcfn true))
85+
(st/-run-later (partial cfn v))
86+
true)
87+
(do (set-field! this :ops-since-last-clean (inc ops-since-last-clean))
88+
(b/add-unbounded! takes cfn)
89+
true))))))
90+
IWritePort
91+
(-put! [this val cfn]
92+
(if (or (canceled? cfn))
93+
false
94+
(if closed?
95+
(do (-commit! cfn)
96+
(st/-run-later (partial cfn false))
97+
false)
98+
(if-let [tfn (-get-non-canceled! takes)]
99+
(do (-commit! cfn)
100+
(-commit! tfn)
101+
(st/-run-later (partial tfn val))
102+
(st/-run-later (partial cfn true))
103+
true)
104+
(if (not (b/full? buffer))
105+
(do (b/add! buffer val)
106+
(-commit! cfn)
107+
(st/-run-later (partial cfn true))
108+
true)
109+
(do (b/add-unbounded! puts (->OpCell val cfn))
110+
(set-field! this :ops-since-last-clean (inc ops-since-last-clean))
111+
true))))))
112+
ICloseable
113+
(-close! [this]
114+
(set-field! this :closed? true)
115+
(when (not (b/empty-buffer? takes))
116+
(loop []
117+
(when-let [tfn (-get-non-canceled! takes)]
118+
(-commit! tfn)
119+
(st/-run-later (partial tfn nil))
120+
(recur))))))
121+
122+
(defn chan
123+
"Creates a CSP channel with the given buffer. If an integer is provided as the argument
124+
creates a channel with a fixed buffer of that size. "
125+
([]
126+
(chan 0))
127+
([size-or-buffer]
128+
(if (= 0 size-or-buffer)
129+
(->MultiReaderWriterChannel (b/ring-buffer 8)
130+
(b/ring-buffer 8)
131+
b/null-buffer
132+
false
133+
0)
134+
(if (integer? size-or-buffer)
135+
(->MultiReaderWriterChannel (b/ring-buffer 8)
136+
(b/ring-buffer 8)
137+
(b/fixed-buffer size-or-buffer)
138+
false
139+
0)
140+
(->MultiReaderWriterChannel (b/ring-buffer 8)
141+
(b/ring-buffer 8)
142+
size-or-buffer
143+
false
144+
0)))))
145+
146+
(deftype AltHandler [atm f]
147+
ICancelable
148+
(-canceled? [this]
149+
@atm)
150+
(-commit! [this]
151+
(reset! atm true))
152+
IFn
153+
(-invoke [this & args]
154+
(apply f args)))
155+
156+
(defn alt-handlers [fns]
157+
(mapv (partial ->AltHandler (atom false)) fns))
158+
159+
(extend -canceled? IFn
160+
(fn [this] false))
161+
162+
(extend -commit! IFn
163+
(fn [this] nil))
164+
165+
(defn alts! [ops k options]
166+
(let [handler-atom (atom false)]
167+
(reduce
168+
(fn [_ op]
169+
(if (vector? op)
170+
(let [[c val] op
171+
f (fn [v]
172+
(st/-run-later (partial k [c v])))]
173+
(-put! c val (->AltHandler handler-atom f)))
174+
(let [c op
175+
f (fn [v]
176+
(st/-run-later (partial k [c v])))]
177+
(-take! c (->AltHandler handler-atom f)))))
178+
nil
179+
ops)
180+
(when (and (contains? options :default)
181+
(not @handler-atom))
182+
(reset! handler-atom true)
183+
(st/-run-later (partial k [:default (:default options)])))))

pixie/csp.pxi

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
(ns pixie.csp
2+
(require pixie.stacklets :as st)
3+
(require pixie.buffers :as b)
4+
(require pixie.channels :as chans))
5+
6+
(def chan chans/chan)
7+
8+
(defn close!
9+
"Closes the channel, future writes will be rejected, future reads will
10+
drain the channel before returning nil."
11+
[c]
12+
(chans/-close! c))
13+
14+
(def -null-callback (fn [_] nil))
15+
16+
(defn put!
17+
"Puts the value into the channel, calling the optional callback when the operation has
18+
completed."
19+
([c v]
20+
(chans/-put! c v -null-callback))
21+
([c v f]
22+
(chans/-put! c v f)))
23+
24+
(defn take!
25+
"Takes a value from a channel, calling the provided callback when completed"
26+
([c f]
27+
(chans/-take! c f)))
28+
29+
(defn >! [c v]
30+
(st/call-cc (fn [k]
31+
(chans/-put! c v (partial st/run-and-process k)))))
32+
33+
(defn <! [c]
34+
(st/call-cc (fn [k]
35+
(chans/-take! c (partial st/run-and-process k)))))
36+
37+
38+
(defmacro go [& body]
39+
`(let [ret-chan# (chans/chan 1)]
40+
(st/spawn (put! ret-chan# (do ~@body))
41+
(close! ret-chan#))
42+
ret-chan#))
43+
44+
45+
(extend -reduce chans/IReadPort
46+
(fn [c f init]
47+
(loop [acc init]
48+
(if (reduced? acc)
49+
@acc
50+
(let [v (<! c)]
51+
(if (nil? v)
52+
acc
53+
(recur (f acc v))))))))
54+
55+
56+
(defn alts!
57+
([ops]
58+
(st/call-cc (fn [k]
59+
(chans/alts! ops (partial st/run-and-process k) nil))))
60+
([ops & opts]
61+
(st/call-cc (fn [k]
62+
(chans/alts! ops (partial st/run-and-process k) (apply hashmap opts))))))

0 commit comments

Comments
 (0)