Skip to content

Commit 72615ee

Browse files
committed
Merge pull request #17 from chrisseaton/mvars
MVar: initial implementation.
2 parents fa10810 + fc690d0 commit 72615ee

File tree

6 files changed

+476
-7
lines changed

6 files changed

+476
-7
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ The design goals of this gem are:
3434
loosely based on the [MailboxProcessor](http://blogs.msdn.com/b/dsyme/archive/2010/02/15/async-and-parallel-design-patterns-in-f-part-3-agents.aspx)
3535
agent in [F#](http://msdn.microsoft.com/en-us/library/ee370357.aspx)
3636
* [Dataflow](https://github.com/jdantonio/concurrent-ruby/blob/master/md/dataflow.md) loosely based on the syntax of Akka and Habanero Java
37+
* [MVar](https://github.com/jdantonio/concurrent-ruby/blob/master/md/mvar.md) inspired by Haskell
3738

3839
### Semantic Versioning
3940

lib/concurrent.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
require 'concurrent/dereferenceable'
1414
require 'concurrent/event'
1515
require 'concurrent/future'
16+
require 'concurrent/mvar'
1617
require 'concurrent/obligation'
1718
require 'concurrent/postable'
1819
require 'concurrent/promise'

lib/concurrent/dereferenceable.rb

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,23 @@ def set_deref_options(opts = {})
3232
# by the #set_deref_options method.
3333
def value
3434
mutex.synchronize do
35-
return nil if @value.nil?
36-
return @value if @do_nothing_on_deref
37-
value = @value
38-
value = @copy_on_deref.call(value) if @copy_on_deref
39-
value = value.dup if @dup_on_deref
40-
value = value.freeze if @freeze_on_deref
41-
value
35+
apply_deref_options(@value)
4236
end
4337
end
4438
alias_method :deref, :value
4539

4640
protected
4741

42+
def apply_deref_options(value)
43+
return nil if value.nil?
44+
return value if @do_nothing_on_deref
45+
value = value
46+
value = @copy_on_deref.call(value) if @copy_on_deref
47+
value = value.dup if @dup_on_deref
48+
value = value.freeze if @freeze_on_deref
49+
value
50+
end
51+
4852
def mutex # :nodoc:
4953
@mutex
5054
end

lib/concurrent/mvar.rb

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
require 'concurrent/event'
2+
3+
module Concurrent
4+
5+
class MVar
6+
7+
include Dereferenceable
8+
9+
EMPTY = Object.new
10+
TIMEOUT = Object.new
11+
12+
def initialize(value = EMPTY, opts = {})
13+
@value = value
14+
@mutex = Mutex.new
15+
@empty_condition = ConditionVariable.new
16+
@full_condition = ConditionVariable.new
17+
set_deref_options(opts)
18+
end
19+
20+
def take(timeout = nil)
21+
@mutex.synchronize do
22+
# If the value isn't empty, wait for full to be signalled
23+
@full_condition.wait(@mutex, timeout) if empty?
24+
25+
# If we timed out we'll still be empty
26+
if full?
27+
value = @value
28+
@value = EMPTY
29+
@empty_condition.signal
30+
apply_deref_options(value)
31+
else
32+
TIMEOUT
33+
end
34+
end
35+
end
36+
37+
def put(value, timeout = nil)
38+
@mutex.synchronize do
39+
# Unless the value is empty, wait for empty to be signalled
40+
@empty_condition.wait(@mutex, timeout) if full?
41+
42+
# If we timed out we won't be empty
43+
if empty?
44+
@value = value
45+
@full_condition.signal
46+
apply_deref_options(value)
47+
else
48+
TIMEOUT
49+
end
50+
end
51+
end
52+
53+
def modify(timeout = nil)
54+
raise ArgumentError.new('no block given') unless block_given?
55+
56+
@mutex.synchronize do
57+
# If the value isn't empty, wait for full to be signalled
58+
@full_condition.wait(@mutex, timeout) if empty?
59+
60+
# If we timed out we'll still be empty
61+
if full?
62+
value = @value
63+
@value = yield value
64+
@full_condition.signal
65+
apply_deref_options(value)
66+
else
67+
TIMEOUT
68+
end
69+
end
70+
end
71+
72+
def try_take!
73+
@mutex.synchronize do
74+
if full?
75+
value = @value
76+
@value = EMPTY
77+
@empty_condition.signal
78+
apply_deref_options(value)
79+
else
80+
EMPTY
81+
end
82+
end
83+
end
84+
85+
def try_put!(value)
86+
@mutex.synchronize do
87+
if empty?
88+
@value = value
89+
@full_condition.signal
90+
true
91+
else
92+
false
93+
end
94+
end
95+
end
96+
97+
def set!(value)
98+
@mutex.synchronize do
99+
old_value = @value
100+
@value = value
101+
@full_condition.signal
102+
apply_deref_options(old_value)
103+
end
104+
end
105+
106+
def modify!(timeout = nil)
107+
raise ArgumentError.new('no block given') unless block_given?
108+
109+
@mutex.synchronize do
110+
value = @value
111+
@value = yield value
112+
if @value == EMPTY
113+
@empty_condition.signal
114+
else
115+
@full_condition.signal
116+
end
117+
apply_deref_options(value)
118+
end
119+
end
120+
121+
def empty?
122+
@value == EMPTY
123+
end
124+
125+
def full?
126+
not empty?
127+
end
128+
129+
end
130+
131+
end

md/mvar.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# MVar
2+
3+
An `MVar` is a synchronized single element container. They are empty or contain
4+
one item. Taking a value from an empty `MVar` blocks, as does putting a value
5+
into a full one. You can either think of them as blocking queue of length one,
6+
or a special kind of mutable variable.
7+
8+
On top of the fundamental `#put` and `#take` operations, we also provide a
9+
`#mutate` that is atomic with respect to operations on the same instance. These
10+
operations all support timeouts.
11+
12+
We also support non-blocking operations `#try_put!` and `#try_take!`, a `#set!`
13+
that ignores existing values, a `#value` that returns the value without removing
14+
it or returns `MVar::EMPTY`, and a `#modify!` that yields `MVar::EMPTY` if the
15+
`MVar` is empty and can be used to set `MVar::EMPTY`. You shouldn't use these
16+
operations in the first instance.
17+
18+
`MVar` is a Dereferenceable.
19+
20+
`MVar` is related to M-structures in Id, `MVar` in Haskell and `SyncVar` in
21+
Scala.
22+
23+
See:
24+
25+
1. P. Barth, R. Nikhil, and Arvind. M-Structures: Extending a parallel, non-
26+
strict, functional language with state. In Proceedings of the 5th ACM Conference
27+
on Functional Programming Languages and Computer Architecture (FPCA), 1991.
28+
29+
2. S. Peyton Jones, A. Gordon, and S. Finne. Concurrent Haskell. In Proceedings of the 23rd Symposium on Principles of Programming Languages (PoPL), 1996.
30+
31+
Note that unlike the original Haskell paper, our `#take` is blocking. This is
32+
how Haskell and Scala do it today.

0 commit comments

Comments
 (0)