Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class TraceRecord implements Serializable {
hostname: 'str',
cpu_model: 'str',
accelerator: 'num',
accelerator_type: 'str'
accelerator_type: 'str',
used_cpus: 'num'
]

static public Map<String,Closure<String>> FORMATTER = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,62 @@ nxf_mem_watch() {
"inv_ctxt=${nxf_stat_ret[7]}" >> "$trace_file" || >&2 echo "Error: Failed to append to file: $trace_file"
}

## Count the distinct CPU cores on which a process tree has been observed.
##
## While the root process is alive, every descendant process (and each of
## its threads) is sampled from /proc roughly every 0.1s and the id of the
## core it last ran on is recorded. This is a polling estimate: cores used
## only between two samples are not seen.
##
## Arguments: $1 - PID of the root process to watch
## Outputs:   the number of distinct cores seen, on stdout
## Note:      blocks until the root process is gone; relies on `nxf_sleep`
##            (defined elsewhere in this script) and on `pgrep`.
nxf_cpu_watch() {
    ## No `set -u` here: it would stay enabled in the caller's shell after
    ## the function returns, and on bash < 4.4 it turns every expansion of
    ## an empty array into a fatal "unbound variable" error.
    local root_pid="${1:-}"
    local -A seen_cpus=()
    local -a pids=()
    local p

    ## breadth-first descendant collection; prints one live PID per line
    collect_child_processes() {
        local -a queue=("$1")
        local -a out=()
        local -a kids=()
        local p
        local head=0

        ## walk the queue with an index instead of re-slicing the array
        while (( head < ${#queue[@]} )); do
            p=${queue[head]}
            head=$((head + 1))

            [[ -d /proc/$p ]] || continue
            out+=("$p")

            kids=()
            mapfile -t kids < <(pgrep -P "$p" 2>/dev/null)
            if (( ${#kids[@]} )); then
                queue+=("${kids[@]}")
            fi
        done

        ## print nothing (rather than one blank line) when nothing is alive
        if (( ${#out[@]} )); then
            printf "%s\n" "${out[@]}"
        fi
    }

    ## CPU sampling per PID (includes threads); records into seen_cpus,
    ## which is visible here through bash dynamic scoping
    sample_pid_threads() {
        local pid="$1"
        local t stat cpu
        local -a fields=()

        for t in /proc/"$pid"/task/*; do
            ## a thread may exit between the glob and the read
            { IFS= read -r stat < "$t/stat"; } 2>/dev/null || continue

            ## Drop the leading "pid (comm) ": comm may itself contain
            ## spaces or parentheses, so counting whitespace-separated
            ## fields from the start of the line is unreliable.
            stat=${stat##*) }
            IFS=' ' read -ra fields <<< "$stat"

            ## fields[0] is stat field 3 (state), hence field 39
            ## (processor) sits at index 36
            cpu=${fields[36]:-}
            if [[ $cpu =~ ^[0-9]+$ ]]; then
                seen_cpus["$cpu"]=1
            fi
        done
        return 0
    }

    ## main sampling loop
    while kill -0 "$root_pid" 2>/dev/null; do
        pids=()
        mapfile -t pids < <(collect_child_processes "$root_pid")

        if (( ${#pids[@]} )); then
            for p in "${pids[@]}"; do
                sample_pid_threads "$p"
            done
        fi

        nxf_sleep 0.1
    done

    echo "${#seen_cpus[@]}"
}

nxf_write_trace() {
printf "%s\n" \
"nextflow.trace/v2" \
"realtime=$wall_time" \
"%cpu=$ucpu" \
"used_cpus=$used_cpus" \
"cpu_model=$cpu_model" \
"rchar=${io_stat1[0]}" \
"wchar=${io_stat1[1]}" \
Expand All @@ -149,6 +200,7 @@ nxf_trace_mac() {
local ucpu=''
local cpu_model=''
local io_stat1=('' '' '' '' '' '')
local used_cpus=''
nxf_write_trace
}

Expand Down Expand Up @@ -184,6 +236,9 @@ nxf_trace_linux() {
eval "exec $mem_fd> >(nxf_mem_watch $task)"
local mem_proc=$!

## Report number of used CPUs via polling
local used_cpus=$(nxf_cpu_watch "$task")

wait $task

## compute cpu usage time for processes
Expand All @@ -201,17 +256,7 @@ nxf_trace_linux() {
local wall_time=$((end_millis-start_millis))
[ $NXF_DEBUG = 1 ] && echo "+++ STATS %CPU=$ucpu TIME=$wall_time I/O=${io_stat1[*]}"

printf "%s\n" \
"nextflow.trace/v2" \
"realtime=$wall_time" \
"%cpu=$ucpu" \
"cpu_model=$cpu_model" \
"rchar=${io_stat1[0]}" \
"wchar=${io_stat1[1]}" \
"syscr=${io_stat1[2]}" \
"syscw=${io_stat1[3]}" \
"read_bytes=${io_stat1[4]}" \
"write_bytes=${io_stat1[5]}" >| "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"
nxf_write_trace

## join nxf_mem_watch
[ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class TraceRecordTest extends Specification {
nextflow.trace/v2
realtime=12021
%cpu=997
used_cpus=32
rchar=50838
wchar=317
syscr=120
Expand All @@ -181,6 +182,7 @@ class TraceRecordTest extends Specification {
then:
trace.realtime == 12021
trace.'%cpu' == 99.7
trace.used_cpus == 32
trace.rchar == 50838
trace.wchar == 317
trace.syscr == 120
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,59 @@ nxf_mem_watch() {
"inv_ctxt=${nxf_stat_ret[7]}" >> "$trace_file" || >&2 echo "Error: Failed to append to file: $trace_file"
}

## Count the distinct CPU cores on which a process tree has been observed.
##
## While the root process is alive, every descendant process (and each of
## its threads) is sampled from /proc roughly every 0.1s and the id of the
## core it last ran on is recorded. This is a polling estimate: cores used
## only between two samples are not seen.
##
## Arguments: $1 - PID of the root process to watch
## Outputs:   the number of distinct cores seen, on stdout
## Note:      blocks until the root process is gone; relies on `nxf_sleep`
##            (defined elsewhere in this script) and on `pgrep`.
nxf_cpu_watch() {
    ## No `set -u` here: it would stay enabled in the caller's shell after
    ## the function returns, and on bash < 4.4 it turns every expansion of
    ## an empty array into a fatal "unbound variable" error.
    local root_pid="${1:-}"
    local -A seen_cpus=()
    local -a pids=()
    local p

    ## breadth-first walk of the process tree; prints one live PID per line
    collect_child_processes() {
        local -a queue=("$1")
        local -a out=()
        local -a kids=()
        local p
        local head=0

        ## walk the queue with an index instead of re-slicing the array
        while (( head < ${#queue[@]} )); do
            p=${queue[head]}
            head=$((head + 1))

            [[ -d /proc/$p ]] || continue
            out+=("$p")

            kids=()
            mapfile -t kids < <(pgrep -P "$p" 2>/dev/null)
            if (( ${#kids[@]} )); then
                queue+=("${kids[@]}")
            fi
        done

        ## print nothing (rather than one blank line) when nothing is alive
        if (( ${#out[@]} )); then
            printf "%s\n" "${out[@]}"
        fi
    }

    ## sample every thread of one PID; records into seen_cpus, which is
    ## visible here through bash dynamic scoping
    sample_pid_threads() {
        local pid="$1"
        local t stat cpu
        local -a fields=()

        for t in /proc/"$pid"/task/*; do
            ## a thread may exit between the glob and the read
            { IFS= read -r stat < "$t/stat"; } 2>/dev/null || continue

            ## Drop the leading "pid (comm) ": comm may itself contain
            ## spaces or parentheses, so counting whitespace-separated
            ## fields from the start of the line is unreliable.
            stat=${stat##*) }
            IFS=' ' read -ra fields <<< "$stat"

            ## fields[0] is stat field 3 (state), hence field 39
            ## (processor) sits at index 36
            cpu=${fields[36]:-}
            if [[ $cpu =~ ^[0-9]+$ ]]; then
                seen_cpus["$cpu"]=1
            fi
        done
        return 0
    }

    ## poll until the root process is gone
    while kill -0 "$root_pid" 2>/dev/null; do
        pids=()
        mapfile -t pids < <(collect_child_processes "$root_pid")

        if (( ${#pids[@]} )); then
            for p in "${pids[@]}"; do
                sample_pid_threads "$p"
            done
        fi

        nxf_sleep 0.1
    done

    echo "${#seen_cpus[@]}"
}

nxf_write_trace() {
printf "%s\n" \
"nextflow.trace/v2" \
"realtime=$wall_time" \
"%cpu=$ucpu" \
"used_cpus=$used_cpus" \
"cpu_model=$cpu_model" \
"rchar=${io_stat1[0]}" \
"wchar=${io_stat1[1]}" \
Expand All @@ -132,6 +180,7 @@ nxf_trace_mac() {
local ucpu=''
local cpu_model=''
local io_stat1=('' '' '' '' '' '')
local used_cpus=''
nxf_write_trace
}

Expand Down Expand Up @@ -159,6 +208,8 @@ nxf_trace_linux() {
eval "exec $mem_fd> >(nxf_mem_watch $task)"
local mem_proc=$!

local used_cpus=$(nxf_cpu_watch "$task")

wait $task

local end_millis=$(nxf_date)
Expand All @@ -175,17 +226,7 @@ nxf_trace_linux() {
local wall_time=$((end_millis-start_millis))
[ $NXF_DEBUG = 1 ] && echo "+++ STATS %CPU=$ucpu TIME=$wall_time I/O=${io_stat1[*]}"

printf "%s\n" \
"nextflow.trace/v2" \
"realtime=$wall_time" \
"%cpu=$ucpu" \
"cpu_model=$cpu_model" \
"rchar=${io_stat1[0]}" \
"wchar=${io_stat1[1]}" \
"syscr=${io_stat1[2]}" \
"syscw=${io_stat1[3]}" \
"read_bytes=${io_stat1[4]}" \
"write_bytes=${io_stat1[5]}" >| "$trace_file" || >&2 echo "Error: Failed to write to file: $trace_file"
nxf_write_trace

[ -e /proc/$mem_proc ] && eval "echo 'DONE' >&$mem_fd" || true
wait $mem_proc 2>/dev/null || true
Expand Down
Loading