Skip to content

Commit b9628b3

Browse files
committed
Add Lock, Condition and convert ReadWriteLock
1 parent 0628986 commit b9628b3

File tree

6 files changed

+134
-61
lines changed

6 files changed

+134
-61
lines changed

lib/concurrent/atomic/read_write_lock.rb

Lines changed: 46 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
require 'thread'
22
require 'concurrent/atomic/atomic_reference'
33
require 'concurrent/errors'
4+
require 'concurrent/synchronization'
45

56
module Concurrent
67

@@ -26,19 +27,19 @@ module Concurrent
2627
# This will lead to deadlock
2728
#
2829
# @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html java.util.concurrent.ReentrantReadWriteLock
29-
class ReadWriteLock
30+
class ReadWriteLock < Synchronization::Object
3031

3132
# @!visibility private
32-
WAITING_WRITER = 1 << 15
33+
WAITING_WRITER = 1 << 15
3334

3435
# @!visibility private
35-
RUNNING_WRITER = 1 << 30
36+
RUNNING_WRITER = 1 << 30
3637

3738
# @!visibility private
38-
MAX_READERS = WAITING_WRITER - 1
39+
MAX_READERS = WAITING_WRITER - 1
3940

4041
# @!visibility private
41-
MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1
42+
MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1
4243

4344
# Implementation notes:
4445
# A goal is to make the uncontended path for both readers/writers lock-free
@@ -53,11 +54,11 @@ class ReadWriteLock
5354

5455
# Create a new `ReadWriteLock` in the unlocked state.
5556
def initialize
56-
@counter = AtomicReference.new(0) # single integer which represents lock state
57-
@reader_q = ConditionVariable.new # queue for waiting readers
58-
@reader_mutex = Mutex.new # to protect reader queue
59-
@writer_q = ConditionVariable.new # queue for waiting writers
60-
@writer_mutex = Mutex.new # to protect writer queue
57+
@Counter = AtomicReference.new(0) # single integer which represents lock state
58+
@ReadLock = Synchronization::Lock.new
59+
@WriteLock = Synchronization::Lock.new
60+
ensure_ivar_visibility!
61+
super()
6162
end
6263

6364
# Execute a block operation within a read lock.
@@ -106,47 +107,41 @@ def with_write_lock
106107
# @raise [Concurrent::ResourceLimitError] if the maximum number of readers
107108
# is exceeded.
108109
def acquire_read_lock
109-
while(true)
110-
c = @counter.value
110+
while true
111+
c = @Counter.value
111112
raise ResourceLimitError.new('Too many reader threads') if max_readers?(c)
112113

113114
# If a writer is waiting when we first queue up, we need to wait
114115
if waiting_writer?(c)
115-
# But it is possible that the writer could finish and decrement @counter right here...
116-
@reader_mutex.synchronize do
117-
# So check again inside the synchronized section
118-
@reader_q.wait(@reader_mutex) if waiting_writer?
119-
end
116+
@ReadLock.wait_until { !waiting_writer? }
120117

121118
# after a reader has waited once, they are allowed to "barge" ahead of waiting writers
122119
# but if a writer is *running*, the reader still needs to wait (naturally)
123-
while(true)
124-
c = @counter.value
120+
while true
121+
c = @Counter.value
125122
if running_writer?(c)
126-
@reader_mutex.synchronize do
127-
@reader_q.wait(@reader_mutex) if running_writer?
128-
end
123+
@ReadLock.wait_until { !running_writer? }
129124
else
130-
return if @counter.compare_and_swap(c,c+1)
125+
return if @Counter.compare_and_swap(c, c+1)
131126
end
132127
end
133128
else
134-
break if @counter.compare_and_swap(c,c+1)
129+
break if @Counter.compare_and_swap(c, c+1)
135130
end
136-
end
131+
end
137132
true
138133
end
139134

140135
# Release a previously acquired read lock.
141136
#
142137
# @return [Boolean] true if the lock is successfully released
143138
def release_read_lock
144-
while(true)
145-
c = @counter.value
146-
if @counter.compare_and_swap(c,c-1)
139+
while true
140+
c = @Counter.value
141+
if @Counter.compare_and_swap(c, c-1)
147142
# If one or more writers were waiting, and we were the last reader, wake a writer up
148143
if waiting_writer?(c) && running_readers(c) == 1
149-
@writer_mutex.synchronize { @writer_q.signal }
144+
@WriteLock.signal
150145
end
151146
break
152147
end
@@ -161,32 +156,31 @@ def release_read_lock
161156
# @raise [Concurrent::ResourceLimitError] if the maximum number of writers
162157
# is exceeded.
163158
def acquire_write_lock
164-
while(true)
165-
c = @counter.value
159+
while true
160+
c = @Counter.value
166161
raise ResourceLimitError.new('Too many writer threads') if max_writers?(c)
167162

168163
if c == 0 # no readers OR writers running
169164
# if we successfully swap the RUNNING_WRITER bit on, then we can go ahead
170-
break if @counter.compare_and_swap(0,RUNNING_WRITER)
171-
elsif @counter.compare_and_swap(c,c+WAITING_WRITER)
172-
while(true)
165+
break if @Counter.compare_and_swap(0, RUNNING_WRITER)
166+
elsif @Counter.compare_and_swap(c, c+WAITING_WRITER)
167+
while true
173168
# Now we have successfully incremented, so no more readers will be able to increment
174169
# (they will wait instead)
175170
# However, readers OR writers could decrement right here, OR another writer could increment
176-
@writer_mutex.synchronize do
171+
@WriteLock.wait_until do
177172
# So we have to do another check inside the synchronized section
178173
# If a writer OR reader is running, then go to sleep
179-
c = @counter.value
180-
@writer_q.wait(@writer_mutex) if running_writer?(c) || running_readers?(c)
174+
c = @Counter.value
175+
!running_writer?(c) && !running_readers?(c)
181176
end
182177

183178
# We just came out of a wait
184179
# If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
185180
# Then we are OK to stop waiting and go ahead
186181
# Otherwise go back and wait again
187-
c = @counter.value
188-
break if !running_writer?(c) && !running_readers?(c) &&
189-
@counter.compare_and_swap(c,c+RUNNING_WRITER-WAITING_WRITER)
182+
c = @Counter.value
183+
break if !running_writer?(c) && !running_readers?(c) && @Counter.compare_and_swap(c, c+RUNNING_WRITER-WAITING_WRITER)
190184
end
191185
break
192186
end
@@ -198,67 +192,60 @@ def acquire_write_lock
198192
#
199193
# @return [Boolean] true if the lock is successfully released
200194
def release_write_lock
201-
while(true)
202-
c = @counter.value
203-
if @counter.compare_and_swap(c,c-RUNNING_WRITER)
204-
@reader_mutex.synchronize { @reader_q.broadcast }
205-
if waiting_writers(c) > 0 # if any writers are waiting...
206-
@writer_mutex.synchronize { @writer_q.signal }
207-
end
208-
break
209-
end
210-
end
195+
c = @Counter.update { |c| c-RUNNING_WRITER }
196+
@ReadLock.broadcast
197+
@WriteLock.signal if waiting_writers(c) > 0
211198
true
212199
end
213200

214201
# Queries if the write lock is held by any thread.
215202
#
216203
# @return [Boolean] true if the write lock is held else false`
217204
def write_locked?
218-
@counter.value >= RUNNING_WRITER
205+
@Counter.value >= RUNNING_WRITER
219206
end
220207

221208
# Queries whether any threads are waiting to acquire the read or write lock.
222209
#
223210
# @return [Boolean] true if any threads are waiting for a lock else false
224211
def has_waiters?
225-
waiting_writer?(@counter.value)
212+
waiting_writer?(@Counter.value)
226213
end
227214

228215
private
229216

230217
# @!visibility private
231-
def running_readers(c = @counter.value)
218+
def running_readers(c = @Counter.value)
232219
c & MAX_READERS
233220
end
234221

235222
# @!visibility private
236-
def running_readers?(c = @counter.value)
223+
def running_readers?(c = @Counter.value)
237224
(c & MAX_READERS) > 0
238225
end
239226

240227
# @!visibility private
241-
def running_writer?(c = @counter.value)
228+
def running_writer?(c = @Counter.value)
242229
c >= RUNNING_WRITER
243230
end
244231

245232
# @!visibility private
246-
def waiting_writers(c = @counter.value)
233+
def waiting_writers(c = @Counter.value)
247234
(c & MAX_WRITERS) / WAITING_WRITER
248235
end
249236

250237
# @!visibility private
251-
def waiting_writer?(c = @counter.value)
238+
def waiting_writer?(c = @Counter.value)
252239
c >= WAITING_WRITER
253240
end
254241

255242
# @!visibility private
256-
def max_readers?(c = @counter.value)
243+
def max_readers?(c = @Counter.value)
257244
(c & MAX_READERS) == MAX_READERS
258245
end
259246

260247
# @!visibility private
261-
def max_writers?(c = @counter.value)
248+
def max_writers?(c = @Counter.value)
262249
(c & MAX_WRITERS) == MAX_WRITERS
263250
end
264251
end

lib/concurrent/channel/buffered_channel.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ class BufferedChannel
55

66
def initialize(size)
77
@mutex = Mutex.new
8-
@condition = ConditionVariable.new
98
@buffer_condition = ConditionVariable.new
109

1110
@probe_set = WaitableList.new

lib/concurrent/synchronization.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
require 'concurrent/synchronization/rbx_object'
77
require 'concurrent/synchronization/object'
88

9+
require 'concurrent/synchronization/condition'
10+
require 'concurrent/synchronization/lock'
11+
912
module Concurrent
1013
# {include:file:doc/synchronization.md}
1114
module Synchronization

lib/concurrent/synchronization/abstract_object.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ def #{name}=(value)
149149
end
150150
names.map { |n| [n, :"#{n}="] }.flatten
151151
end
152-
153152
end
154153
end
155154
end
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
module Concurrent
2+
module Synchronization
3+
class Condition < Object
4+
5+
singleton_class.send :alias_method, :private_new, :new
6+
private_class_method :new
7+
8+
def initialize(lock)
9+
@Lock = lock
10+
ensure_ivar_visibility!
11+
super()
12+
end
13+
14+
def wait(timeout = nil)
15+
@Lock.synchronize { ns_wait(timeout) }
16+
end
17+
18+
def ns_wait(timeout = nil)
19+
synchronize { super(timeout) }
20+
end
21+
22+
def wait_until(timeout = nil, &condition)
23+
@Lock.synchronize { ns_wait_until(timeout, &condition) }
24+
end
25+
26+
def ns_wait_until(timeout = nil, &condition)
27+
synchronize { super(timeout, &condition) }
28+
end
29+
30+
def signal
31+
@Lock.synchronize { ns_signal }
32+
end
33+
34+
def ns_signal
35+
synchronize { super }
36+
end
37+
38+
def broadcast
39+
@Lock.synchronize { ns_broadcast }
40+
end
41+
42+
def ns_broadcast
43+
synchronize { super }
44+
end
45+
end
46+
47+
class Object < Implementation
48+
def new_condition
49+
Condition.private_new(self)
50+
end
51+
end
52+
end
53+
end
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
module Concurrent
2+
module Synchronization
3+
class Lock < Object
4+
5+
public :synchronize
6+
7+
def wait(timeout = nil)
8+
synchronize { ns_wait(timeout) }
9+
end
10+
11+
public :ns_wait
12+
13+
def wait_until(timeout = nil, &condition)
14+
synchronize { ns_wait_until(timeout, &condition) }
15+
end
16+
17+
public :ns_wait_until
18+
19+
def signal
20+
synchronize { ns_signal }
21+
end
22+
23+
public :ns_signal
24+
25+
def broadcast
26+
synchronize { ns_broadcast }
27+
end
28+
29+
public :ns_broadcast
30+
end
31+
end
32+
end

0 commit comments

Comments
 (0)