
Commit b994e0b

make our locks fast (#42964)
Now that we have the ability to express atomics (particularly the write barrier of the Task object), it is time for us to speed up and simplify our locks. This PR does need someone to verify the synchronizations are correct, but I think this version is actually quite simple. There is a single byte that represents the state of the lock:

0x0 : unlocked
0x1 : locked (without other waiters)
0x2 : locked with waiters

And then when we have the locked bit set, we can manipulate any of the other data (e.g. owner and reentrancy) without needing any atomic work, and if we don't hold that bit, we should not write to any other field.
1 parent 5e4773b commit b994e0b
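As a rough illustration of the three-state protocol described in the commit message, here is a minimal sketch (not code from this commit; the SketchLock type and the function names are invented for the example) of how a lock byte moves between 0x00, 0x01, and 0x02 using Julia's field atomics:

# Minimal sketch of the state-byte protocol; `SketchLock` is hypothetical.
mutable struct SketchLock
    @atomic state::UInt8   # 0x00 = unlocked, 0x01 = locked, 0x02 = locked with waiters
end

# Acquire: only the task that wins this CAS may touch the lock's other fields.
try_acquire!(l::SketchLock) =
    (@atomicreplace :acquire l.state 0x00 => 0x01).success

# A would-be waiter marks the lock as contended before sleeping on it.
mark_contended!(l::SketchLock) =
    (@atomicreplace l.state 0x01 => 0x02).old

# Release: swap back to 0x00; seeing 0x02 means waiters must be notified.
release!(l::SketchLock) =
    (@atomicswap :release l.state = 0x00) == 0x02

In the actual ReentrantLock below, trylock performs the acquire CAS, the slow path in lock does the 0x01 => 0x02 upgrade before waiting, and unlock does the release swap.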

File tree

6 files changed: +132, -127 lines


base/lock.jl

Lines changed: 76 additions & 83 deletions
@@ -26,11 +26,22 @@ end
 ```
 """
 mutable struct ReentrantLock <: AbstractLock
-    locked_by::Union{Task, Nothing}
-    cond_wait::ThreadSynchronizer
-    reentrancy_cnt::Int
-
-    ReentrantLock() = new(nothing, ThreadSynchronizer(), 0)
+    # offset = 16
+    @atomic locked_by::Union{Task, Nothing}
+    # offset32 = 20, offset64 = 24
+    reentrancy_cnt::UInt32
+    # offset32 = 24, offset64 = 28
+    @atomic havelock::UInt8 # 0x0 = none, 0x1 = lock, 0x2 = conflict
+    # offset32 = 28, offset64 = 32
+    cond_wait::ThreadSynchronizer # 2 words
+    # offset32 = 36, offset64 = 48
+    # sizeof32 = 20, sizeof64 = 32
+    # now add padding to make this a full cache line to minimize false sharing between objects
+    _::NTuple{Int === Int32 ? 2 : 3, Int}
+    # offset32 = 44, offset64 = 72 == sizeof+offset
+    # sizeof32 = 28, sizeof64 = 56
+
+    ReentrantLock() = new(nothing, 0x0000_0000, 0x00, ThreadSynchronizer())
 end
 
 assert_havelock(l::ReentrantLock) = assert_havelock(l, l.locked_by)
@@ -42,7 +53,7 @@ Check whether the `lock` is held by any task/thread.
 This should not be used for synchronization (see instead [`trylock`](@ref)).
 """
 function islocked(rl::ReentrantLock)
-    return rl.reentrancy_cnt != 0
+    return rl.havelock != 0
 end
 
 """
@@ -55,23 +66,26 @@ return `false`.
 
 Each successful `trylock` must be matched by an [`unlock`](@ref).
 """
-function trylock(rl::ReentrantLock)
-    t = current_task()
-    if t === rl.locked_by
-        rl.reentrancy_cnt += 1
+@inline function trylock(rl::ReentrantLock)
+    ct = current_task()
+    if rl.locked_by === ct
+        #@assert rl.havelock !== 0x00
+        rl.reentrancy_cnt += 0x0000_0001
         return true
     end
-    lock(rl.cond_wait)
-    if rl.reentrancy_cnt == 0
-        rl.locked_by = t
-        rl.reentrancy_cnt = 1
-        GC.disable_finalizers()
-        got = true
-    else
-        got = false
+    return _trylock(rl, ct)
+end
+@noinline function _trylock(rl::ReentrantLock, ct::Task)
+    GC.disable_finalizers()
+    if (@atomicreplace :acquire rl.havelock 0x00 => 0x01).success
+        #@assert rl.locked_by === nothing
+        #@assert rl.reentrancy_cnt === 0
+        rl.reentrancy_cnt = 0x0000_0001
+        @atomic :release rl.locked_by = ct
+        return true
     end
-    unlock(rl.cond_wait)
-    return got
+    GC.enable_finalizers()
+    return false
 end
 
 """
@@ -83,28 +97,23 @@ wait for it to become available.
 
 Each `lock` must be matched by an [`unlock`](@ref).
 """
-function lock(rl::ReentrantLock)
-    t = current_task()
-    if t === rl.locked_by
-        rl.reentrancy_cnt += 1
-    else
-        lock(rl.cond_wait)
-        while true
-            if rl.reentrancy_cnt == 0
-                rl.locked_by = t
-                rl.reentrancy_cnt = 1
-                GC.disable_finalizers()
-                break
-            end
-            try
-                wait(rl.cond_wait)
-            catch
-                unlock(rl.cond_wait)
-                rethrow()
+@inline function lock(rl::ReentrantLock)
+    trylock(rl) || (@noinline function slowlock(rl::ReentrantLock)
+        c = rl.cond_wait
+        lock(c.lock)
+        try
+            while true
+                if (@atomicreplace rl.havelock 0x01 => 0x02).old == 0x00 # :sequentially_consistent ? # now either 0x00 or 0x02
+                    # it was unlocked, so try to lock it ourself
+                    _trylock(rl, current_task()) && break
+                else # it was locked, so now wait for the release to notify us
+                    wait(c)
+                end
             end
+        finally
+            unlock(c.lock)
         end
-        unlock(rl.cond_wait)
-    end
+    end)(rl)
     return
 end
 
@@ -116,58 +125,42 @@ Releases ownership of the `lock`.
 If this is a recursive lock which has been acquired before, decrement an
 internal counter and return immediately.
 """
-function unlock(rl::ReentrantLock)
-    t = current_task()
-    n = rl.reentrancy_cnt
-    n == 0 && error("unlock count must match lock count")
-    rl.locked_by === t || error("unlock from wrong thread")
-    if n > 1
-        rl.reentrancy_cnt = n - 1
-    else
-        lock(rl.cond_wait)
-        rl.reentrancy_cnt = 0
-        rl.locked_by = nothing
-        if !isempty(rl.cond_wait.waitq)
-            try
-                notify(rl.cond_wait)
-            catch
-                unlock(rl.cond_wait)
-                rethrow()
+@inline function unlock(rl::ReentrantLock)
+    rl.locked_by === current_task() ||
+        error(rl.reentrancy_cnt == 0x0000_0000 ? "unlock count must match lock count" : "unlock from wrong thread")
+    (@noinline function _unlock(rl::ReentrantLock)
+        n = rl.reentrancy_cnt - 0x0000_0001
+        rl.reentrancy_cnt = n
+        if n == 0x0000_00000
+            @atomic :monotonic rl.locked_by = nothing
+            if (@atomicswap :release rl.havelock = 0x00) == 0x02
+                (@noinline function notifywaiters(rl)
+                    cond_wait = rl.cond_wait
+                    lock(cond_wait)
+                    try
+                        notify(cond_wait)
+                    finally
+                        unlock(cond_wait)
+                    end
+                end)(rl)
             end
+            return true
         end
-        GC.enable_finalizers()
-        unlock(rl.cond_wait)
-    end
-    return
+        return false
+    end)(rl) && GC.enable_finalizers()
+    nothing
 end
 
 function unlockall(rl::ReentrantLock)
-    t = current_task()
-    n = rl.reentrancy_cnt
-    rl.locked_by === t || error("unlock from wrong thread")
-    n == 0 && error("unlock count must match lock count")
-    lock(rl.cond_wait)
-    rl.reentrancy_cnt = 0
-    rl.locked_by = nothing
-    if !isempty(rl.cond_wait.waitq)
-        try
-            notify(rl.cond_wait)
-        catch
-            unlock(rl.cond_wait)
-            rethrow()
-        end
-    end
-    GC.enable_finalizers()
-    unlock(rl.cond_wait)
+    n = @atomicswap :not_atomic rl.reentrancy_cnt = 0x0000_0001
+    unlock(rl)
     return n
 end
 
-function relockall(rl::ReentrantLock, n::Int)
-    t = current_task()
+function relockall(rl::ReentrantLock, n::UInt32)
     lock(rl)
-    n1 = rl.reentrancy_cnt
-    rl.reentrancy_cnt = n
-    n1 == 1 || concurrency_violation()
+    old = @atomicswap :not_atomic rl.reentrancy_cnt = n
+    old == 0x0000_0001 || concurrency_violation()
     return
 end
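For readers less familiar with Julia's field atomics (the @atomic, @atomicswap, and @atomicreplace macros introduced in Julia 1.7) that the new ReentrantLock relies on, here is a small standalone sketch of their behavior; the Cell type is invented for illustration only:

# Hypothetical `Cell` type, just to demonstrate the field-atomics macros.
mutable struct Cell
    @atomic x::UInt8
end

c = Cell(0x00)

# Compare-and-swap: returns (old = ..., success = ...) and stores the new value
# only if the field still held the expected value.
r = @atomicreplace :acquire c.x 0x00 => 0x01
@assert r.success && r.old == 0x00

# A second attempt fails because the field now holds 0x01, not 0x00.
r = @atomicreplace :acquire c.x 0x00 => 0x01
@assert !r.success

# Atomic exchange: stores the new value and returns the previous one.
old = @atomicswap :release c.x = 0x00
@assert old == 0x01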

base/locks-mt.jl

Lines changed: 10 additions & 36 deletions
@@ -27,46 +27,19 @@ contending threads. If you have more contention than that, different
 synchronization approaches should be considered.
 """
 mutable struct SpinLock <: AbstractLock
-    owned::Int
+    # we make this much larger than necessary to minimize false-sharing
+    @atomic owned::Int
     SpinLock() = new(0)
 end
 
-import Base.Sys.WORD_SIZE
-
-@eval _xchg!(x::SpinLock, v::Int) =
-    llvmcall($"""
-        %ptr = inttoptr i$WORD_SIZE %0 to i$WORD_SIZE*
-        %rv = atomicrmw xchg i$WORD_SIZE* %ptr, i$WORD_SIZE %1 acq_rel
-        ret i$WORD_SIZE %rv
-    """, Int, Tuple{Ptr{Int}, Int}, unsafe_convert(Ptr{Int}, pointer_from_objref(x)), v)
-
-@eval _get(x::SpinLock) =
-    llvmcall($"""
-        %ptr = inttoptr i$WORD_SIZE %0 to i$WORD_SIZE*
-        %rv = load atomic i$WORD_SIZE, i$WORD_SIZE* %ptr monotonic, align $(gc_alignment(Int))
-        ret i$WORD_SIZE %rv
-    """, Int, Tuple{Ptr{Int}}, unsafe_convert(Ptr{Int}, pointer_from_objref(x)))
-
-@eval _set!(x::SpinLock, v::Int) =
-    llvmcall($"""
-        %ptr = inttoptr i$WORD_SIZE %0 to i$WORD_SIZE*
-        store atomic i$WORD_SIZE %1, i$WORD_SIZE* %ptr release, align $(gc_alignment(Int))
-        ret void
-    """, Cvoid, Tuple{Ptr{Int}, Int}, unsafe_convert(Ptr{Int}, pointer_from_objref(x)), v)
-
 # Note: this cannot assert that the lock is held by the correct thread, because we do not
 # track which thread locked it. Users beware.
 Base.assert_havelock(l::SpinLock) = islocked(l) ? nothing : Base.concurrency_violation()
 
 function lock(l::SpinLock)
     while true
-        if _get(l) == 0
-            GC.disable_finalizers()
-            p = _xchg!(l, 1)
-            if p == 0
-                return
-            end
-            GC.enable_finalizers()
+        if @inline trylock(l)
+            return
         end
         ccall(:jl_cpu_pause, Cvoid, ())
         # Temporary solution before we have gc transition support in codegen.
@@ -75,9 +48,9 @@ function lock(l::SpinLock)
 end
 
 function trylock(l::SpinLock)
-    if _get(l) == 0
+    if l.owned == 0
         GC.disable_finalizers()
-        p = _xchg!(l, 1)
+        p = @atomicswap :acquire l.owned = 1
         if p == 0
             return true
         end
@@ -87,13 +60,14 @@ function trylock(l::SpinLock)
 end
 
 function unlock(l::SpinLock)
-    _get(l) == 0 && error("unlock count must match lock count")
-    _set!(l, 0)
+    if (@atomicswap :release l.owned = 0) == 0
+        error("unlock count must match lock count")
+    end
     GC.enable_finalizers()
     ccall(:jl_cpu_wake, Cvoid, ())
     return
 end
 
 function islocked(l::SpinLock)
-    return _get(l) != 0
+    return l.owned != 0
 end
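The SpinLock API itself is unchanged by this commit; as a usage reminder (a small sketch, not taken from the diff), it should guard only short, non-yielding critical sections and is normally paired with try/finally:

# Assumes Julia is started with more than one thread (e.g. `julia -t4`).
const sl = Threads.SpinLock()
const hits = Ref(0)

Threads.@threads for _ in 1:1000
    lock(sl)
    try
        hits[] += 1   # keep the critical section short and non-blocking
    finally
        unlock(sl)
    end
end

@assert hits[] == 1000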

doc/src/base/multi-threading.md

Lines changed: 0 additions & 8 deletions
@@ -9,14 +9,6 @@ Base.Threads.nthreads
 ```
 
 See also [Multi-Threading](@ref man-multithreading).
-## Synchronization
-
-```@docs
-Base.Threads.Condition
-Base.Threads.Event
-```
-
-See also [Synchronization](@ref lib-task-sync).
 
 ## Atomic operations
 

doc/src/base/parallel.md

Lines changed: 4 additions & 0 deletions
@@ -26,6 +26,8 @@ Base.schedule
 
 ## [Synchronization](@id lib-task-sync)
 
+## Synchronization
+
 ```@docs
 Base.errormonitor
 Base.@sync
@@ -34,6 +36,8 @@ Base.fetch(t::Task)
 Base.timedwait
 
 Base.Condition
+Base.Threads.Condition
+Base.Threads.Event
 Base.notify
 
 Base.Semaphore

test/threads.jl

Lines changed: 21 additions & 0 deletions
@@ -2,6 +2,27 @@
 
 using Test
 
+# simple sanity tests for locks under cooperative concurrent access
+let lk = ReentrantLock()
+    c1 = Base.Event()
+    c2 = Base.Event()
+    @test trylock(lk)
+    @test trylock(lk)
+    t1 = @async (notify(c1); lock(lk); unlock(lk); trylock(lk))
+    t2 = @async (notify(c2); trylock(lk))
+    wait(c1)
+    wait(c2)
+    @test t1.queue === lk.cond_wait.waitq
+    @test t2.queue !== lk.cond_wait.waitq
+    @test istaskdone(t2)
+    @test !fetch(t2)
+    unlock(lk)
+    @test t1.queue === lk.cond_wait.waitq
+    unlock(lk)
+    @test t1.queue !== lk.cond_wait.waitq
+    @test fetch(t1)
+end
+
 let cmd = `$(Base.julia_cmd()) --depwarn=error --rr-detach --startup-file=no threads_exec.jl`
     for test_nthreads in (1, 2, 4, 4) # run once to try single-threaded mode, then try a couple times to trigger bad races
         new_env = copy(ENV)
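The test above pokes at lock internals (cond_wait.waitq) deliberately; in ordinary user code the do-block form of lock is the usual way to keep lock/unlock balanced (standard Base API, not something added by this commit):

lk = ReentrantLock()
lock(lk) do
    # critical section; the lock is released even if this block throws
end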

test/threads_exec.jl

Lines changed: 21 additions & 0 deletions
@@ -27,6 +27,27 @@ end
 # (expected test duration is about 18-180 seconds)
 Timer(t -> killjob("KILLING BY THREAD TEST WATCHDOG\n"), 1200)
 
+# basic lock check
+if nthreads() > 1
+    let lk = Base.Threads.SpinLock()
+        c1 = Base.Event()
+        c2 = Base.Event()
+        @test trylock(lk)
+        @test !trylock(lk)
+        t1 = Threads.@spawn (notify(c1); lock(lk); unlock(lk); trylock(lk))
+        t2 = Threads.@spawn (notify(c2); trylock(lk))
+        Libc.systemsleep(0.1) # block our thread from scheduling for a bit
+        wait(c1)
+        wait(c2)
+        @test !fetch(t2)
+        @test istaskdone(t2)
+        @test !istaskdone(t1)
+        unlock(lk)
+        @test fetch(t1)
+        @test istaskdone(t1)
+    end
+end
+
 # threading constructs
 
 let a = zeros(Int, 2 * nthreads())

0 commit comments