Skip to content

Commit 9b2504b

Browse files
authored
Merge pull request #85 from JuliaData/jps/rwlock-vendor
Vendor ReadWriteLock from ConcurrentUtils
2 parents c49fe52 + 0d68824 commit 9b2504b

File tree

5 files changed

+164
-8
lines changed

5 files changed

+164
-8
lines changed

Project.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,17 @@ version = "0.4.7"
66

77
[deps]
88
ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd"
9-
ConcurrentUtils = "3df5f688-6c4c-4767-8685-17f5ad261477"
109
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
1110
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
1211
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
1312
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
1413
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
1514
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
1615
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
16+
UnsafeAtomics = "013be700-e6cd-48c3-b4a1-df204f14c38f"
1717

1818
[compat]
1919
ConcurrentCollections = "0.1"
20-
ConcurrentUtils = "0.1"
2120
DataStructures = "0.18"
2221
ScopedValues = "1"
2322
julia = "1.8"

src/MemPool.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ approx_size(f::FileRef) = f.size
5454

5555
include("io.jl")
5656
include("lock.jl")
57+
include("read_write_lock.jl")
5758
include("datastore.jl")
5859

5960
"""

src/datastore.jl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Distributed
2-
import ConcurrentUtils as CU
32

43
mutable struct DRef
54
owner::Int
@@ -515,7 +514,7 @@ function poolget(ref::DRef)
515514
original_ref = ref
516515

517516
# Check global redirect cache
518-
ref = CU.lock_read(REDIRECT_CACHE_LOCK) do
517+
ref = lock_read(REDIRECT_CACHE_LOCK) do
519518
get(REDIRECT_CACHE, ref, ref)
520519
end
521520

@@ -546,7 +545,7 @@ end
546545

547546
function _getlocal(id, remote)
548547
state = with_lock(()->datastore[id], datastore_lock)
549-
CU.lock_read(state.lock) do
548+
lock_read(state.lock) do
550549
if state.redirect !== nothing
551550
return RedirectTo(state.redirect)
552551
end
@@ -623,7 +622,7 @@ struct RedirectTo
623622
end
624623

625624
const REDIRECT_CACHE = WeakKeyDict{DRef,DRef}()
626-
const REDIRECT_CACHE_LOCK = CU.ReadWriteLock()
625+
const REDIRECT_CACHE_LOCK = ReadWriteLock()
627626

628627
## Default data directory
629628

src/read_write_lock.jl

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# Adapted from ConcurrentUtils/src/read_write_lock.jl
2+
3+
import UnsafeAtomics
4+
5+
abstract type AbstractReadWriteLock <: Base.AbstractLock end
6+
7+
const NOTLOCKED = UInt64(0)
8+
const NREADERS_INC = UInt64(2)
9+
const WRITELOCK_MASK = UInt64(1)
10+
11+
const NReadersAndWritelock = UInt64
12+
13+
mutable struct ReadWriteLock <: AbstractReadWriteLock
14+
@atomic nreaders_and_writelock::NReadersAndWritelock
15+
# TODO: use condition variables with lock-free notify
16+
const lock::ReentrantLock
17+
const cond_read::Threads.Condition
18+
const cond_write::Threads.Condition
19+
end
20+
21+
function fieldoffset_by_name(T, field)
22+
for idx in 1:nfields(T)
23+
if fieldnames(T)[idx] == field
24+
return fieldoffset(T, idx)
25+
end
26+
end
27+
error("No such field for $T: $field")
28+
end
29+
const OFFSET_NREADERS_AND_WRITELOCK =
30+
fieldoffset_by_name(ReadWriteLock, :nreaders_and_writelock)
31+
32+
function ReadWriteLock()
33+
lock = ReentrantLock()
34+
cond_read = Threads.Condition(lock)
35+
cond_write = Threads.Condition(lock)
36+
return ReadWriteLock(NOTLOCKED, lock, cond_read, cond_write)
37+
end
38+
39+
# Not very efficient but lock-free
40+
function trylock_read(rwlock::ReadWriteLock; nspins = -∞, ntries = -∞)
41+
local ns::Int = 0
42+
local nt::Int = 0
43+
while true
44+
old = @atomic :monotonic rwlock.nreaders_and_writelock
45+
if iszero(old & WRITELOCK_MASK)
46+
# Try to acquire reader lock without the responsibility to receive or send the
47+
# notification:
48+
old, success = @atomicreplace(
49+
:acquire_release,
50+
:monotonic,
51+
rwlock.nreaders_and_writelock,
52+
old => old + NREADERS_INC,
53+
)
54+
success && return true
55+
nt += 1
56+
nt < ntries || return false
57+
end
58+
ns += 1
59+
ns < nspins || return false
60+
end
61+
end
62+
63+
function lock_read(rwlock::ReadWriteLock)
64+
65+
# Using hardware FAA
66+
ptr = Ptr{NReadersAndWritelock}(
67+
pointer_from_objref(rwlock) + OFFSET_NREADERS_AND_WRITELOCK,
68+
)
69+
GC.@preserve rwlock begin
70+
_, n = UnsafeAtomics.modify!(ptr, +, NREADERS_INC, UnsafeAtomics.acq_rel)
71+
end
72+
# n = @atomic :acquire_release rwlock.nreaders_and_writelock += NREADERS_INC
73+
74+
if iszero(n & WRITELOCK_MASK)
75+
return
76+
end
77+
lock(rwlock.lock) do
78+
while true
79+
local n = @atomic :acquire rwlock.nreaders_and_writelock
80+
if iszero(n & WRITELOCK_MASK)
81+
@assert n > 0
82+
return
83+
end
84+
wait(rwlock.cond_read)
85+
end
86+
end
87+
end
88+
89+
function unlock_read(rwlock::ReadWriteLock)
90+
91+
# Using hardware FAA
92+
ptr = Ptr{NReadersAndWritelock}(
93+
pointer_from_objref(rwlock) + OFFSET_NREADERS_AND_WRITELOCK,
94+
)
95+
GC.@preserve rwlock begin
96+
_, n = UnsafeAtomics.modify!(ptr, -, NREADERS_INC, UnsafeAtomics.acq_rel)
97+
end
98+
# n = @atomic :acquire_release rwlock.nreaders_and_writelock -= NREADERS_INC
99+
100+
@assert iszero(n & WRITELOCK_MASK)
101+
if iszero(n)
102+
lock(rwlock.lock) do
103+
notify(rwlock.cond_write; all = false)
104+
end
105+
end
106+
return
107+
end
108+
109+
function Base.trylock(rwlock::ReadWriteLock)
110+
_, success = @atomicreplace(
111+
:acquire_release,
112+
:monotonic,
113+
rwlock.nreaders_and_writelock,
114+
NOTLOCKED => WRITELOCK_MASK,
115+
)
116+
return success::Bool
117+
end
118+
119+
function Base.lock(rwlock::ReadWriteLock)
120+
if trylock(rwlock)
121+
return
122+
end
123+
lock(rwlock.lock) do
124+
while true
125+
if trylock(rwlock)
126+
return
127+
end
128+
wait(rwlock.cond_write)
129+
end
130+
end
131+
end
132+
133+
function Base.unlock(rwlock::ReadWriteLock)
134+
@assert !iszero(rwlock.nreaders_and_writelock & WRITELOCK_MASK)
135+
@atomic :acquire_release rwlock.nreaders_and_writelock &= ~WRITELOCK_MASK
136+
lock(rwlock.lock) do
137+
notify(rwlock.cond_read)
138+
notify(rwlock.cond_write; all = false)
139+
end
140+
return
141+
end
142+
143+
###
144+
### High-level APIs
145+
###
146+
147+
lock_read(lck) = lock(lck)
148+
unlock_read(lck) = unlock(lck)
149+
150+
function lock_read(f, lock)
151+
lock_read(lock)
152+
try
153+
return f()
154+
finally
155+
unlock_read(lock)
156+
end
157+
end

src/storage.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ mutable struct RefState
316316
# Destructor, if any
317317
destructor::Any
318318
# A Reader-Writer lock to protect access to this struct
319-
lock::CU.ReadWriteLock
319+
lock::ReadWriteLock
320320
# The DRef that this value may be redirecting to
321321
redirect::Union{DRef,Nothing}
322322
end
@@ -326,7 +326,7 @@ RefState(storage::StorageState, size::Integer;
326326
RefState(storage, size,
327327
tag, leaf_tag,
328328
destructor,
329-
CU.ReadWriteLock(), nothing)
329+
ReadWriteLock(), nothing)
330330
function Base.getproperty(state::RefState, field::Symbol)
331331
if field === :storage
332332
throw(ArgumentError("Cannot directly read `:storage` field of `RefState`\nUse `storage_read(state)` instead"))

0 commit comments

Comments
 (0)