Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/src/_changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ This documents notable changes in DistributedNext.jl. The format is based on
### Changed
- Added a `project` argument to [`addprocs(::AbstractVector)`](@ref) to specify
the project of a remote worker ([#2]).
- Workers will now attempt to pick the fastest available interface to
communicate over ([#9]).
- The `SSHManager` now passes all `JULIA_*` environment variables by default to
the workers, instead of only `JULIA_WORKER_TIMEOUT` ([#9]).
1 change: 1 addition & 0 deletions src/DistributedNext.jl
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ end
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)

include("network_interfaces.jl")
include("clusterserialize.jl")
include("cluster.jl") # cluster setup and management, addprocs
include("messages.jl")
Expand Down
41 changes: 31 additions & 10 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ end
mutable struct LocalProcess
id::Int
bind_addr::String
bind_port::UInt16
bind_port::Int
cookie::String
LocalProcess() = new(1)
end
Expand All @@ -237,6 +237,11 @@ The function reads the cookie from stdin if required, and listens on a free por
tasks to process incoming TCP connections and requests. It also (optionally)
closes stdin and redirects stderr to stdout.

If a specific interface is not specified through `--bind-to` it will make a
best-effort attempt to pick the fastest available network interface to listen
on. The heuristics it uses for this depend on the system configuration and
should not be relied upon to always pick the fastest interface.

It does not return.
"""
start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...)
Expand All @@ -253,8 +258,8 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
interface = IPv4(LPROC.bind_addr)
if LPROC.bind_port == 0
port_hint = 9000 + (getpid() % 1000)
(port, sock) = listenany(interface, UInt16(port_hint))
LPROC.bind_port = port
(port, sock) = listenany(interface, port_hint)
LPROC.bind_port = Int(port)
else
sock = listen(interface, LPROC.bind_port)
end
Expand All @@ -263,7 +268,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
process_messages(client, client, true)
end)
print(out, "julia_worker:") # print header
print(out, "$(string(LPROC.bind_port))#") # print port
print(out, "$(LPROC.bind_port)#") # print port
print(out, LPROC.bind_addr)
print(out, '\n')
flush(out)
Expand Down Expand Up @@ -1308,17 +1313,33 @@ function init_bind_addr()
end
else
bind_port = 0
try
bind_addr = string(getipaddr())
catch
# All networking is unavailable, initialize bind_addr to the loopback address
# Will cause an exception to be raised only when used.

interfaces = _get_interfaces(IPv4)
if isempty(interfaces)
# Include IPv6 interfaces if there are no IPv4 ones
interfaces = _get_interfaces()
end

if isempty(interfaces)
# All networking is unavailable, initialize bind_addr to the loopback address.
# An exception will be raised later if even that is unavailable.
bind_addr = "127.0.0.1"
else
# Pick the interface with the highest negotiated speed, if any
interfaces_with_speed = filter(x -> !isnothing(x.speed), interfaces)
if isempty(interfaces_with_speed)
    # If none of them report speed just pick the first one
    bind_addr = string(interfaces[1].ip)
else
    # Search only the interfaces that actually report a speed. Searching the
    # unfiltered list would compare `nothing` against a `Float64` as soon as
    # one interface lacks a speed, which throws a `MethodError`. The index
    # returned by `findmax` refers to the filtered vector, so index that one.
    idx = findmax(x -> x.speed, interfaces_with_speed)[2]
    bind_addr = string(interfaces_with_speed[idx].ip)
end
end
end

global LPROC
LPROC.bind_addr = bind_addr
LPROC.bind_port = UInt16(bind_port)
LPROC.bind_port = bind_port
end

using Random: randstring
Expand Down
14 changes: 7 additions & 7 deletions src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,8 @@ addprocs([

* `env`: provide an array of string pairs such as
`env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables
are set on the remote machine. By default only the environment variable
`JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote
environment.
are set on the remote machine. By default all `JULIA_*` environment variables
are passed automatically from the local to the remote environment.

* `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline
option. The (more secure) default behaviour of passing the cookie via ssh stdio
Expand Down Expand Up @@ -176,7 +175,7 @@ default_addprocs_params(::SSHManager) =
:tunnel => false,
:multiplex => false,
:max_parallel => 10,
:project => Base.current_project()))
:project => Base.active_project()))

function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition)
# Launch one worker on each unique host in parallel. Additional workers are launched later.
Expand All @@ -186,7 +185,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
@async try
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
@error "Exception launching on machine $(machine)" exception=(e, catch_backtrace())
end
end
end
Expand Down Expand Up @@ -294,8 +293,9 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa
# Build up the ssh command

# pass on some environment variables by default
for var in ["JULIA_WORKER_TIMEOUT"]
if !haskey(env, var) && haskey(ENV, var)
julia_vars = filter(startswith("JULIA_"), keys(ENV))
for var in julia_vars
if !haskey(env, var)
env[var] = ENV[var]
end
end
Expand Down
141 changes: 141 additions & 0 deletions src/network_interfaces.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
## This is a minimal version of NetworkInterfaceControllers.jl, licensed under MIT

# Opaque stand-in for libuv's `uv_interface_address_t` struct:
# https://docs.libuv.org/en/v1.x/misc.html#c.uv_interface_address_t
#
# The struct has several fields, but the only one this file reads directly
# through the pointer is `name`: it is the first field and a pointer, so it can
# be loaded from the struct's own address. Reading the other fields directly
# would require accounting for their offsets changing between 32-bit and 64-bit
# platforms, so the struct layout is not mirrored here. Everything else is
# obtained by passing the pointer to `jl_uv_interface_address_*` accessor
# functions.
const uv_interface_address_t = Cvoid

# Size of one `uv_interface_address_t` as reported by the runtime. It is used
# to step a pointer from one element of the address array to the next.
const sizeof_uv_interface_address_t = @ccall jl_uv_sizeof_interface_address()::Cint

"""
    uv_interface_addresses(addresses, count)

Call `jl_uv_interface_addresses`, passing `addresses` as a pointer to the
address-array pointer and `count` as a pointer to the entry count, and return
the `Cint` status code of the call.
"""
uv_interface_addresses(addresses, count) = @ccall jl_uv_interface_addresses(
    addresses::Ptr{Ptr{uv_interface_address_t}}, count::Ptr{Cint}
)::Cint

"""
    uv_free_interface_addresses(addresses, count)

Call libuv's `uv_free_interface_addresses` on the address array `addresses`
holding `count` entries.
"""
uv_free_interface_addresses(addresses, count) = @ccall uv_free_interface_addresses(
    addresses::Ptr{uv_interface_address_t}, count::Cint
)::Cvoid

"""
    _next(r)

Return a new `Ref` holding the pointer stored in `r` advanced by
`sizeof_uv_interface_address_t` bytes, i.e. by one array element. `r` itself is
left unchanged.
"""
function _next(r::Base.RefValue{Ptr{uv_interface_address_t}})
    return Ref{Ptr{uv_interface_address_t}}(r[] + sizeof_uv_interface_address_t)
end

"""
    _is_loopback(addr)

Return `true` if `jl_uv_interface_address_is_internal` reports `1` for the
interface address entry that `addr` points to.
"""
function _is_loopback(addr)
    is_internal = @ccall jl_uv_interface_address_is_internal(
        addr::Ptr{uv_interface_address_t}
    )::Cint
    return is_internal == 1
end

"""
    _sockaddr(addr)

Return the `Ptr{Cvoid}` that `jl_uv_interface_address_sockaddr` yields for the
interface address entry that `addr` points to.
"""
function _sockaddr(addr)
    return @ccall jl_uv_interface_address_sockaddr(
        addr::Ptr{uv_interface_address_t}
    )::Ptr{Cvoid}
end

"""
    _sockaddr_is_ip4(sockaddr)

Return `true` if `jl_sockaddr_is_ip4` reports `1` for `sockaddr`.
"""
function _sockaddr_is_ip4(sockaddr::Ptr{Cvoid})
    is_ip4 = @ccall jl_sockaddr_is_ip4(sockaddr::Ptr{Cvoid})::Cint
    return is_ip4 == 1
end

"""
    _sockaddr_is_ip6(sockaddr)

Return `true` if `jl_sockaddr_is_ip6` reports `1` for `sockaddr`.
"""
function _sockaddr_is_ip6(sockaddr::Ptr{Cvoid})
    is_ip6 = @ccall jl_sockaddr_is_ip6(sockaddr::Ptr{Cvoid})::Cint
    return is_ip6 == 1
end

"""
    _sockaddr_to_ip4(sockaddr)

Build an `IPv4` from the value `jl_sockaddr_host4` returns for `sockaddr`,
after converting it from network to host byte order.
"""
function _sockaddr_to_ip4(sockaddr::Ptr{Cvoid})
    host = @ccall jl_sockaddr_host4(sockaddr::Ptr{Cvoid})::Cuint
    return IPv4(ntoh(host))
end

"""
    _sockaddr_to_ip6(sockaddr)

Build an `IPv6` from the 128-bit host value that `jl_sockaddr_host6` writes for
`sockaddr`, after converting it from network to host byte order. The `Cuint`
returned by `jl_sockaddr_host6` is discarded.
"""
function _sockaddr_to_ip6(sockaddr::Ptr{Cvoid})
    host = Ref{UInt128}()
    @ccall jl_sockaddr_host6(sockaddr::Ptr{Cvoid}, host::Ptr{UInt128})::Cuint
    return IPv6(ntoh(host[]))
end

# ARP hardware type codes for the kinds of interface we're interested in. This
# is only a selection, not the full list. Values taken from:
# https://github.com/torvalds/linux/blob/28eb75e178d389d325f1666e422bc13bbbb9804c/include/uapi/linux/if_arp.h#L29
#
# A code read from sysfs is only converted to this enum when it matches one of
# the instances below; any other code leaves the interface's `type` as `nothing`.
@enum ARPHardware begin
ARPHardware_Ethernet = 1
ARPHardware_Infiniband = 32
ARPHardware_Loopback = 772
end

# Description of a single address on a network interface, as collected by
# `_get_interfaces()`.
struct Interface
# Interface name, read from the `name` field of the libuv address entry
name::String
# Address family of `ip`: `:v4` or `:v6`
version::Symbol
# The address itself (an `IPv4` or an `IPv6`)
ip::IPAddr

# These two fields are taken from the sysfs /type and /speed files if available:
# https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net
#
# `type` is `nothing` when the file is missing or unreadable, or when the code
# is not one of the `ARPHardware` instances. `speed` is `nothing` when the file
# is missing or unreadable, or when the reported value is not positive.
# NOTE(review): the unit of `speed` is whatever sysfs reports (presumably
# Mbit/s per the linked documentation) - confirm before relying on it.
type::Union{ARPHardware, Nothing}
speed::Union{Float64, Nothing}
end

# Read the ARP hardware type of interface `name` from its sysfs `type` file.
# Returns `nothing` if the file is missing, cannot be read or parsed, or holds
# a code that is not one of the `ARPHardware` instances.
function _sysfs_hardware_type(name::AbstractString)
    type_path = "/sys/class/net/$(name)/type"
    isfile(type_path) || return nothing

    try
        type_code = parse(Int, read(type_path, String))
        if type_code in Int.(instances(ARPHardware))
            return ARPHardware(type_code)
        end
    catch
        # Best effort: do nothing on any failure to read or parse the file
    end

    return nothing
end

# Read the speed of interface `name` from its sysfs `speed` file. Returns
# `nothing` if the file is missing, cannot be read or parsed, or reports a
# non-positive value.
function _sysfs_speed(name::AbstractString)
    speed_path = "/sys/class/net/$(name)/speed"
    isfile(speed_path) || return nothing

    try
        reported_speed = parse(Float64, read(speed_path, String))
        if reported_speed > 0
            return reported_speed
        end
    catch
        # Best effort: do nothing on any failure to read or parse the file
    end

    return nothing
end

# Build the `Interface` for the libuv address entry at `addr`, or return
# `nothing` if the entry must be skipped: either it is a loopback entry and
# `loopback` is false, or its address family is not covered by `T`.
function _interface_from_address(
    addr::Ptr{uv_interface_address_t}, ::Type{T}, loopback::Bool
) where T <: IPAddr
    # Skip loopback devices, if so required
    if !loopback && _is_loopback(addr)
        return nothing
    end

    # Sockaddr used to load IPv4 or IPv6 addresses
    sockaddr = _sockaddr(addr)

    # Decide on the address family first, so that no name lookup or sysfs read
    # is spent on an entry that is going to be skipped anyway.
    (ip_type, ip_address) = if IPv4 <: T && _sockaddr_is_ip4(sockaddr)
        (:v4, _sockaddr_to_ip4(sockaddr))
    elseif IPv6 <: T && _sockaddr_is_ip6(sockaddr)
        (:v6, _sockaddr_to_ip6(sockaddr))
    else
        return nothing
    end

    # Interface name string. The name is the first field of the struct so we
    # just cast the struct pointer to a Ptr{Cstring} and load it.
    name = unsafe_string(unsafe_load(Ptr{Cstring}(addr)))

    type = nothing
    speed = nothing
    @static if Sys.isunix()
        # Load sysfs info where available
        type = _sysfs_hardware_type(name)
        speed = _sysfs_speed(name)
    end

    return Interface(name, ip_type, ip_address, type, speed)
end

"""
    _get_interfaces(::Type{T}=IPAddr; loopback::Bool=false) where T <: IPAddr

Return a `Vector{Interface}` with one entry per network interface address
reported by `uv_interface_addresses()` whose address family is covered by `T`
(`IPv4`, `IPv6`, or `IPAddr` for both).

Loopback entries are only included when `loopback` is `true`. The `type` and
`speed` fields of each entry are filled in from sysfs on a best-effort basis
and are `nothing` when that information is unavailable.

Throws an error if the call to `uv_interface_addresses()` fails.
"""
function _get_interfaces(
    ::Type{T}=IPAddr; loopback::Bool=false
) where T <: IPAddr
    addr_ref = Ref{Ptr{uv_interface_address_t}}(C_NULL)
    count_ref = Ref{Int32}(1)

    err = uv_interface_addresses(addr_ref, count_ref)
    if err != 0
        error("Call to uv_interface_addresses() to list network interfaces failed: $(err)")
    end

    interface_data = Interface[]
    try
        current_addr = addr_ref
        for _ in 1:count_ref[]
            interface = _interface_from_address(current_addr[], T, loopback)
            isnothing(interface) || push!(interface_data, interface)

            # Always iterate the address pointer, whether or not the entry was kept
            current_addr = _next(current_addr)
        end
    finally
        # Release the libuv-allocated array even if reading an entry threw
        uv_free_interface_addresses(addr_ref[], count_ref[])
    end

    return interface_data
end
5 changes: 5 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl"))
id_me = nothing
id_other = nothing

@testset "Network interface info" begin
# Smoke test: only checks that listing the interfaces runs and, with loopback
# entries included, returns at least one of them.
@test !isempty(DistributedNext._get_interfaces(; loopback=true))
end

# Test a few "remote" invocations when no workers are present
@testset "Remote invocations with no workers" begin
@test remote(myid)() == 1
Expand Down
7 changes: 7 additions & 0 deletions test/sshmanager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ end
test_n_remove_pids(new_pids)
@test :ok == timedwait(()->!issocket(controlpath), 10.0; pollint=0.5)

print("\nTest SSH addprocs() passing environment variables\n")
withenv("JULIA_FOO" => "foo") do
new_pids = addprocs_with_testenv(["localhost"]; sshflags)
@test remotecall_fetch(() -> ENV["JULIA_FOO"], only(new_pids)) == "foo"
test_n_remove_pids(new_pids)
end

print("\nAll supported formats for hostname\n")
h1 = "localhost"
user = ENV["USER"]
Expand Down
Loading