Skip to content

Commit 647b9f4

Browse files
authored
Add timeout parameter to wait(::Condition) (#56974)
We have a need for this capability. I believe this closes #36217. The implementation is straightforward and there are a couple of tests.
1 parent 98f8aca commit 647b9f4

File tree

2 files changed

+104
-3
lines changed

2 files changed

+104
-3
lines changed

base/condition.jl

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,20 +125,104 @@ proceeding.
125125
"""
126126
function wait end
127127

128+
# wait with timeout
129+
#
130+
# The behavior of wait changes if a timeout is specified. There are
131+
# three concurrent entities that can interact:
132+
# 1. Task W: the task that calls wait w/timeout.
133+
# 2. Task T: the task created to handle a timeout.
134+
# 3. Task N: the task that notifies the Condition being waited on.
135+
#
136+
# Typical flow:
137+
# - W enters the Condition's wait queue.
138+
# - W creates T and stops running (calls wait()).
139+
# - T, when scheduled, waits on a Timer.
140+
# - Two common outcomes:
141+
# - N notifies the Condition.
142+
# - W starts running, closes the Timer, sets waiter_left and returns
143+
# the notify'ed value.
144+
# - The closed Timer throws an EOFError to T which simply ends.
145+
# - The Timer expires.
146+
# - T starts running and locks the Condition.
147+
# - T confirms that waiter_left is unset and that W is still in the
148+
# Condition's wait queue; it then removes W from the wait queue,
149+
# sets dosched to true and unlocks the Condition.
150+
# - If dosched is true, T schedules W with the special :timed_out
151+
# value.
152+
# - T ends.
153+
# - W runs and returns :timed_out.
154+
#
155+
# Some possible interleavings:
156+
# - N notifies the Condition but the Timer expires and T starts running
157+
# before W:
158+
# - W closing the expired Timer is benign.
159+
# - T will find that W is no longer in the Condition's wait queue
160+
# (which is protected by a lock) and will not schedule W.
161+
# - N notifies the Condition; W runs and calls wait on the Condition
162+
# again before the Timer expires:
163+
# - W sets waiter_left before leaving. When T runs, it will find that
164+
# waiter_left is set and will not schedule W.
165+
#
166+
# The lock on the Condition's wait queue and waiter_left together
167+
# ensure proper synchronization and behavior of the tasks involved.
168+
128169
"""
129-
wait(c::GenericCondition; first::Bool=false)
170+
wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
130171
131172
Wait for [`notify`](@ref) on `c` and return the `val` parameter passed to `notify`.
132173
133174
If the keyword `first` is set to `true`, the waiter will be put _first_
134175
in line to wake up on `notify`. Otherwise, `wait` has first-in-first-out (FIFO) behavior.
176+
177+
If `timeout` is specified, cancel the `wait` when it expires and return
178+
`:timed_out`. The minimum value for `timeout` is 0.001 seconds, i.e. 1
179+
millisecond.
135180
"""
136-
function wait(c::GenericCondition; first::Bool=false)
181+
function wait(c::GenericCondition; first::Bool=false, timeout::Real=0.0)
182+
timeout == 0.0 || timeout 1e-3 || throw(ArgumentError("timeout must be ≥ 1 millisecond"))
183+
137184
ct = current_task()
138185
_wait2(c, ct, first)
139186
token = unlockall(c.lock)
187+
188+
timer::Union{Timer, Nothing} = nothing
189+
waiter_left::Union{Threads.Atomic{Bool}, Nothing} = nothing
190+
if timeout > 0.0
191+
timer = Timer(timeout)
192+
waiter_left = Threads.Atomic{Bool}(false)
193+
# start a task to wait on the timer
194+
t = Task() do
195+
try
196+
wait(timer)
197+
catch e
198+
# if the timer was closed, the waiting task has been scheduled; do nothing
199+
e isa EOFError && return
200+
end
201+
dosched = false
202+
lock(c.lock)
203+
# Confirm that the waiting task is still in the wait queue and remove it. If
204+
# the task is not in the wait queue, it must have been notified already so we
205+
# don't do anything here.
206+
if !waiter_left[] && ct.queue == c.waitq
207+
dosched = true
208+
Base.list_deletefirst!(c.waitq, ct)
209+
end
210+
unlock(c.lock)
211+
# send the waiting task a timeout
212+
dosched && schedule(ct, :timed_out)
213+
end
214+
t.sticky = false
215+
Threads._spawn_set_thrpool(t, :interactive)
216+
schedule(t)
217+
end
218+
140219
try
141-
return wait()
220+
res = wait()
221+
if timer !== nothing
222+
close(timer)
223+
waiter_left[] = true
224+
end
225+
return res
142226
catch
143227
q = ct.queue; q === nothing || Base.list_deletefirst!(q::IntrusiveLinkedList{Task}, ct)
144228
rethrow()

test/channels.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# This file is a part of Julia. License is MIT: https://julialang.org/license
22

33
using Random
4+
using Base.Threads
45
using Base: Experimental
56
using Base: n_avail
67

@@ -39,6 +40,22 @@ end
3940
@test fetch(t) == "finished"
4041
end
4142

43+
@testset "timed wait on Condition" begin
44+
a = Threads.Condition()
45+
@test_throws ArgumentError @lock a wait(a; timeout=0.0005)
46+
@test @lock a wait(a; timeout=0.1)==:timed_out
47+
lock(a)
48+
@spawn begin
49+
@lock a notify(a)
50+
end
51+
@test try
52+
wait(a; timeout=2)
53+
true
54+
finally
55+
unlock(a)
56+
end
57+
end
58+
4259
@testset "various constructors" begin
4360
c = Channel()
4461
@test eltype(c) == Any

0 commit comments

Comments
 (0)