4
4
5
5
module Concurrent
6
6
7
+ # An `MVar` is a single-element container that blocks on `get` if it is empty,
8
+ # and blocks on `put` if it is full. It is safe to use an `MVar` from
9
+ # multiple threads. `MVar` can be seen as a single-element blocking queue, or
10
+ # a rendezvous variable.
11
+ #
12
+ # An `MVar` is typically used to transfer objects between threads, where the
13
+ # sending thread will block if the previous message hasn't been taken yet by the
14
+ # receiving thread. It can also be used to control access to some global shared
15
+ # state, where threads `take` the value, perform some operation, and then
16
+ # `put` it back.
7
17
class MVar
8
18
9
19
include Dereferenceable
10
20
21
+ # Unique value that represents that an `MVar` was empty
11
22
EMPTY = Object . new
23
+
24
+ # Unique value that represents that an `MVar` timed out before it was able
25
+ # to produce a value.
12
26
TIMEOUT = Object . new
13
27
28
+ # Create a new `MVar`, either empty or with an initial value.
29
+ #
30
+ # @param [Hash] opts the options controlling how the future will be processed
31
+ # @option opts [Boolean] :operation (false) when `true` will execute the future on the global
32
+ # operation pool (for long-running operations), when `false` will execute the future on the
33
+ # global task pool (for short-running tasks)
34
+ # @option opts [object] :executor when provided will run all operations on
35
+ # this executor rather than the global thread pool (overrides :operation)
36
+ # @option opts [String] :dup_on_deref (false) call `#dup` before returning the data
37
+ # @option opts [String] :freeze_on_deref (false) call `#freeze` before returning the data
38
+ # @option opts [String] :copy_on_deref (nil) call the given `Proc` passing the internal value and
39
+ # returning the value returned from the proc
14
40
def initialize ( value = EMPTY , opts = { } )
15
41
@value = value
16
42
@mutex = Mutex . new
@@ -19,6 +45,10 @@ def initialize(value = EMPTY, opts = {})
19
45
set_deref_options ( opts )
20
46
end
21
47
48
+ # Remove the value from an `MVar`, leaving it empty, and blocking if there
49
+ # isn't a value. A timeout can be set to limit the time spent blocked, in
50
+ # which case it returns `TIMEOUT` if the time is exceeded.
51
+ # @return [Object] the value that was taken, or `TIMEOUT`
22
52
def take ( timeout = nil )
23
53
@mutex . synchronize do
24
54
wait_for_full ( timeout )
@@ -35,6 +65,10 @@ def take(timeout = nil)
35
65
end
36
66
end
37
67
68
+ # Put a value into an `MVar`, blocking if there is already a value until
69
+ # it is empty. A timeout can be set to limit the time spent blocked, in
70
+ # which case it returns `TIMEOUT` if the time is exceeded.
71
+ # @return [Object] the value that was put, or `TIMEOUT`
38
72
def put ( value , timeout = nil )
39
73
@mutex . synchronize do
40
74
wait_for_empty ( timeout )
@@ -50,6 +84,11 @@ def put(value, timeout = nil)
50
84
end
51
85
end
52
86
87
+ # Atomically `take`, yield the value to a block for transformation, and then
88
+ # `put` the transformed value. Returns the transformed value. A timeout can
89
+ # be set to limit the time spent blocked, in which case it returns `TIMEOUT`
90
+ # if the time is exceeded.
91
+ # @return [Object] the transformed value, or `TIMEOUT`
53
92
def modify ( timeout = nil )
54
93
raise ArgumentError . new ( 'no block given' ) unless block_given?
55
94
@@ -68,6 +107,7 @@ def modify(timeout = nil)
68
107
end
69
108
end
70
109
110
+ # Non-blocking version of `take`, that returns `EMPTY` instead of blocking.
71
111
def try_take!
72
112
@mutex . synchronize do
73
113
if unlocked_full?
@@ -81,6 +121,7 @@ def try_take!
81
121
end
82
122
end
83
123
124
+ # Non-blocking version of `put`, that returns whether or not it was successful.
84
125
def try_put! ( value )
85
126
@mutex . synchronize do
86
127
if unlocked_empty?
@@ -93,6 +134,7 @@ def try_put!(value)
93
134
end
94
135
end
95
136
137
+ # Non-blocking version of `put` that will overwrite an existing value.
96
138
def set! ( value )
97
139
@mutex . synchronize do
98
140
old_value = @value
@@ -102,6 +144,7 @@ def set!(value)
102
144
end
103
145
end
104
146
147
+ # Non-blocking version of `modify` that will yield with `EMPTY` if there is no value yet.
105
148
def modify!
106
149
raise ArgumentError . new ( 'no block given' ) unless block_given?
107
150
@@ -117,10 +160,12 @@ def modify!
117
160
end
118
161
end
119
162
163
+ # Returns if the `MVar` is currently empty.
120
164
def empty?
121
165
@mutex . synchronize { @value == EMPTY }
122
166
end
123
167
168
+ # Returns if the `MVar` currently contains a value.
124
169
def full?
125
170
not empty?
126
171
end
0 commit comments