Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,17 @@ version = "0.4.7"

[deps]
ConcurrentCollections = "5060bff5-0b44-40c5-b522-fcd3ca5cecdd"
ConcurrentUtils = "3df5f688-6c4c-4767-8685-17f5ad261477"
DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Mmap = "a63ad114-7e13-5084-954f-fe012c677804"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
ScopedValues = "7e506255-f358-4e82-b7e4-beb19740aa63"
Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b"
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
UnsafeAtomics = "013be700-e6cd-48c3-b4a1-df204f14c38f"

[compat]
ConcurrentCollections = "0.1"
ConcurrentUtils = "0.1"
DataStructures = "0.18"
ScopedValues = "1"
julia = "1.8"
Expand Down
1 change: 1 addition & 0 deletions src/MemPool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ approx_size(f::FileRef) = f.size

include("io.jl")
include("lock.jl")
include("read_write_lock.jl")
include("datastore.jl")

"""
Expand Down
7 changes: 3 additions & 4 deletions src/datastore.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Distributed
import ConcurrentUtils as CU

mutable struct DRef
owner::Int
Expand Down Expand Up @@ -515,7 +514,7 @@ function poolget(ref::DRef)
original_ref = ref

# Check global redirect cache
ref = CU.lock_read(REDIRECT_CACHE_LOCK) do
ref = lock_read(REDIRECT_CACHE_LOCK) do
get(REDIRECT_CACHE, ref, ref)
end

Expand Down Expand Up @@ -546,7 +545,7 @@ end

function _getlocal(id, remote)
state = with_lock(()->datastore[id], datastore_lock)
CU.lock_read(state.lock) do
lock_read(state.lock) do
if state.redirect !== nothing
return RedirectTo(state.redirect)
end
Expand Down Expand Up @@ -623,7 +622,7 @@ struct RedirectTo
end

const REDIRECT_CACHE = WeakKeyDict{DRef,DRef}()
const REDIRECT_CACHE_LOCK = CU.ReadWriteLock()
const REDIRECT_CACHE_LOCK = ReadWriteLock()

## Default data directory

Expand Down
157 changes: 157 additions & 0 deletions src/read_write_lock.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Adapted from ConcurrentUtils/src/read_write_lock.jl

import UnsafeAtomics

abstract type AbstractReadWriteLock <: Base.AbstractLock end

const NOTLOCKED = UInt64(0)
const NREADERS_INC = UInt64(2)
const WRITELOCK_MASK = UInt64(1)

const NReadersAndWritelock = UInt64

mutable struct ReadWriteLock <: AbstractReadWriteLock
@atomic nreaders_and_writelock::NReadersAndWritelock
# TODO: use condition variables with lock-free notify
const lock::ReentrantLock
const cond_read::Threads.Condition
const cond_write::Threads.Condition
end

function fieldoffset_by_name(T, field)
for idx in 1:nfields(T)
if fieldnames(T)[idx] == field
return fieldoffset(T, idx)
end
end
error("No such field for $T: $field")
end
const OFFSET_NREADERS_AND_WRITELOCK =
fieldoffset_by_name(ReadWriteLock, :nreaders_and_writelock)

function ReadWriteLock()
lock = ReentrantLock()
cond_read = Threads.Condition(lock)
cond_write = Threads.Condition(lock)
return ReadWriteLock(NOTLOCKED, lock, cond_read, cond_write)
end

# Not very efficient but lock-free
function trylock_read(rwlock::ReadWriteLock; nspins = -∞, ntries = -∞)
local ns::Int = 0
local nt::Int = 0
while true
old = @atomic :monotonic rwlock.nreaders_and_writelock
if iszero(old & WRITELOCK_MASK)
# Try to acquire reader lock without the responsibility to receive or send the
# notification:
old, success = @atomicreplace(
:acquire_release,
:monotonic,
rwlock.nreaders_and_writelock,
old => old + NREADERS_INC,
)
success && return true
nt += 1
nt < ntries || return false
end
ns += 1
ns < nspins || return false
end
end

function lock_read(rwlock::ReadWriteLock)

# Using hardware FAA
ptr = Ptr{NReadersAndWritelock}(
pointer_from_objref(rwlock) + OFFSET_NREADERS_AND_WRITELOCK,
)
GC.@preserve rwlock begin
_, n = UnsafeAtomics.modify!(ptr, +, NREADERS_INC, UnsafeAtomics.acq_rel)
end
# n = @atomic :acquire_release rwlock.nreaders_and_writelock += NREADERS_INC

if iszero(n & WRITELOCK_MASK)
return
end
lock(rwlock.lock) do
while true
local n = @atomic :acquire rwlock.nreaders_and_writelock
if iszero(n & WRITELOCK_MASK)
@assert n > 0
return
end
wait(rwlock.cond_read)
end
end
end

function unlock_read(rwlock::ReadWriteLock)

# Using hardware FAA
ptr = Ptr{NReadersAndWritelock}(
pointer_from_objref(rwlock) + OFFSET_NREADERS_AND_WRITELOCK,
)
GC.@preserve rwlock begin
_, n = UnsafeAtomics.modify!(ptr, -, NREADERS_INC, UnsafeAtomics.acq_rel)
end
# n = @atomic :acquire_release rwlock.nreaders_and_writelock -= NREADERS_INC

@assert iszero(n & WRITELOCK_MASK)
if iszero(n)
lock(rwlock.lock) do
notify(rwlock.cond_write; all = false)
end
end
return
end

function Base.trylock(rwlock::ReadWriteLock)
_, success = @atomicreplace(
:acquire_release,
:monotonic,
rwlock.nreaders_and_writelock,
NOTLOCKED => WRITELOCK_MASK,
)
return success::Bool
end

function Base.lock(rwlock::ReadWriteLock)
if trylock(rwlock)
return
end
lock(rwlock.lock) do
while true
if trylock(rwlock)
return
end
wait(rwlock.cond_write)
end
end
end

function Base.unlock(rwlock::ReadWriteLock)
@assert !iszero(rwlock.nreaders_and_writelock & WRITELOCK_MASK)
@atomic :acquire_release rwlock.nreaders_and_writelock &= ~WRITELOCK_MASK
lock(rwlock.lock) do
notify(rwlock.cond_read)
notify(rwlock.cond_write; all = false)
end
return
end

###
### High-level APIs
###

lock_read(lck) = lock(lck)
unlock_read(lck) = unlock(lck)

function lock_read(f, lock)
lock_read(lock)
try
return f()
finally
unlock_read(lock)
end
end
4 changes: 2 additions & 2 deletions src/storage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ mutable struct RefState
# Destructor, if any
destructor::Any
# A Reader-Writer lock to protect access to this struct
lock::CU.ReadWriteLock
lock::ReadWriteLock
# The DRef that this value may be redirecting to
redirect::Union{DRef,Nothing}
end
Expand All @@ -326,7 +326,7 @@ RefState(storage::StorageState, size::Integer;
RefState(storage, size,
tag, leaf_tag,
destructor,
CU.ReadWriteLock(), nothing)
ReadWriteLock(), nothing)
function Base.getproperty(state::RefState, field::Symbol)
if field === :storage
throw(ArgumentError("Cannot directly read `:storage` field of `RefState`\nUse `storage_read(state)` instead"))
Expand Down