|
| 1 | +require 'thread' |
| 2 | +require 'concurrent/atomic/atomic_reference' |
| 3 | +require 'concurrent/errors' |
| 4 | +require 'concurrent/synchronization' |
| 5 | +require 'concurrent/atomic/thread_local_var' |
| 6 | + |
| 7 | +module Concurrent |
| 8 | + |
| 9 | + # Re-entrant read-write lock implementation |
| 10 | + # |
| 11 | + # Allows any number of concurrent readers, but only one concurrent writer |
| 12 | + # (And while the "write" lock is taken, no read locks can be obtained either. |
| 13 | + # Hence, the write lock can also be called an "exclusive" lock.) |
| 14 | + # |
| 15 | + # If another thread has taken a read lock, any thread which wants a write lock |
| 16 | + # will block until all the readers release their locks. However, once a thread |
| 17 | + # starts waiting to obtain a write lock, any additional readers that come along |
| 18 | + # will also wait (so writers are not starved). |
| 19 | + # |
| 20 | + # A thread can acquire both a read and write lock at the same time. A thread can |
| 21 | + # also acquire a read lock OR a write lock more than once. Only when the read (or |
| 22 | + # write) lock is released as many times as it was acquired, will the thread |
| 23 | + # actually let it go, allowing other threads which might have been waiting |
| 24 | + # to proceed. |
| 25 | + # |
| 26 | + # If both read and write locks are acquired by the same thread, it is not strictly |
| 27 | + # necessary to release them in the same order they were acquired. In other words, |
| 28 | + # the following code is legal: |
| 29 | + # |
| 30 | + # @example |
| 31 | + # lock = Concurrent::ReentrantReadWriteLock.new |
| 32 | + # lock.acquire_write_lock |
| 33 | + # lock.acquire_read_lock |
| 34 | + # lock.release_write_lock |
| 35 | + # # At this point, the current thread is holding only a read lock, not a write |
| 36 | + # # lock. So other threads can take read locks, but not a write lock. |
| 37 | + # lock.release_read_lock |
| 38 | + # # Now the current thread is not holding either a read or write lock, so |
| 39 | + # # another thread could potentially acquire a write lock. |
| 40 | + # |
| 41 | + # This implementation was inspired by `java.util.concurrent.ReentrantReadWriteLock`. |
| 42 | + # |
| 43 | + # @example |
| 44 | + # lock = Concurrent::ReentrantReadWriteLock.new |
| 45 | + # lock.with_read_lock { data.retrieve } |
| 46 | + # lock.with_write_lock { data.modify! } |
| 47 | + # |
| 48 | + # @see http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantReadWriteLock.html java.util.concurrent.ReentrantReadWriteLock |
| 49 | + class ReentrantReadWriteLock < Synchronization::Object |
| 50 | + |
| 51 | + # Implementation notes: |
| 52 | + # |
| 53 | + # A goal is to make the uncontended path for both readers/writers mutex-free |
| 54 | + # Only if there is reader-writer or writer-writer contention, should mutexes be used |
| 55 | + # Otherwise, a single CAS operation is all we need to acquire/release a lock |
| 56 | + # |
| 57 | + # Internal state is represented by a single integer ("counter"), and updated |
| 58 | + # using atomic compare-and-swap operations |
| 59 | + # When the counter is 0, the lock is free |
| 60 | + # Each thread which has one OR MORE read locks increments the counter by 1 |
| 61 | + # (and decrements by 1 when releasing the read lock) |
| 62 | + # The counter is increased by (1 << 15) for each writer waiting to acquire the |
| 63 | + # write lock, and by (1 << 29) if the write lock is taken |
| 64 | + # |
| 65 | + # Additionally, each thread uses a thread-local variable to count how many times |
| 66 | + # it has acquired a read lock, AND how many times it has acquired a write lock. |
| 67 | + # It uses a similar trick; an increment of 1 means a read lock was taken, and |
| 68 | + # an increment of (1 << 15) means a write lock was taken |
| 69 | + # This is what makes re-entrancy possible |
| 70 | + # |
| 71 | + # 2 rules are followed to ensure good liveness properties: |
| 72 | + # 1) Once a writer has queued up and is waiting for a write lock, no other thread |
| 73 | + # can take a lock without waiting |
| 74 | + # 2) When a write lock is released, readers are given the "first chance" to wake |
| 75 | + # up and acquire a read lock |
| 76 | + # Following these rules means readers and writers tend to "take turns", so neither |
| 77 | + # can starve the other, even under heavy contention |
| 78 | + |
| 79 | + # @!visibility private |
| 80 | + READER_BITS = 15 |
| 81 | + # @!visibility private |
| 82 | + WRITER_BITS = 14 |
| 83 | + |
| 84 | + # Used with @Counter: |
| 85 | + # @!visibility private |
| 86 | + WAITING_WRITER = 1 << READER_BITS |
| 87 | + # @!visibility private |
| 88 | + RUNNING_WRITER = 1 << (READER_BITS + WRITER_BITS) |
| 89 | + # @!visibility private |
| 90 | + MAX_READERS = WAITING_WRITER - 1 |
| 91 | + # @!visibility private |
| 92 | + MAX_WRITERS = RUNNING_WRITER - MAX_READERS - 1 |
| 93 | + |
| 94 | + # Used with @HeldCount: |
| 95 | + # @!visibility private |
| 96 | + WRITE_LOCK_HELD = 1 << READER_BITS |
| 97 | + # @!visibility private |
| 98 | + READ_LOCK_MASK = WRITE_LOCK_HELD - 1 |
| 99 | + # @!visibility private |
| 100 | + WRITE_LOCK_MASK = MAX_WRITERS |
| 101 | + |
| 102 | + # Create a new `ReentrantReadWriteLock` in the unlocked state. |
| 103 | + def initialize |
| 104 | + @Counter = AtomicFixnum.new(0) # single integer which represents lock state |
| 105 | + @ReadQueue = Synchronization::Lock.new # used to queue waiting readers |
| 106 | + @WriteQueue = Synchronization::Lock.new # used to queue waiting writers |
| 107 | + @HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread |
| 108 | + ensure_ivar_visibility! |
| 109 | + end |
| 110 | + |
| 111 | + # Execute a block operation within a read lock. |
| 112 | + # |
| 113 | + # @yield the task to be performed within the lock. |
| 114 | + # |
| 115 | + # @return [Object] the result of the block operation. |
| 116 | + # |
| 117 | + # @raise [ArgumentError] when no block is given. |
| 118 | + # @raise [Concurrent::ResourceLimitError] if the maximum number of readers |
| 119 | + # is exceeded. |
| 120 | + def with_read_lock |
| 121 | + raise ArgumentError.new('no block given') unless block_given? |
| 122 | + acquire_read_lock |
| 123 | + begin |
| 124 | + yield |
| 125 | + ensure |
| 126 | + release_read_lock |
| 127 | + end |
| 128 | + end |
| 129 | + |
| 130 | + # Execute a block operation within a write lock. |
| 131 | + # |
| 132 | + # @yield the task to be performed within the lock. |
| 133 | + # |
| 134 | + # @return [Object] the result of the block operation. |
| 135 | + # |
| 136 | + # @raise [ArgumentError] when no block is given. |
| 137 | + # @raise [Concurrent::ResourceLimitError] if the maximum number of readers |
| 138 | + # is exceeded. |
| 139 | + def with_write_lock |
| 140 | + raise ArgumentError.new('no block given') unless block_given? |
| 141 | + acquire_write_lock |
| 142 | + begin |
| 143 | + yield |
| 144 | + ensure |
| 145 | + release_write_lock |
| 146 | + end |
| 147 | + end |
| 148 | + |
| 149 | + # Acquire a read lock. If a write lock is held by another thread, will block |
| 150 | + # until it is released. |
| 151 | + # |
| 152 | + # @return [Boolean] true if the lock is successfully acquired |
| 153 | + # |
| 154 | + # @raise [Concurrent::ResourceLimitError] if the maximum number of readers |
| 155 | + # is exceeded. |
| 156 | + def acquire_read_lock |
| 157 | + if (held = @HeldCount.value) > 0 |
| 158 | + # If we already have a lock, there's no need to wait |
| 159 | + if held & READ_LOCK_MASK == 0 |
| 160 | + # But we do need to update the counter, if we were holding a write |
| 161 | + # lock but not a read lock |
| 162 | + @Counter.update { |c| c + 1 } |
| 163 | + end |
| 164 | + @HeldCount.value = held + 1 |
| 165 | + return true |
| 166 | + end |
| 167 | + |
| 168 | + while true |
| 169 | + c = @Counter.value |
| 170 | + raise ResourceLimitError.new('Too many reader threads') if max_readers?(c) |
| 171 | + |
| 172 | + # If a writer is waiting OR running when we first queue up, we need to wait |
| 173 | + if waiting_or_running_writer?(c) |
| 174 | + # Before going to sleep, check again with the ReadQueue mutex held |
| 175 | + @ReadQueue.synchronize do |
| 176 | + @ReadQueue.ns_wait if waiting_or_running_writer? |
| 177 | + end |
| 178 | + # Note: the above 'synchronize' block could have used #wait_until, |
| 179 | + # but that waits repeatedly in a loop, checking the wait condition |
| 180 | + # each time it wakes up (to protect against spurious wakeups) |
| 181 | + # But we are already in a loop, which is only broken when we successfully |
| 182 | + # acquire the lock! So we don't care about spurious wakeups, and would |
| 183 | + # rather not pay the extra overhead of using #wait_until |
| 184 | + |
| 185 | + # After a reader has waited once, they are allowed to "barge" ahead of waiting writers |
| 186 | + # But if a writer is *running*, the reader still needs to wait (naturally) |
| 187 | + while true |
| 188 | + c = @Counter.value |
| 189 | + if running_writer?(c) |
| 190 | + @ReadQueue.synchronize do |
| 191 | + @ReadQueue.ns_wait if running_writer? |
| 192 | + end |
| 193 | + elsif @Counter.compare_and_set(c, c+1) |
| 194 | + @HeldCount.value = held + 1 |
| 195 | + return true |
| 196 | + end |
| 197 | + end |
| 198 | + elsif @Counter.compare_and_set(c, c+1) |
| 199 | + @HeldCount.value = held + 1 |
| 200 | + return true |
| 201 | + end |
| 202 | + end |
| 203 | + end |
| 204 | + |
| 205 | + # Try to acquire a read lock and return true if we succeed. If it cannot be |
| 206 | + # acquired immediately, return false. |
| 207 | + # |
| 208 | + # @return [Boolean] true if the lock is successfully acquired |
| 209 | + def try_read_lock |
| 210 | + if (held = @HeldCount.value) > 0 |
| 211 | + if held & READ_LOCK_MASK == 0 |
| 212 | + # If we hold a write lock, but not a read lock... |
| 213 | + @Counter.update { |c| c + 1 } |
| 214 | + end |
| 215 | + @HeldCount.value = held + 1 |
| 216 | + return true |
| 217 | + else |
| 218 | + c = @Counter.value |
| 219 | + if !waiting_or_running_writer?(c) && @Counter.compare_and_set(c, c+1) |
| 220 | + @HeldCount.value = held + 1 |
| 221 | + return true |
| 222 | + end |
| 223 | + end |
| 224 | + false |
| 225 | + end |
| 226 | + |
| 227 | + # Release a previously acquired read lock. |
| 228 | + # |
| 229 | + # @return [Boolean] true if the lock is successfully released |
| 230 | + def release_read_lock |
| 231 | + held = @HeldCount.value = @HeldCount.value - 1 |
| 232 | + rlocks_held = held & READ_LOCK_MASK |
| 233 | + if rlocks_held == 0 |
| 234 | + c = @Counter.update { |counter| counter - 1 } |
| 235 | + # If one or more writers were waiting, and we were the last reader, wake a writer up |
| 236 | + if waiting_or_running_writer?(c) && running_readers(c) == 0 |
| 237 | + @WriteQueue.signal |
| 238 | + end |
| 239 | + elsif rlocks_held == READ_LOCK_MASK |
| 240 | + raise IllegalOperationError, "Cannot release a read lock which is not held" |
| 241 | + end |
| 242 | + true |
| 243 | + end |
| 244 | + |
| 245 | + # Acquire a write lock. Will block and wait for all active readers and writers. |
| 246 | + # |
| 247 | + # @return [Boolean] true if the lock is successfully acquired |
| 248 | + # |
| 249 | + # @raise [Concurrent::ResourceLimitError] if the maximum number of writers |
| 250 | + # is exceeded. |
| 251 | + def acquire_write_lock |
| 252 | + if (held = @HeldCount.value) >= WRITE_LOCK_HELD |
| 253 | + # if we already have a write (exclusive) lock, there's no need to wait |
| 254 | + @HeldCount.value = held + WRITE_LOCK_HELD |
| 255 | + return true |
| 256 | + end |
| 257 | + |
| 258 | + while true |
| 259 | + c = @Counter.value |
| 260 | + raise ResourceLimitError.new('Too many writer threads') if max_writers?(c) |
| 261 | + |
| 262 | + # To go ahead and take the lock without waiting, there must be no writer |
| 263 | + # running right now, AND no writers who came before us still waiting to |
| 264 | + # acquire the lock |
| 265 | + # Additionally, if any read locks have been taken, we must hold all of them |
| 266 | + if c == held |
| 267 | + # If we successfully swap the RUNNING_WRITER bit on, then we can go ahead |
| 268 | + if @Counter.compare_and_set(c, c+RUNNING_WRITER) |
| 269 | + @HeldCount.value = held + WRITE_LOCK_HELD |
| 270 | + return true |
| 271 | + end |
| 272 | + elsif @Counter.compare_and_set(c, c+WAITING_WRITER) |
| 273 | + while true |
| 274 | + # Now we have successfully incremented, so no more readers will be able to increment |
| 275 | + # (they will wait instead) |
| 276 | + # However, readers OR writers could decrement right here |
| 277 | + @WriteQueue.synchronize do |
| 278 | + # So we have to do another check inside the synchronized section |
| 279 | + # If a writer OR another reader is running, then go to sleep |
| 280 | + c = @Counter.value |
| 281 | + @WriteQueue.ns_wait if running_writer?(c) || running_readers(c) != held |
| 282 | + end |
| 283 | + # Note: if you are thinking of replacing the above 'synchronize' block |
| 284 | + # with #wait_until, read the comment in #acquire_read_lock first! |
| 285 | + |
| 286 | + # We just came out of a wait |
| 287 | + # If we successfully turn the RUNNING_WRITER bit on with an atomic swap, |
| 288 | + # then we are OK to stop waiting and go ahead |
| 289 | + # Otherwise go back and wait again |
| 290 | + c = @Counter.value |
| 291 | + if !running_writer?(c) && |
| 292 | + running_readers(c) == held && |
| 293 | + @Counter.compare_and_set(c, c+RUNNING_WRITER-WAITING_WRITER) |
| 294 | + @HeldCount.value = held + WRITE_LOCK_HELD |
| 295 | + return true |
| 296 | + end |
| 297 | + end |
| 298 | + end |
| 299 | + end |
| 300 | + end |
| 301 | + |
| 302 | + # Try to acquire a write lock and return true if we succeed. If it cannot be |
| 303 | + # acquired immediately, return false. |
| 304 | + # |
| 305 | + # @return [Boolean] true if the lock is successfully acquired |
| 306 | + def try_write_lock |
| 307 | + if (held = @HeldCount.value) >= WRITE_LOCK_HELD |
| 308 | + @HeldCount.value = held + WRITE_LOCK_HELD |
| 309 | + return true |
| 310 | + else |
| 311 | + c = @Counter.value |
| 312 | + if !waiting_or_running_writer?(c) && |
| 313 | + running_readers(c) == held && |
| 314 | + @Counter.compare_and_set(c, c+RUNNING_WRITER) |
| 315 | + @HeldCount.value = held + WRITE_LOCK_HELD |
| 316 | + return true |
| 317 | + end |
| 318 | + end |
| 319 | + false |
| 320 | + end |
| 321 | + |
| 322 | + # Release a previously acquired write lock. |
| 323 | + # |
| 324 | + # @return [Boolean] true if the lock is successfully released |
| 325 | + def release_write_lock |
| 326 | + held = @HeldCount.value = @HeldCount.value - WRITE_LOCK_HELD |
| 327 | + wlocks_held = held & WRITE_LOCK_MASK |
| 328 | + if wlocks_held == 0 |
| 329 | + c = @Counter.update { |counter| counter - RUNNING_WRITER } |
| 330 | + @ReadQueue.broadcast |
| 331 | + @WriteQueue.signal if waiting_writers(c) > 0 |
| 332 | + elsif wlocks_held == WRITE_LOCK_MASK |
| 333 | + raise IllegalOperationError, "Cannot release a write lock which is not held" |
| 334 | + end |
| 335 | + true |
| 336 | + end |
| 337 | + |
| 338 | + private |
| 339 | + |
| 340 | + # @!visibility private |
| 341 | + def running_readers(c = @Counter.value) |
| 342 | + c & MAX_READERS |
| 343 | + end |
| 344 | + |
| 345 | + # @!visibility private |
| 346 | + def running_readers?(c = @Counter.value) |
| 347 | + (c & MAX_READERS) > 0 |
| 348 | + end |
| 349 | + |
| 350 | + # @!visibility private |
| 351 | + def running_writer?(c = @Counter.value) |
| 352 | + c >= RUNNING_WRITER |
| 353 | + end |
| 354 | + |
| 355 | + # @!visibility private |
| 356 | + def waiting_writers(c = @Counter.value) |
| 357 | + (c & MAX_WRITERS) >> READER_BITS |
| 358 | + end |
| 359 | + |
| 360 | + # @!visibility private |
| 361 | + def waiting_or_running_writer?(c = @Counter.value) |
| 362 | + c >= WAITING_WRITER |
| 363 | + end |
| 364 | + |
| 365 | + # @!visibility private |
| 366 | + def max_readers?(c = @Counter.value) |
| 367 | + (c & MAX_READERS) == MAX_READERS |
| 368 | + end |
| 369 | + |
| 370 | + # @!visibility private |
| 371 | + def max_writers?(c = @Counter.value) |
| 372 | + (c & MAX_WRITERS) == MAX_WRITERS |
| 373 | + end |
| 374 | + end |
| 375 | +end |
0 commit comments