Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
name: CI
on:
pull_request:
branches:
- 'master'
- 'release-*'
# branches:
# - 'master'
# - 'release-*'
push:
branches:
- 'master'
Expand Down
15 changes: 11 additions & 4 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi
"""
abstract type ClusterManager end

# cluster_manager is a global constant
const cluster_manager = Ref{ClusterManager}()

function throw_if_cluster_manager_unassigned()
isassigned(cluster_manager) || error("cluster_manager is unassigned")
return nothing
end

"""
WorkerConfig

Expand Down Expand Up @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus

# On workers, the default cluster manager connects via TCP sockets. Custom
# transports will need to call this function with their own manager.
global cluster_manager
cluster_manager = manager
cluster_manager[] = manager

# Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called.
@assert nprocs() <= 1
Expand Down Expand Up @@ -569,7 +576,7 @@ function setup_launched_worker(manager, wconfig, launched_q)
# same type. This is done by setting an appropriate value to `WorkerConfig.cnt`.
cnt = something(wconfig.count, 1)
if cnt === :auto
cnt = wconfig.environ[:cpu_threads]
cnt = (wconfig.environ::AbstractDict)[:cpu_threads]
end
cnt = cnt - 1 # Removing self from the requested number

Expand Down Expand Up @@ -607,7 +614,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch
end
end

function create_worker(manager, wconfig)
function create_worker(manager::ClusterManager, wconfig::WorkerConfig)
# only node 1 can add new nodes, since nobody else has the full list of address:port
@assert LPROC.id == 1
timeout = worker_timeout()
Expand Down
1 change: 1 addition & 0 deletions src/macros.jl
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ completion. To wait for completion, prefix the call with [`@sync`](@ref), like :
macro distributed(args...)
na = length(args)
if na==1
reducer = identity
loop = args[1]
elseif na==2
reducer = args[1]
Expand Down
17 changes: 9 additions & 8 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa

any(c -> c == '"', exename) && throw(ArgumentError("invalid exename"))

remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...))
remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)::AbstractString)
# change working directory
if dir !== nothing && dir != ""
any(c -> c == '"', dir) && throw(ArgumentError("invalid dir"))
Expand Down Expand Up @@ -553,7 +553,7 @@ end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
if op === :interrupt
kill(config.process, 2)
kill(config.process::Process, 2)
end
end

Expand Down Expand Up @@ -606,7 +606,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)

# master connecting to workers
if config.io !== nothing
(bind_addr, port::Int) = read_worker_host_port(config.io)
(bind_addr, port::Int) = read_worker_host_port(config.io::IO)
pubhost = something(config.host, bind_addr)
config.host = pubhost
config.port = port
Expand Down Expand Up @@ -776,21 +776,22 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wai
sleep(exit_timeout)

# Check to see if our child exited, and if not, send an actual kill signal
if !process_exited(config.process)
process = config.process::Process
if !process_exited(process)
@warn "Failed to gracefully kill worker $(pid)"
profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10)
if profile_sig !== nothing
@warn("Sending profile $(profile_sig[1]) to worker $(pid)")
kill(config.process, profile_sig[2])
kill(process, profile_sig[2])
sleep(profile_wait)
end
@warn("Sending SIGQUIT to worker $(pid)")
kill(config.process, Base.SIGQUIT)
kill(process, Base.SIGQUIT)

sleep(term_timeout)
if !process_exited(config.process)
if !process_exited(process)
@warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL")
kill(config.process, Base.SIGKILL)
kill(process, Base.SIGKILL)
end
end
end
Expand Down
4 changes: 2 additions & 2 deletions src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ end
function send_msg(s::IO, header, msg)
id = worker_id_from_socket(s)
if id > -1
return send_msg(worker_from_id(id), header, msg)
return send_msg(worker_from_id(id)::Worker, header, msg)
end
send_msg_unknown(s, header, msg)
end

function send_msg_now(s::IO, header, msg::AbstractMsg)
id = worker_id_from_socket(s)
if id > -1
return send_msg_now(worker_from_id(id), header, msg)
return send_msg_now(worker_from_id(id)::Worker, header, msg)
end
send_msg_unknown(s, header, msg)
end
Expand Down
20 changes: 12 additions & 8 deletions src/process_messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ function run_work_thunk(thunk::Function, print_error::Bool)
end
return result
end
function run_work_thunk(rv::RemoteValue, thunk)
function run_work_thunk_remotevalue(rv::RemoteValue, thunk)
put!(rv, run_work_thunk(thunk, false))
nothing
end
Expand All @@ -85,7 +85,7 @@ function schedule_call(rid, thunk)
rv = RemoteValue(def_rv_channel())
(PGRP::ProcessGroup).refs[rid] = rv
push!(rv.clientset, rid.whence)
errormonitor(@async run_work_thunk(rv, thunk))
errormonitor(@async run_work_thunk_remotevalue(rv, thunk))
return rv
end
end
Expand Down Expand Up @@ -289,7 +289,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi
try
deliver_result(w_stream, :call_fetch, header.notify_oid, v.v)
finally
unlock(v.rv.synctake)
unlock(v.rv.synctake::ReentrantLock)
end
else
deliver_result(w_stream, :call_fetch, header.notify_oid, v)
Expand All @@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version)
end

function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version)
throw_if_cluster_manager_unassigned()

# register a new peer worker connection
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version)
w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker
send_connection_hdr(w, false)
send_msg_now(w, MsgHeader(), IdentifySocketAckMsg())
notify(w.initialized)
Expand All @@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi
end

function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
throw_if_cluster_manager_unassigned()

LPROC.id = msg.self_pid
controller = Worker(1, r_stream, w_stream, cluster_manager; version=version)
controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker
notify(controller.initialized)
register_worker(LPROC)
topology(msg.topology)
Expand All @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
let rpid=rpid, wconfig=wconfig
if lazy
# The constructor registers the object with a global registry.
Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig))
Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig))
else
@async connect_to_peer(cluster_manager, rpid, wconfig)
@async connect_to_peer(cluster_manager[], rpid, wconfig)
end
end
end
Expand All @@ -362,7 +366,7 @@ end
function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
try
(r_s, w_s) = connect(manager, rpid, wconfig)
w = Worker(rpid, r_s, w_s, manager; config=wconfig)
w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker
process_messages(w.r_stream, w.w_stream, false)
send_connection_hdr(w, true)
send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid()))
Expand Down
12 changes: 6 additions & 6 deletions src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ function serialize(s::AbstractSerializer, ::Future)
invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut)
end

function serialize(s::AbstractSerializer, ::RemoteChannel)
zero_rc = RemoteChannel{Channel{Any}}((0,0,0))
function serialize(s::AbstractSerializer, ::RemoteChannel{T}) where T
zero_rc = RemoteChannel{T}((0,0,0))
invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc)
end

Expand Down Expand Up @@ -706,8 +706,8 @@ function put_ref(rid, caller, args...)
put!(rv, args...)
if myid() == caller && rv.synctake !== nothing
# Wait till a "taken" value is serialized out - github issue #29932
lock(rv.synctake)
unlock(rv.synctake)
lock(rv.synctake::ReentrantLock)
unlock(rv.synctake::ReentrantLock)
end
nothing
end
Expand All @@ -731,15 +731,15 @@ function take_ref(rid, caller, args...)
# special handling for local put! / remote take! on unbuffered channel
# github issue #29932
synctake = true
lock(rv.synctake)
lock(rv.synctake::ReentrantLock)
end

v = try
take!(rv, args...)
catch e
# avoid unmatched unlock when exception occurs
# github issue #33972
synctake && unlock(rv.synctake)
synctake && unlock(rv.synctake::ReentrantLock)
rethrow(e)
end

Expand Down
4 changes: 2 additions & 2 deletions src/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
```
"""
function default_worker_pool()
function default_worker_pool()::AbstractWorkerPool
# On workers retrieve the default worker pool from the master when accessed
# for the first time
if _default_worker_pool[] === nothing
Expand All @@ -299,7 +299,7 @@ function default_worker_pool()
_default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1)
end
end
return _default_worker_pool[]
return _default_worker_pool[]::AbstractWorkerPool
end

"""
Expand Down
8 changes: 6 additions & 2 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,12 @@ end
@testset "Ser/deser to non-ClusterSerializer objects" begin
function test_regular_io_ser(ref::DistributedNext.AbstractRemoteRef)
io = IOBuffer()
serialize(io, ref)
# Wrapping the ref in a Dict to exercise the case when the
# type parameter of the RemoteChannel is part of an outer type.
# See https://github.com/JuliaLang/Distributed.jl/issues/178
serialize(io, Dict("ref" => ref))
seekstart(io)
ref2 = deserialize(io)
ref2 = deserialize(io)["ref"]
for fld in fieldnames(typeof(ref))
v = getfield(ref2, fld)
if isa(v, Number)
Expand All @@ -361,6 +364,7 @@ end

test_regular_io_ser(Future())
test_regular_io_ser(RemoteChannel())
test_regular_io_ser(RemoteChannel(() -> Channel{Bool}(1)))
end

@testset "@distributed and [un]buffered reads" begin
Expand Down
Loading