Skip to content

Commit 9639c42

Browse files
authored
Define iterate for RemoteChannel (JuliaLang/julia#48515)
1 parent 1ff04d8 commit 9639c42

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

src/Distributed.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module Distributed
77

88
# imports for extension
99
import Base: getindex, wait, put!, take!, fetch, isready, push!, length,
10-
hash, ==, kill, close, isopen, showerror
10+
hash, ==, kill, close, isopen, showerror, iterate, IteratorSize
1111

1212
# imports for use
1313
using Base: Process, Semaphore, JLOptions, buffer_writes, @async_unwrap,

src/remotecall.jl

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,3 +778,23 @@ function getindex(r::RemoteChannel, args...)
778778
end
779779
return remotecall_fetch(getindex, r.where, r, args...)
780780
end
781+
782+
function iterate(c::RemoteChannel, state=nothing)
783+
if isopen(c) || isready(c)
784+
try
785+
return (take!(c), nothing)
786+
catch e
787+
if isa(e, InvalidStateException) ||
788+
(isa(e, RemoteException) &&
789+
isa(e.captured.ex, InvalidStateException) &&
790+
e.captured.ex.state === :closed)
791+
return nothing
792+
end
793+
rethrow()
794+
end
795+
else
796+
return nothing
797+
end
798+
end
799+
800+
IteratorSize(::Type{<:RemoteChannel}) = SizeUnknown()

test/distributed_exec.jl

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,32 @@ function test_iteration(in_c, out_c)
456456
end
457457

458458
test_iteration(Channel(10), Channel(10))
459+
test_iteration(RemoteChannel(() -> Channel(10)), RemoteChannel(() -> Channel(10)))
460+
461+
@everywhere function test_iteration_take(ch)
462+
count = 0
463+
for x in ch
464+
count += 1
465+
end
466+
return count
467+
end
468+
469+
@everywhere function test_iteration_put(ch, total)
470+
for i in 1:total
471+
put!(ch, i)
472+
end
473+
close(ch)
474+
end
475+
476+
let ch = RemoteChannel(() -> Channel(1))
477+
@async test_iteration_put(ch, 10)
478+
@test 10 == @fetchfrom id_other test_iteration_take(ch)
479+
# now reverse
480+
ch = RemoteChannel(() -> Channel(1))
481+
@spawnat id_other test_iteration_put(ch, 10)
482+
@test 10 == test_iteration_take(ch)
483+
end
484+
459485
# make sure exceptions propagate when waiting on Tasks
460486
@test_throws CompositeException (@sync (@async error("oops")))
461487
try

0 commit comments

Comments
 (0)