diff --git a/src/K8sClusterManagers.jl b/src/K8sClusterManagers.jl index 090c8d0..96566f4 100644 --- a/src/K8sClusterManagers.jl +++ b/src/K8sClusterManagers.jl @@ -1,7 +1,7 @@ module K8sClusterManagers using DataStructures: DefaultOrderedDict, OrderedDict -using Distributed: Distributed, ClusterManager, WorkerConfig, cluster_cookie +using Distributed: Distributed, ClusterManager, WorkerConfig, write_cookie using JSON: JSON using Mocking: Mocking, @mock using kubectl_jll diff --git a/src/native_driver.jl b/src/native_driver.jl index c79689b..5748523 100644 --- a/src/native_driver.jl +++ b/src/native_driver.jl @@ -1,10 +1,6 @@ const DEFAULT_WORKER_CPU = 1 const DEFAULT_WORKER_MEMORY = "4Gi" -# Port number listened to by workers. The port number was randomly chosen from the ephemeral -# port range: 49152-65535. -const WORKER_PORT = 51400 - # Notifies tasks that the abnormal worker deregistration warning has been emitted const DEREGISTER_ALERT = Condition() @@ -91,9 +87,7 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched:: exename = params[:exename] exeflags = params[:exeflags] - # Note: We currently use the same port number for all workers but this isn't strictly - # required. - cmd = `$exename $exeflags --worker=$(cluster_cookie()) --bind-to=0:$WORKER_PORT` + cmd = `$exename $exeflags --worker` worker_manifest = @static if VERSION >= v"1.5" worker_pod_spec(manager; cmd) @@ -104,10 +98,18 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched:: # Note: User-defined `configure` function may or may-not be mutating worker_manifest = manager.configure(worker_manifest) + # Without stdin the `kubectl attach -i` process will be unable to send the cluster + # cookie to the worker. 
+ # Note: Assumes worker pod uses only a single container + if !get(worker_manifest["spec"]["containers"][1], "stdin", false) + error("Worker pod container must enable support for stdin") + end + @sync for i in 1:manager.np @async begin pod_name = create_pod(worker_manifest) + pod = try wait_for_running_pod(pod_name; timeout=manager.pending_timeout) catch e @@ -115,16 +117,27 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched:: rethrow() end - # Wait a few seconds to allow the worker to start listening to connections at - # expected port. If we don't wait long enough we will see a "connection refused" - # error (https://github.com/beacon-biosignals/K8sClusterManagers.jl/issues/46) @info "$pod_name is up" - sleep(4) + + # We'll ignore stderr as `kubectl attach` always outputs: + # "If you don't see a command prompt, try pressing enter." + # TODO: Ideally we would just ignore this line and report anything else but + # unfortunately using an `IOBuffer` here never seems to capture any output. + # + # Note: The `start_worker` function by default redirects stderr to stdout which + # means the stderr captured here should entirely be from `kubectl` (or possibly + # from the worker if an error occurred before `start_worker`). 
+ p = kubectl() do exe + attach_cmd = `$exe attach -i pod/$pod_name -c=worker` + open(pipeline(detach(attach_cmd), stderr=stderr), "r+") + end + + write_cookie(p) config = WorkerConfig() - config.host = pod["status"]["podIP"] - config.port = WORKER_PORT + config.io = p.out config.userdata = (; pod_name=pod_name) + push!(launched, config) notify(c) end diff --git a/src/pod.jl b/src/pod.jl index 5c0ca47..06fd111 100644 --- a/src/pod.jl +++ b/src/pod.jl @@ -219,6 +219,7 @@ function worker_pod_spec!(pod::AbstractDict; rdict("name" => "worker", "image" => image, "command" => collect(cmd), + "stdin" => true, "resources" => rdict("requests" => rdict("cpu" => cpu, "memory" => memory), "limits" => rdict("cpu" => cpu, diff --git a/test/cluster.jl b/test/cluster.jl index fde8e95..ea10cb2 100644 --- a/test/cluster.jl +++ b/test/cluster.jl @@ -29,6 +29,11 @@ const TEST_IMAGE = get(ENV, "K8S_CLUSTER_MANAGERS_TEST_IMAGE", "k8s-cluster-mana const POD_NAME_REGEX = r"Worker pod (?<worker_id>\d+): (?<pod_name>[a-z0-9.-]+)" +# Note: Regex should be generic enough to capture any stray output from the workers +const POD_OUTPUT_REGEX = r"From worker (?<worker_id>\d+):\s+(?<output>.*?)\r?\n" + +const PROMPT_HINT = "If you don't see a command prompt, try pressing enter." + # As a convenience we'll automatically build the Docker image when a user uses `Pkg.test()`. # If the environmental variable is set we expect the Docker image has already been built. 
if !haskey(ENV, "K8S_CLUSTER_MANAGERS_TEST_IMAGE") @@ -165,13 +170,18 @@ let job_name = "test-success" pod["spec"]["containers"][1]["imagePullPolicy"] = "Never" return pod end - addprocs(K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi")) + pids = addprocs(K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi")) println("Num Processes: ", nprocs()) for i in workers() # Return the name of the pod via HOSTNAME println("Worker pod \$i: ", remotecall_fetch(() -> ENV["HOSTNAME"], i)) end + + # Ensure that stdout/stderr on the worker is displayed on the manager + @everywhere pids begin + println(ENV["HOSTNAME"]) + end """ command = ["julia"] @@ -191,7 +201,8 @@ let job_name = "test-success" worker_pod = first(pod_names("manager" => manager_pod)) manager_log = pod_logs(manager_pod) - matches = collect(eachmatch(POD_NAME_REGEX, manager_log)) + call_matches = collect(eachmatch(POD_NAME_REGEX, manager_log)) + output_matches = collect(eachmatch(POD_OUTPUT_REGEX, manager_log)) test_results = [ @test get_job(job_name, jsonpath="{.status..type}") == "Complete" @@ -202,9 +213,16 @@ let job_name = "test-success" @test pod_phase(manager_pod) == "Succeeded" @test pod_phase(worker_pod) == "Succeeded" - @test length(matches) == 1 - @test matches[1][:worker_id] == "2" - @test matches[1][:pod_name] == worker_pod + @test length(call_matches) == 1 + @test call_matches[1][:worker_id] == "2" + @test call_matches[1][:pod_name] == worker_pod + + @test length(output_matches) == 1 + @test output_matches[1][:worker_id] == "2" + @test output_matches[1][:output] == worker_pod + + # `kubectl attach` reports this warning + @test_broken !occursin(PROMPT_HINT, manager_log) # Ensure there are no unexpected error messages in the log @test !occursin(r"\bError\b"i, manager_log) @@ -321,7 +339,7 @@ let job_name = "test-interrupt" @test pod_phase(worker_pod) == "Failed" # Ensure there are no unexpected error messages in the log - @test 
!occursin(r"\bError\b"i, manager_log) + @test length(collect(eachmatch(r"\bError\b"i, manager_log))) == 1 ] # Display details to assist in debugging the failure diff --git a/test/job.template.yaml b/test/job.template.yaml index 3011da5..40c48c7 100644 --- a/test/job.template.yaml +++ b/test/job.template.yaml @@ -10,6 +10,9 @@ rules: - apiGroups: [""] resources: ["pods/exec"] verbs: ["create"] +- apiGroups: [""] + resources: ["pods/attach"] + verbs: ["create"] --- # https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/ diff --git a/test/pod.jl b/test/pod.jl index d973cf6..a67c589 100644 --- a/test/pod.jl +++ b/test/pod.jl @@ -103,7 +103,7 @@ end @test length(pod["spec"]["containers"]) == 1 worker = pod["spec"]["containers"][1] - @test keys(worker) == Set(["name", "image", "command", "resources"]) + @test keys(worker) == Set(["name", "image", "command", "resources", "stdin"]) @test worker["name"] == "worker" @test worker["image"] == "julia" @test worker["command"] == ["julia"] @@ -111,6 +111,7 @@ end @test worker["resources"]["requests"]["memory"] == DEFAULT_WORKER_MEMORY @test worker["resources"]["limits"]["cpu"] == DEFAULT_WORKER_CPU @test worker["resources"]["limits"]["memory"] == DEFAULT_WORKER_MEMORY + @test worker["stdin"] == true end @testset "isk8s" begin