Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/K8sClusterManagers.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module K8sClusterManagers

using DataStructures: DefaultOrderedDict, OrderedDict
using Distributed: Distributed, ClusterManager, WorkerConfig, cluster_cookie
using Distributed: Distributed, ClusterManager, WorkerConfig, write_cookie
using JSON: JSON
using Mocking: Mocking, @mock
using kubectl_jll
Expand Down
39 changes: 26 additions & 13 deletions src/native_driver.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
const DEFAULT_WORKER_CPU = 1
const DEFAULT_WORKER_MEMORY = "4Gi"

# Port number listened to by workers. The port number was randomly chosen from the ephemeral
# port range: 49152-65535.
const WORKER_PORT = 51400

# Notifies tasks that the abnormal worker deregistration warning has been emitted
const DEREGISTER_ALERT = Condition()

Expand Down Expand Up @@ -91,9 +87,7 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
exename = params[:exename]
exeflags = params[:exeflags]

# Note: We currently use the same port number for all workers but this isn't strictly
# required.
cmd = `$exename $exeflags --worker=$(cluster_cookie()) --bind-to=0:$WORKER_PORT`
cmd = `$exename $exeflags --worker`

worker_manifest = @static if VERSION >= v"1.5"
worker_pod_spec(manager; cmd)
Expand All @@ -104,27 +98,46 @@ function Distributed.launch(manager::K8sClusterManager, params::Dict, launched::
# Note: User-defined `configure` function may or may-not be mutating
worker_manifest = manager.configure(worker_manifest)

# Without stdin the `kubectl attach -i` process will be unable to send the cluster
# cookie to the worker.
# Note: Assumes worker pod uses only a single container
if !get(worker_manifest["spec"]["containers"][1], "stdin", false)
error("Worker pod container must enable support for stdin")
end

@sync for i in 1:manager.np
@async begin
pod_name = create_pod(worker_manifest)


pod = try
wait_for_running_pod(pod_name; timeout=manager.pending_timeout)
catch e
delete_pod(pod_name; wait=false)
rethrow()
end

# Wait a few seconds to allow the worker to start listening to connections at
# expected port. If we don't wait long enough we will see a "connection refused"
# error (https://github.com/beacon-biosignals/K8sClusterManagers.jl/issues/46)
@info "$pod_name is up"
sleep(4)

# We'll ignore stderr as `kubectl attach` always outputs:
# "If you don't see a command prompt, try pressing enter."
# TODO: Ideally we would just ignore this line and report anything else but
# unfortunately using an `IOBuffer` here never seems to capture any output.
#
# Note: The `start_worker` function by default redirects stderr to stdout which
# means the stderr captured here should entirely be from `kubectl` (or possibly
# from the worker if an error occurred before `start_worker`).
p = kubectl() do exe
attach_cmd = `$exe attach -i pod/$pod_name -c=worker`
open(pipeline(detach(attach_cmd), stderr=stderr), "r+")
end

write_cookie(p)

config = WorkerConfig()
config.host = pod["status"]["podIP"]
config.port = WORKER_PORT
config.io = p.out
config.userdata = (; pod_name=pod_name)

push!(launched, config)
notify(c)
end
Expand Down
1 change: 1 addition & 0 deletions src/pod.jl
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ function worker_pod_spec!(pod::AbstractDict;
rdict("name" => "worker",
"image" => image,
"command" => collect(cmd),
"stdin" => true,
"resources" => rdict("requests" => rdict("cpu" => cpu,
"memory" => memory),
"limits" => rdict("cpu" => cpu,
Expand Down
30 changes: 24 additions & 6 deletions test/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ const TEST_IMAGE = get(ENV, "K8S_CLUSTER_MANAGERS_TEST_IMAGE", "k8s-cluster-mana

const POD_NAME_REGEX = r"Worker pod (?<worker_id>\d+): (?<pod_name>[a-z0-9.-]+)"

# Note: Regex should be generic enough to capture any stray output from the workers
const POD_OUTPUT_REGEX = r"From worker (?<worker_id>\d+):\s+(?<output>.*?)\r?\n"

const PROMPT_HINT = "If you don't see a command prompt, try pressing enter."

# As a convenience we'll automatically build the Docker image when a user uses `Pkg.test()`.
# If the environmental variable is set we expect the Docker image has already been built.
if !haskey(ENV, "K8S_CLUSTER_MANAGERS_TEST_IMAGE")
Expand Down Expand Up @@ -165,13 +170,18 @@ let job_name = "test-success"
pod["spec"]["containers"][1]["imagePullPolicy"] = "Never"
return pod
end
addprocs(K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi"))
pids = addprocs(K8sClusterManager(1; configure, pending_timeout=60, cpu="0.5", memory="300Mi"))

println("Num Processes: ", nprocs())
for i in workers()
# Return the name of the pod via HOSTNAME
println("Worker pod \$i: ", remotecall_fetch(() -> ENV["HOSTNAME"], i))
end

# Ensure that stdout/stderr on the worker is displayed on the manager
@everywhere pids begin
println(ENV["HOSTNAME"])
end
"""

command = ["julia"]
Expand All @@ -191,7 +201,8 @@ let job_name = "test-success"
worker_pod = first(pod_names("manager" => manager_pod))

manager_log = pod_logs(manager_pod)
matches = collect(eachmatch(POD_NAME_REGEX, manager_log))
call_matches = collect(eachmatch(POD_NAME_REGEX, manager_log))
output_matches = collect(eachmatch(POD_OUTPUT_REGEX, manager_log))

test_results = [
@test get_job(job_name, jsonpath="{.status..type}") == "Complete"
Expand All @@ -202,9 +213,16 @@ let job_name = "test-success"
@test pod_phase(manager_pod) == "Succeeded"
@test pod_phase(worker_pod) == "Succeeded"

@test length(matches) == 1
@test matches[1][:worker_id] == "2"
@test matches[1][:pod_name] == worker_pod
@test length(call_matches) == 1
@test call_matches[1][:worker_id] == "2"
@test call_matches[1][:pod_name] == worker_pod

@test length(output_matches) == 1
@test output_matches[1][:worker_id] == "2"
@test output_matches[1][:output] == worker_pod

# `kubectl attach` reports this warning
@test_broken !occursin(PROMPT_HINT, manager_log)

# Ensure there are no unexpected error messages in the log
@test !occursin(r"\bError\b"i, manager_log)
Expand Down Expand Up @@ -321,7 +339,7 @@ let job_name = "test-interrupt"
@test pod_phase(worker_pod) == "Failed"

# Ensure there are no unexpected error messages in the log
@test !occursin(r"\bError\b"i, manager_log)
@test length(collect(eachmatch(r"\bError\b"i, manager_log))) == 1
]

# Display details to assist in debugging the failure
Expand Down
3 changes: 3 additions & 0 deletions test/job.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ rules:
- apiGroups: [""]
resources: ["pods/exec"]
verbs: ["create"]
- apiGroups: [""]
resources: ["pods/attach"]
verbs: ["create"]

---
# https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/
Expand Down
3 changes: 2 additions & 1 deletion test/pod.jl
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,15 @@ end
@test length(pod["spec"]["containers"]) == 1

worker = pod["spec"]["containers"][1]
@test keys(worker) == Set(["name", "image", "command", "resources"])
@test keys(worker) == Set(["name", "image", "command", "resources", "stdin"])
@test worker["name"] == "worker"
@test worker["image"] == "julia"
@test worker["command"] == ["julia"]
@test worker["resources"]["requests"]["cpu"] == DEFAULT_WORKER_CPU
@test worker["resources"]["requests"]["memory"] == DEFAULT_WORKER_MEMORY
@test worker["resources"]["limits"]["cpu"] == DEFAULT_WORKER_CPU
@test worker["resources"]["limits"]["memory"] == DEFAULT_WORKER_MEMORY
@test worker["stdin"] == true
end

@testset "isk8s" begin
Expand Down