Skip to content

Commit 9237042

Browse files
committed
Better error handling in ReadWriteLock.
1 parent 368fab1 commit 9237042

File tree

2 files changed

+217
-168
lines changed

2 files changed

+217
-168
lines changed
Lines changed: 213 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -1,168 +1,213 @@
1-
require 'thread'
2-
require 'concurrent/atomic'
3-
4-
module Concurrent
5-
6-
# Ruby read-write lock implementation
7-
#
8-
# Allows any number of concurrent readers, but only one concurrent writer
9-
# (And if the "write" lock is taken, any readers who come along will have to wait)
10-
#
11-
# If readers are already active when a writer comes along, the writer will wait for
12-
# all the readers to finish before going ahead.
13-
# Any additional readers that come when the writer is already waiting, will also
14-
# wait (so writers are not starved).
15-
#
16-
# @example
17-
# lock = Concurrent::ReadWriteLock.new
18-
# lock.with_read_lock { data.retrieve }
19-
# lock.with_write_lock { data.modify! }
20-
#
21-
# @note Do **not** try to acquire the write lock while already holding a read lock
22-
# **or** try to acquire the write lock while you already have it.
23-
# This will lead to deadlock
24-
class ReadWriteLock
25-
26-
WAITING_WRITER = 1 << 15
27-
RUNNING_WRITER = 1 << 30
28-
MAX_READERS = WAITING_WRITER - 1
29-
MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1
30-
31-
# Implementation notes:
32-
# A goal is to make the uncontended path for both readers/writers lock-free
33-
# Only if there is reader-writer or writer-writer contention, should locks be used
34-
# Internal state is represented by a single integer ("counter"), and updated
35-
# using atomic compare-and-swap operations
36-
# When the counter is 0, the lock is free
37-
# Each reader increments the counter by 1 when acquiring a read lock
38-
# (and decrements by 1 when releasing the read lock)
39-
# The counter is increased by (1 << 15) for each writer waiting to acquire the
40-
# write lock, and by (1 << 30) if the write lock is taken
41-
42-
def initialize
43-
@counter = Atomic.new(0) # single integer which represents lock state
44-
@reader_q = ConditionVariable.new # queue for waiting readers
45-
@reader_mutex = Mutex.new # to protect reader queue
46-
@writer_q = ConditionVariable.new # queue for waiting writers
47-
@writer_mutex = Mutex.new # to protect writer queue
48-
end
49-
50-
def with_read_lock
51-
acquire_read_lock
52-
result = yield
53-
release_read_lock
54-
result
55-
end
56-
57-
def with_write_lock
58-
acquire_write_lock
59-
result = yield
60-
release_write_lock
61-
result
62-
end
63-
64-
def acquire_read_lock
65-
while(true)
66-
c = @counter.value
67-
raise "Too many reader threads!" if (c & MAX_READERS) == MAX_READERS
68-
69-
# If a writer is waiting when we first queue up, we need to wait
70-
if c >= WAITING_WRITER
71-
# But it is possible that the writer could finish and decrement @counter right here...
72-
@reader_mutex.synchronize do
73-
# So check again inside the synchronized section
74-
@reader_q.wait(@reader_mutex) if @counter.value >= WAITING_WRITER
75-
end
76-
77-
# after a reader has waited once, they are allowed to "barge" ahead of waiting writers
78-
# but if a writer is *running*, the reader still needs to wait (naturally)
79-
while(true)
80-
c = @counter.value
81-
if c >= RUNNING_WRITER
82-
@reader_mutex.synchronize do
83-
@reader_q.wait(@reader_mutex) if @counter.value >= RUNNING_WRITER
84-
end
85-
else
86-
return if @counter.compare_and_swap(c,c+1)
87-
end
88-
end
89-
else
90-
break if @counter.compare_and_swap(c,c+1)
91-
end
92-
end
93-
end
94-
95-
def release_read_lock
96-
while(true)
97-
c = @counter.value
98-
if @counter.compare_and_swap(c,c-1)
99-
# If one or more writers were waiting, and we were the last reader, wake a writer up
100-
if c >= WAITING_WRITER && (c & MAX_READERS) == 1
101-
@writer_mutex.synchronize { @writer_q.signal }
102-
end
103-
break
104-
end
105-
end
106-
end
107-
108-
def acquire_write_lock
109-
while(true)
110-
c = @counter.value
111-
raise "Too many writers!" if (c & MAX_WRITERS) == MAX_WRITERS
112-
113-
if c == 0 # no readers OR writers running
114-
# if we successfully swap the RUNNING_WRITER bit on, then we can go ahead
115-
break if @counter.compare_and_swap(0,RUNNING_WRITER)
116-
elsif @counter.compare_and_swap(c,c+WAITING_WRITER)
117-
while(true)
118-
# Now we have successfully incremented, so no more readers will be able to increment
119-
# (they will wait instead)
120-
# However, readers OR writers could decrement right here, OR another writer could increment
121-
@writer_mutex.synchronize do
122-
# So we have to do another check inside the synchronized section
123-
# If a writer OR reader is running, then go to sleep
124-
c = @counter.value
125-
@writer_q.wait(@writer_mutex) if (c >= RUNNING_WRITER) || ((c & MAX_READERS) > 0)
126-
end
127-
128-
# We just came out of a wait
129-
# If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
130-
# Then we are OK to stop waiting and go ahead
131-
# Otherwise go back and wait again
132-
c = @counter.value
133-
break if (c < RUNNING_WRITER) &&
134-
((c & MAX_READERS) == 0) &&
135-
@counter.compare_and_swap(c,c+RUNNING_WRITER-WAITING_WRITER)
136-
end
137-
break
138-
end
139-
end
140-
end
141-
142-
def release_write_lock
143-
while(true)
144-
c = @counter.value
145-
if @counter.compare_and_swap(c,c-RUNNING_WRITER)
146-
@reader_mutex.synchronize { @reader_q.broadcast }
147-
if (c & MAX_WRITERS) > 0 # if any writers are waiting...
148-
@writer_mutex.synchronize { @writer_q.signal }
149-
end
150-
break
151-
end
152-
end
153-
end
154-
155-
def to_s
156-
c = @counter.value
157-
s = if c >= RUNNING_WRITER
158-
"1 writer running, "
159-
elsif (c & MAX_READERS) > 0
160-
"#{c & MAX_READERS} readers running, "
161-
else
162-
""
163-
end
164-
165-
"#<ReadWriteLock:#{object_id.to_s(16)} #{s}#{(c & MAX_WRITERS) / WAITING_WRITER} writers waiting>"
166-
end
167-
end
168-
end
1+
require 'thread'
2+
require 'concurrent/atomic'
3+
require 'concurrent/errors'
4+
5+
module Concurrent
6+
7+
# Ruby read-write lock implementation
8+
#
9+
# Allows any number of concurrent readers, but only one concurrent writer
10+
# (And if the "write" lock is taken, any readers who come along will have to wait)
11+
#
12+
# If readers are already active when a writer comes along, the writer will wait for
13+
# all the readers to finish before going ahead.
14+
# Any additional readers that come when the writer is already waiting, will also
15+
# wait (so writers are not starved).
16+
#
17+
# @example
18+
# lock = Concurrent::ReadWriteLock.new
19+
# lock.with_read_lock { data.retrieve }
20+
# lock.with_write_lock { data.modify! }
21+
#
22+
# @note Do **not** try to acquire the write lock while already holding a read lock
23+
# **or** try to acquire the write lock while you already have it.
24+
# This will lead to deadlock
25+
class ReadWriteLock
26+
27+
# @!visibility private
28+
WAITING_WRITER = 1 << 15
29+
30+
# @!visibility private
31+
RUNNING_WRITER = 1 << 30
32+
33+
# @!visibility private
34+
MAX_READERS = WAITING_WRITER - 1
35+
36+
# @!visibility private
37+
MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1
38+
39+
# Implementation notes:
40+
# A goal is to make the uncontended path for both readers/writers lock-free
41+
# Only if there is reader-writer or writer-writer contention, should locks be used
42+
# Internal state is represented by a single integer ("counter"), and updated
43+
# using atomic compare-and-swap operations
44+
# When the counter is 0, the lock is free
45+
# Each reader increments the counter by 1 when acquiring a read lock
46+
# (and decrements by 1 when releasing the read lock)
47+
# The counter is increased by (1 << 15) for each writer waiting to acquire the
48+
# write lock, and by (1 << 30) if the write lock is taken
49+
50+
# Create a new `ReadWriteLock` in the unlocked state.
51+
def initialize
52+
@counter = Atomic.new(0) # single integer which represents lock state
53+
@reader_q = ConditionVariable.new # queue for waiting readers
54+
@reader_mutex = Mutex.new # to protect reader queue
55+
@writer_q = ConditionVariable.new # queue for waiting writers
56+
@writer_mutex = Mutex.new # to protect writer queue
57+
end
58+
59+
# Execute a block operation within a read lock.
60+
#
61+
# @yield the task to be performed within the lock.
62+
#
63+
# @return [Object] the result of the block operation.
64+
#
65+
# @raise [ArgumentError] when no block is given.
66+
# @raise [Concurrent::ResourceLimitError] if the maximum number of readers
67+
# is exceeded.
68+
def with_read_lock
69+
raise ArgumentError.new('no block given') unless block_given?
70+
acquire_read_lock
71+
yield
72+
ensure => ex
73+
release_read_lock unless ex.is_a? Concurrent::ResourceLimitError
74+
end
75+
76+
# Execute a block operation within a write lock.
77+
#
78+
# @yield the task to be performed within the lock.
79+
#
80+
# @return [Object] the result of the block operation.
81+
#
82+
# @raise [ArgumentError] when no block is given.
83+
# @raise [Concurrent::ResourceLimitError] if the maximum number of readers
84+
# is exceeded.
85+
def with_write_lock
86+
raise ArgumentError.new('no block given') unless block_given?
87+
acquire_write_lock
88+
yield
89+
ensure => ex
90+
release_write_lock unless ex.is_a? Concurrent::ResourceLimitError
91+
end
92+
93+
# Acquire a read lock.
94+
#
95+
# @raise [Concurrent::ResourceLimitError] if the maximum number of readers
96+
# is exceeded.
97+
def acquire_read_lock
98+
while(true)
99+
c = @counter.value
100+
raise ResourceLimitError.new('Too many reader threads') if (c & MAX_READERS) == MAX_READERS
101+
102+
# If a writer is waiting when we first queue up, we need to wait
103+
if c >= WAITING_WRITER
104+
# But it is possible that the writer could finish and decrement @counter right here...
105+
@reader_mutex.synchronize do
106+
# So check again inside the synchronized section
107+
@reader_q.wait(@reader_mutex) if @counter.value >= WAITING_WRITER
108+
end
109+
110+
# after a reader has waited once, they are allowed to "barge" ahead of waiting writers
111+
# but if a writer is *running*, the reader still needs to wait (naturally)
112+
while(true)
113+
c = @counter.value
114+
if c >= RUNNING_WRITER
115+
@reader_mutex.synchronize do
116+
@reader_q.wait(@reader_mutex) if @counter.value >= RUNNING_WRITER
117+
end
118+
else
119+
return if @counter.compare_and_swap(c,c+1)
120+
end
121+
end
122+
else
123+
break if @counter.compare_and_swap(c,c+1)
124+
end
125+
end
126+
true
127+
end
128+
129+
# Release a previously acquired read lock.
130+
def release_read_lock
131+
while(true)
132+
c = @counter.value
133+
if @counter.compare_and_swap(c,c-1)
134+
# If one or more writers were waiting, and we were the last reader, wake a writer up
135+
if c >= WAITING_WRITER && (c & MAX_READERS) == 1
136+
@writer_mutex.synchronize { @writer_q.signal }
137+
end
138+
break
139+
end
140+
end
141+
true
142+
end
143+
144+
# Acquire a write lock.
145+
#
146+
# @raise [Concurrent::ResourceLimitError] if the maximum number of writers
147+
# is exceeded.
148+
def acquire_write_lock
149+
while(true)
150+
c = @counter.value
151+
raise ResourceLimitError.new('Too many writer threads') if (c & MAX_WRITERS) == MAX_WRITERS
152+
153+
if c == 0 # no readers OR writers running
154+
# if we successfully swap the RUNNING_WRITER bit on, then we can go ahead
155+
break if @counter.compare_and_swap(0,RUNNING_WRITER)
156+
elsif @counter.compare_and_swap(c,c+WAITING_WRITER)
157+
while(true)
158+
# Now we have successfully incremented, so no more readers will be able to increment
159+
# (they will wait instead)
160+
# However, readers OR writers could decrement right here, OR another writer could increment
161+
@writer_mutex.synchronize do
162+
# So we have to do another check inside the synchronized section
163+
# If a writer OR reader is running, then go to sleep
164+
c = @counter.value
165+
@writer_q.wait(@writer_mutex) if (c >= RUNNING_WRITER) || ((c & MAX_READERS) > 0)
166+
end
167+
168+
# We just came out of a wait
169+
# If we successfully turn the RUNNING_WRITER bit on with an atomic swap,
170+
# Then we are OK to stop waiting and go ahead
171+
# Otherwise go back and wait again
172+
c = @counter.value
173+
break if (c < RUNNING_WRITER) &&
174+
((c & MAX_READERS) == 0) &&
175+
@counter.compare_and_swap(c,c+RUNNING_WRITER-WAITING_WRITER)
176+
end
177+
break
178+
end
179+
end
180+
true
181+
end
182+
183+
# Release a previously acquired write lock.
184+
def release_write_lock
185+
while(true)
186+
c = @counter.value
187+
if @counter.compare_and_swap(c,c-RUNNING_WRITER)
188+
@reader_mutex.synchronize { @reader_q.broadcast }
189+
if (c & MAX_WRITERS) > 0 # if any writers are waiting...
190+
@writer_mutex.synchronize { @writer_q.signal }
191+
end
192+
break
193+
end
194+
end
195+
true
196+
end
197+
198+
# Returns a string representing *obj*. Includes the current reader and
199+
# writer counts.
200+
def to_s
201+
c = @counter.value
202+
s = if c >= RUNNING_WRITER
203+
"1 writer running, "
204+
elsif (c & MAX_READERS) > 0
205+
"#{c & MAX_READERS} readers running, "
206+
else
207+
""
208+
end
209+
210+
"#<ReadWriteLock:#{object_id.to_s(16)} #{s}#{(c & MAX_WRITERS) / WAITING_WRITER} writers waiting>"
211+
end
212+
end
213+
end

lib/concurrent/errors.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ module Concurrent
2424
# possibly because of a reject policy or other internal error.
2525
RejectedExecutionError = Class.new(StandardError)
2626

27+
# Raised when any finite resource, such as a lock counter, exceeds its
28+
# maximum limit/threshold.
29+
ResourceLimitError = Class.new(StandardError)
30+
2731
# Raised when an operation times out.
2832
TimeoutError = Class.new(StandardError)
2933

0 commit comments

Comments
 (0)