@@ -2,26 +2,32 @@ struct RemoteFetcher end
22function stream_fetch_values! (:: Type{RemoteFetcher} , T, store_ref:: Chunk{Store_remote} , buffer:: Blocal , id:: UInt ) where {Store_remote, Blocal}
33 thunk_id = STREAM_THUNK_ID[]
44 @dagdebug thunk_id :stream " fetching values"
5- @label fetch_values
6- # FIXME : Pass buffer free space
7- # TODO : It would be ideal if we could wait on store.lock, but get unlocked during migration
8- values = MemPool. access_ref (store_ref. handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
9- if ! isopen (store)
10- throw (InvalidStateException (" Stream is closed" , :closed ))
11- end
12- @dagdebug thunk_id :stream " trying to fetch values at $(myid ()) "
13- store:: Store_remote
14- in_store = store
15- STREAM_THUNK_ID[] = thunk_id
16- values = T[]
17- while ! isempty (store, id)
18- value = take! (store, id):: T
19- push! (values, value)
20- end
21- return values
22- end :: Vector{T}
23- if isempty (values)
24- @goto fetch_values
5+
6+ values = T[]
7+ while isempty (values)
8+ # FIXME : Pass buffer free space
9+ # TODO : It would be ideal if we could wait on store.lock, but get unlocked during migration
10+ values = MemPool. access_ref (store_ref. handle, id, T, Store_remote, thunk_id) do store, id, T, Store_remote, thunk_id
11+ if ! isopen (store)
12+ throw (InvalidStateException (" Stream is closed" , :closed ))
13+ end
14+ @dagdebug thunk_id :stream " trying to fetch values at $(myid ()) "
15+ store:: Store_remote
16+ in_store = store
17+ STREAM_THUNK_ID[] = thunk_id
18+ values = T[]
19+ while ! isempty (store, id)
20+ value = take! (store, id):: T
21+ push! (values, value)
22+ end
23+ return values
24+ end :: Vector{T}
25+
26+ # We explicitly yield in the loop to allow other tasks to run. This
27+ # matters on single-threaded instances because MemPool.access_ref()
28+ # might not yield when accessing data locally, which can cause this loop
29+ # to spin forever.
30+ yield ()
2531 end
2632
2733 @dagdebug thunk_id :stream " fetched $(length (values)) values"
0 commit comments