diff --git a/docs/src/_changelog.md b/docs/src/_changelog.md index d9d6c95..cb79989 100644 --- a/docs/src/_changelog.md +++ b/docs/src/_changelog.md @@ -18,3 +18,7 @@ This documents notable changes in DistributedNext.jl. The format is based on ### Changed - Added a `project` argument to [`addprocs(::AbstractVector)`](@ref) to specify the project of a remote worker ([#2]). +- Workers will now attempt to pick the fastest available interface to + communicate over ([#9]). +- The `SSHManager` now passes all `JULIA_*` environment variables by default to + the workers, instead of only `JULIA_WORKER_TIMEOUT` ([#9]). diff --git a/src/DistributedNext.jl b/src/DistributedNext.jl index d5b66e9..ca0833b 100644 --- a/src/DistributedNext.jl +++ b/src/DistributedNext.jl @@ -102,6 +102,7 @@ end hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h)) ==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id) +include("network_interfaces.jl") include("clusterserialize.jl") include("cluster.jl") # cluster setup and management, addprocs include("messages.jl") diff --git a/src/cluster.jl b/src/cluster.jl index bb08d9b..958dc01 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -214,7 +214,7 @@ end mutable struct LocalProcess id::Int bind_addr::String - bind_port::UInt16 + bind_port::Int cookie::String LocalProcess() = new(1) end @@ -237,6 +237,11 @@ The function reads the cookie from stdin if required, and listens on a free por tasks to process incoming TCP connections and requests. It also (optionally) closes stdin and redirects stderr to stdout. +If a specific interface is not specified through `--bind-to` it will make a +best-effort attempt to pick the fastest available network interface to listen +on. The heuristics it uses for this depend on the system configuration and +should not be relied upon to always pick the fastest interface. + It does not return. """ start_worker(cookie::AbstractString=readline(stdin); kwargs...) = start_worker(stdout, cookie; kwargs...) @@ -253,8 +258,8 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std interface = IPv4(LPROC.bind_addr) if LPROC.bind_port == 0 port_hint = 9000 + (getpid() % 1000) - (port, sock) = listenany(interface, UInt16(port_hint)) - LPROC.bind_port = port + (port, sock) = listenany(interface, port_hint) + LPROC.bind_port = Int(port) else sock = listen(interface, LPROC.bind_port) end @@ -263,7 +268,7 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std process_messages(client, client, true) end) print(out, "julia_worker:") # print header - print(out, "$(string(LPROC.bind_port))#") # print port + print(out, "$(LPROC.bind_port)#") # print port print(out, LPROC.bind_addr) print(out, '\n') flush(out) @@ -1308,17 +1313,33 @@ function init_bind_addr() end else bind_port = 0 - try - bind_addr = string(getipaddr()) - catch - # All networking is unavailable, initialize bind_addr to the loopback address - # Will cause an exception to be raised only when used. + + interfaces = _get_interfaces(IPv4) + if isempty(interfaces) + # Include IPv6 interfaces if there are no IPv4 ones + interfaces = _get_interfaces() + end + + if isempty(interfaces) + # All networking is unavailable, initialize bind_addr to the loopback address. + # An exception will be raised later if even that is unavailable. bind_addr = "127.0.0.1" + else + # Pick the interface with the highest negotiated speed, if any + interfaces_with_speed = filter(x -> !isnothing(x.speed), interfaces) + if isempty(interfaces_with_speed) + # If none of them report speed just pick the first one + bind_addr = string(interfaces[1].ip) + else + idx = findmax(x -> x.speed, interfaces)[2] + bind_addr = string(interfaces[idx].ip) + end end end + global LPROC LPROC.bind_addr = bind_addr - LPROC.bind_port = UInt16(bind_port) + LPROC.bind_port = bind_port end using Random: randstring diff --git a/src/managers.jl b/src/managers.jl index 56d1f78..15e7bd7 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -138,9 +138,8 @@ addprocs([ * `env`: provide an array of string pairs such as `env=["JULIA_DEPOT_PATH"=>"/depot"]` to request that environment variables - are set on the remote machine. By default only the environment variable - `JULIA_WORKER_TIMEOUT` is passed automatically from the local to the remote - environment. + are set on the remote machine. By default all `JULIA_*` environment variables + are passed automatically from the local to the remote environment. * `cmdline_cookie`: pass the authentication cookie via the `--worker` commandline option. The (more secure) default behaviour of passing the cookie via ssh stdio @@ -176,7 +175,7 @@ default_addprocs_params(::SSHManager) = :tunnel => false, :multiplex => false, :max_parallel => 10, - :project => Base.current_project())) + :project => Base.active_project())) function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy::Condition) # Launch one worker on each unique host in parallel. Additional workers are launched later. @@ -186,7 +185,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy: @async try launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy) catch e - print(stderr, "exception launching on machine $(machine) : $(e)\n") + @error "Exception launching on machine $(machine)" exception=(e, catch_backtrace()) end end end @@ -294,8 +293,9 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa # Build up the ssh command # pass on some environment variables by default - for var in ["JULIA_WORKER_TIMEOUT"] - if !haskey(env, var) && haskey(ENV, var) + julia_vars = filter(startswith("JULIA_"), keys(ENV)) + for var in julia_vars + if !haskey(env, var) env[var] = ENV[var] end end diff --git a/src/network_interfaces.jl b/src/network_interfaces.jl new file mode 100644 index 0000000..b3c7ed7 --- /dev/null +++ b/src/network_interfaces.jl @@ -0,0 +1,141 @@ +## This is a minimal version of NetworkInterfaceControllers.jl, licensed under MIT + +# uv_interface_address_t has a few fields, but we don't support accessing all of +# them because `name` is the first field and it's a pointer: +# https://docs.libuv.org/en/v1.x/misc.html#c.uv_interface_address_t +# +# To safely access the other fields we would have to account for their +# offset changing on 32/64bit platforms, which we are too lazy to do (and +# don't need anyway since we only want the name). +const uv_interface_address_t = Cvoid + +const sizeof_uv_interface_address_t = @ccall jl_uv_sizeof_interface_address()::Cint + +function uv_interface_addresses(addresses, count) + @ccall jl_uv_interface_addresses(addresses::Ptr{Ptr{uv_interface_address_t}}, count::Ptr{Cint})::Cint +end + +function uv_free_interface_addresses(addresses, count) + @ccall uv_free_interface_addresses(addresses::Ptr{uv_interface_address_t}, count::Cint)::Cvoid +end + +function _next(r::Base.RefValue{Ptr{uv_interface_address_t}}) + next_addr = r[] + sizeof_uv_interface_address_t + Ref(Ptr{uv_interface_address_t}(next_addr)) +end + +_is_loopback(addr) = 1 == @ccall jl_uv_interface_address_is_internal(addr::Ptr{uv_interface_address_t})::Cint + +_sockaddr(addr) = @ccall jl_uv_interface_address_sockaddr(addr::Ptr{uv_interface_address_t})::Ptr{Cvoid} + +_sockaddr_is_ip4(sockaddr::Ptr{Cvoid}) = 1 == @ccall jl_sockaddr_is_ip4(sockaddr::Ptr{Cvoid})::Cint + +_sockaddr_is_ip6(sockaddr::Ptr{Cvoid}) = 1 == @ccall jl_sockaddr_is_ip6(sockaddr::Ptr{Cvoid})::Cint + +_sockaddr_to_ip4(sockaddr::Ptr{Cvoid}) = IPv4(ntoh(@ccall jl_sockaddr_host4(sockaddr::Ptr{Cvoid})::Cuint)) + +function _sockaddr_to_ip6(sockaddr::Ptr{Cvoid}) + addr6 = Ref{UInt128}() + @ccall jl_sockaddr_host6(sockaddr::Ptr{Cvoid}, addr6::Ptr{UInt128})::Cuint + IPv6(ntoh(addr6[])) +end + +# Define a selection of hardware types that we're interested in. Values taken from: +# https://github.com/torvalds/linux/blob/28eb75e178d389d325f1666e422bc13bbbb9804c/include/uapi/linux/if_arp.h#L29 +@enum ARPHardware begin + ARPHardware_Ethernet = 1 + ARPHardware_Infiniband = 32 + ARPHardware_Loopback = 772 +end + +struct Interface + name::String + version::Symbol + ip::IPAddr + + # These two fields are taken from the sysfs /type and /speed files if available: + # https://www.kernel.org/doc/Documentation/ABI/testing/sysfs-class-net + type::Union{ARPHardware, Nothing} + speed::Union{Float64, Nothing} +end + +function _get_interfaces( + ::Type{T}=IPAddr; loopback::Bool=false +) where T <: IPAddr + addr_ref = Ref{Ptr{uv_interface_address_t}}(C_NULL) + count_ref = Ref{Int32}(1) + + err = uv_interface_addresses(addr_ref, count_ref) + if err != 0 + error("Call to uv_interface_addresses() to list network interfaces failed: $(err)") + end + + interface_data = Interface[] + current_addr = addr_ref + for i = 0:(count_ref[]-1) + # Skip loopback devices, if so required + if (!loopback) && _is_loopback(current_addr[]) + # Don't don't forget to iterate the address pointer though! + current_addr = _next(current_addr) + continue + end + + # Interface name string. The name is the first field of the struct so we + # just cast the struct pointer to a Ptr{Cstring} and load it. + name_ptr = unsafe_load(Ptr{Cstring}(current_addr[])) + name = unsafe_string(name_ptr) + + # Sockaddr used to load IPv4, or IPv6 addresses + sockaddr = _sockaddr(current_addr[]) + + # Load IP addresses + (ip_type, ip_address) = if IPv4 <: T && _sockaddr_is_ip4(sockaddr) + (:v4, _sockaddr_to_ip4(sockaddr)) + elseif IPv6 <: T && _sockaddr_is_ip6(sockaddr) + (:v6, _sockaddr_to_ip6(sockaddr)) + else + (:skip, nothing) + end + + type = nothing + speed = nothing + + @static if Sys.isunix() + # Load sysfs info + sysfs_path = "/sys/class/net/$(name)" + type_path = "$(sysfs_path)/type" + speed_path = "$(sysfs_path)/speed" + + if isfile(type_path) + try + type_code = parse(Int, read(type_path, String)) + if type_code in Int.(instances(ARPHardware)) + type = ARPHardware(type_code) + end + catch + # Do nothing on any failure to read or parse the file + end + end + + if isfile(speed_path) + try + reported_speed = parse(Float64, read(speed_path, String)) + if reported_speed > 0 + speed = reported_speed + end + catch + end + end + end + + # Append to data vector and iterate address pointer + if ip_type != :skip + push!(interface_data, Interface(name, ip_type, ip_address, type, speed)) + end + current_addr = _next(current_addr) + end + + uv_free_interface_addresses(addr_ref[], count_ref[]) + + return interface_data +end diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index b52f3a3..156b986 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -15,6 +15,11 @@ include(joinpath(Sys.BINDIR, "..", "share", "julia", "test", "testenv.jl")) id_me = nothing id_other = nothing +@testset "Network interface info" begin + # Smoke test + @test !isempty(DistributedNext._get_interfaces(; loopback=true)) +end + # Test a few "remote" invocations when no workers are present @testset "Remote invocations with no workers" begin @test remote(myid)() == 1 diff --git a/test/sshmanager.jl b/test/sshmanager.jl index 2f82637..956dd87 100644 --- a/test/sshmanager.jl +++ b/test/sshmanager.jl @@ -71,6 +71,13 @@ end test_n_remove_pids(new_pids) @test :ok == timedwait(()->!issocket(controlpath), 10.0; pollint=0.5) + print("\nTest SSH addprocs() passing environment variables\n") + withenv("JULIA_FOO" => "foo") do + new_pids = addprocs_with_testenv(["localhost"]; sshflags) + @test remotecall_fetch(() -> ENV["JULIA_FOO"], only(new_pids)) == "foo" + test_n_remove_pids(new_pids) + end + print("\nAll supported formats for hostname\n") h1 = "localhost" user = ENV["USER"]