@@ -26,12 +26,13 @@ mutable struct Future <: AbstractRemoteRef
2626 where :: Int
2727 whence:: Int
2828 id:: Int
29- v:: Union{Some{Any}, Nothing}
29+ lock:: ReentrantLock
30+ @atomic v:: Union{Some{Any}, Nothing}
3031
3132 Future (w:: Int , rrid:: RRID , v:: Union{Some, Nothing} = nothing ) =
32- (r = new (w,rrid. whence,rrid. id,v); return test_existing_ref (r))
33+ (r = new (w,rrid. whence,rrid. id,ReentrantLock (), v); return test_existing_ref (r))
3334
34- Future (t:: NTuple{4, Any} ) = new (t[1 ],t[2 ],t[3 ],t[4 ]) # Useful for creating dummy, zeroed-out instances
35+ Future (t:: NTuple{4, Any} ) = new (t[1 ],t[2 ],t[3 ],ReentrantLock (), t[4 ]) # Useful for creating dummy, zeroed-out instances
3536end
3637
3738"""
@@ -69,10 +70,17 @@ function test_existing_ref(r::AbstractRemoteRef)
6970 found = getkey (client_refs, r, nothing )
7071 if found != = nothing
7172 @assert r. where > 0
72- if isa (r, Future) && found. v === nothing && r. v != = nothing
73- # we have recd the value from another source, probably a deserialized ref, send a del_client message
74- send_del_client (r)
75- found. v = r. v
73+ if isa (r, Future)
74+ # this is only for copying the reference from Future to RemoteRef (just created)
75+ fv_cache = @atomic :acquire found. v
76+ rv_cache = @atomic :monotonic r. v
77+ if fv_cache === nothing && rv_cache != = nothing
78+ # we have recd the value from another source, probably a deserialized ref, send a del_client message
79+ send_del_client (r)
80+ @lock found. lock begin
81+ @atomicreplace found. v nothing => rv_cache
82+ end
83+ end
7684 end
7785 return found:: typeof (r)
7886 end
@@ -91,8 +99,9 @@ function finalize_ref(r::AbstractRemoteRef)
9199 send_del_client_no_lock (r)
92100 else
93101 # send_del_client only if the reference has not been set
94- r. v === nothing && send_del_client_no_lock (r)
95- r. v = nothing
102+ v_cache = @atomic :monotonic r. v
103+ v_cache === nothing && send_del_client_no_lock (r)
104+ @atomic :monotonic r. v = nothing
96105 end
97106 r. where = 0
98107 finally
@@ -201,7 +210,8 @@ isready(f) # will not block
201210```
202211"""
203212function isready (rr:: Future )
204- rr. v === nothing || return true
213+ v_cache = @atomic rr. v
214+ v_cache === nothing || return true
205215
206216 rid = remoteref_id (rr)
207217 return if rr. where == myid ()
@@ -354,26 +364,33 @@ end
354364
355365channel_type (rr:: RemoteChannel{T} ) where {T} = T
356366
357- serialize (s:: ClusterSerializer , f:: Future ) = serialize (s, f, f. v === nothing )
358- serialize (s:: ClusterSerializer , rr:: RemoteChannel ) = serialize (s, rr, true )
359- function serialize (s:: ClusterSerializer , rr:: AbstractRemoteRef , addclient)
360- if addclient
367+ function serialize (s:: ClusterSerializer , f:: Future )
368+ v_cache = @atomic f. v
369+ if v_cache === nothing
361370 p = worker_id_from_socket (s. io)
362- (p != = rr . where) && send_add_client (rr , p)
371+ (p != = f . where) && send_add_client (f , p)
363372 end
373+ fc = Future ((f. where, f. whence, f. id, v_cache)) # copy to be used for serialization (contains a reset lock)
374+ invoke (serialize, Tuple{ClusterSerializer, Any}, s, fc)
375+ end
376+
377+ function serialize (s:: ClusterSerializer , rr:: RemoteChannel )
378+ p = worker_id_from_socket (s. io)
379+ (p != = rr. where) && send_add_client (rr, p)
364380 invoke (serialize, Tuple{ClusterSerializer, Any}, s, rr)
365381end
366382
367383function deserialize (s:: ClusterSerializer , t:: Type{<:Future} )
368- f = invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t)
369- f2 = Future (f . where, RRID (f . whence, f . id), f . v) # ctor adds to client_refs table
384+ fc = invoke (deserialize, Tuple{ClusterSerializer, DataType}, s, t) # deserialized copy
385+ f2 = Future (fc . where, RRID (fc . whence, fc . id), fc . v) # ctor adds to client_refs table
370386
371387 # 1) send_add_client() is not executed when the ref is being serialized
372388 # to where it exists, hence do it here.
373389 # 2) If we have received a 'fetch'ed Future or if the Future ctor found an
374390 # already 'fetch'ed instance in client_refs (Issue #25847), we should not
375391 # track it in the backing RemoteValue store.
376- if f2. where == myid () && f2. v === nothing
392+ f2v_cache = @atomic f2. v
393+ if f2. where == myid () && f2v_cache === nothing
377394 add_client (remoteref_id (f2), myid ())
378395 end
379396 f2
570587
571588Wait for a value to become available for the specified [`Future`](@ref).
572589"""
573- wait (r:: Future ) = (r. v != = nothing && return r; call_on_owner (wait_ref, r, myid ()); r)
590+ wait (r:: Future ) = (v_cache = @atomic r. v; v_cache != = nothing && return r; call_on_owner (wait_ref, r, myid ()); r)
574591
575592"""
576593 wait(r::RemoteChannel, args...)
@@ -587,11 +604,41 @@ Further calls to `fetch` on the same reference return the cached value. If the r
587604is an exception, throws a [`RemoteException`](@ref) which captures the remote exception and backtrace.
588605"""
589606function fetch (r:: Future )
590- r. v != = nothing && return something (r. v)
591- v = call_on_owner (fetch_ref, r)
592- r. v = Some (v)
607+ v_cache = @atomic r. v
608+ v_cache != = nothing && return something (v_cache)
609+
610+ if r. where == myid ()
611+ rv, v_cache = @lock r. lock begin
612+ v_cache = @atomic :monotonic r. v
613+ rv = v_cache === nothing ? lookup_ref (remoteref_id (r)) : nothing
614+ rv, v_cache
615+ end
616+
617+ if v_cache != = nothing
618+ return something (v_cache)
619+ else
620+ v_local = fetch (rv. c)
621+ end
622+ else
623+ v_local = call_on_owner (fetch_ref, r)
624+ end
625+
626+ v_cache = @atomic r. v
627+
628+ if v_cache === nothing # call_on_owner case
629+ v_old, status = @lock r. lock begin
630+ @atomicreplace r. v nothing => Some (v_local)
631+ end
632+ # status == true - when value obtained through call_on_owner
633+ # status == false - any other situation: atomicreplace fails, because by the time the lock is obtained cache will be populated
634+ # why? local put! performs caching and putting into channel under r.lock
635+
636+ # for local put! use the cached value, for call_on_owner cases just take the v_local as it was just cached in r.v
637+ v_cache = status ? v_local : v_old
638+ end
639+
593640 send_del_client (r)
594- v
641+ something (v_cache)
595642end
596643
597644fetch_ref (rid, args... ) = fetch (lookup_ref (rid). c, args... )
@@ -615,12 +662,30 @@ A `put!` on an already set `Future` throws an `Exception`.
615662All asynchronous remote calls return `Future`s and set the
616663value to the return value of the call upon completion.
617664"""
618- function put! (rr:: Future , v)
619- rr. v != = nothing && error (" Future can be set only once" )
620- call_on_owner (put_future, rr, v, myid ())
621- rr. v = Some (v)
622- rr
665+ function put! (r:: Future , v)
666+ if r. where == myid ()
667+ rid = remoteref_id (r)
668+ rv = lookup_ref (rid)
669+ isready (rv) && error (" Future can be set only once" )
670+ @lock r. lock begin
671+ put! (rv, v) # this notifies the tasks waiting on the channel in fetch
672+ set_future_cache (r, v) # set the cache before leaving the lock, so that the notified tasks already see it cached
673+ end
674+ del_client (rid, myid ())
675+ else
676+ @lock r. lock begin # same idea as above if there were any local tasks fetching on this Future
677+ call_on_owner (put_future, r, v, myid ())
678+ set_future_cache (r, v)
679+ end
680+ end
681+ r
623682end
683+
684+ function set_future_cache (r:: Future , v)
685+ _, ok = @atomicreplace r. v nothing => Some (v)
686+ ok || error (" internal consistency error detected for Future" )
687+ end
688+
624689function put_future (rid, v, caller)
625690 rv = lookup_ref (rid)
626691 isready (rv) && error (" Future can be set only once" )
0 commit comments