
Commit ad4eaa6

pkg-auto: Run package handling in jobs
This spawns several jobs, each of which waits for messages from the main process. A message is either a batch (a number followed by that many package pairs to handle) or a command to shut down, sent when there are no packages left to process. In the other direction, a job sends a message to the main process when it is done with its batch and is ready for the next one; any other line a job prints is echoed on the terminal by the main process. After all the packages are processed, the main process collects the job reports and merges them into the main one.
1 parent f5a1939 commit ad4eaa6
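In the diff below this handshake is implemented with the library's job_* helpers (job_run, job_send_input, job_get_output, job_reap). As a rough standalone illustration of just the protocol, not of those helpers, the sketch below drives a single worker over a plain bash coprocess; the worker function name, the sample package names and the batch size are made up for the example.

#!/usr/bin/env bash
# Standalone sketch of the batching protocol described in the commit
# message: the worker asks for work with READYFORMORE, the main process
# answers with a count followed by that many "old new" package pairs,
# and finally with WEAREDONE. Not the library's job_* machinery.
set -euo pipefail

ready_for_more_msg='READYFORMORE' we_are_done_msg='WEAREDONE'

demo_worker() {
    local REPLY line
    local -i i count
    while true; do
        echo "${ready_for_more_msg}"
        read -r
        if [[ ${REPLY} = "${we_are_done_msg}" ]]; then
            return 0
        fi
        count=${REPLY}
        for ((i = 0; i < count; ++i)); do
            read -r line
            # A real worker would call handle_one_package_change here.
            echo "processed: ${line}"
        done
    done
}

# Hypothetical sample data; the real code takes these from the reports setup.
old_pkgs=( dev-lang/go-1.21 app-shells/bash-5.2 sys-apps/grep-3.11 )
new_pkgs=( dev-lang/go-1.22 app-shells/bash-5.2 sys-apps/grep-3.12 )
batch_size=2 idx=0

coproc WORKER { demo_worker; }

while read -r -u "${WORKER[0]}" msg; do
    if [[ ${msg} != "${ready_for_more_msg}" ]]; then
        # Anything outside the protocol is just passed through to the terminal.
        echo "worker says: ${msg}"
        continue
    fi
    if (( idx >= ${#old_pkgs[@]} )); then
        echo "${we_are_done_msg}" >&"${WORKER[1]}"
        break
    fi
    old_batch=( "${old_pkgs[@]:idx:batch_size}" )
    new_batch=( "${new_pkgs[@]:idx:batch_size}" )
    echo "${#old_batch[@]}" >&"${WORKER[1]}"
    for ((i = 0; i < ${#old_batch[@]}; ++i)); do
        echo "${old_batch[i]} ${new_batch[i]}" >&"${WORKER[1]}"
    done
    idx=$(( idx + batch_size ))
done
wait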

File tree

1 file changed (+248 lines, -15 lines)

pkg_auto/impl/pkg_auto_lib.sh

Lines changed: 248 additions & 15 deletions
@@ -1985,13 +1985,101 @@ function package_output_paths_unset() {
     unset "${@}"
 }
 
+# Fields of package job state struct.
+#
+# PJS_JOB_IDX - name of a job variable
+# PJS_DIR_IDX - path to job's state directory
+declare -gri PJS_JOB_IDX=0 PJS_DIR_IDX=1
+
+# Declare package job state variables.
+#
+# Parameters:
+#
+# @ - names of variables to be used for package job states
+function pkg_job_state_declare() {
+    struct_declare -ga "${@}" "( '' '' )"
+}
+
+# Unset package job state variables.
+#
+# Parameters:
+#
+# @ - names of package job state variables
+function pkg_job_state_unset() {
+    local name job_name
+    for name; do
+        local -n pkg_job_state_ref=${name}
+        job_name=${pkg_job_state_ref[PJS_JOB_IDX]}
+        if [[ -n ${job_name} ]]; then
+            job_unset "${job_name}"
+        fi
+        unset -n pkg_job_state_ref
+    done
+    unset "${@}"
+}
+
+# Messages used in communication between package jobs and the main
+# process.
+#
+# READYFORMORE is a message that a package job sends to the main
+# process when it is done with processing the current batch of
+# packages and asks for more.
+#
+# WEAREDONE is a message that the main process sends to a package job
+# when there are no more packages to process, so the job should
+# terminate.
+declare -gr ready_for_more_msg='READYFORMORE' we_are_done_msg='WEAREDONE'
+
+# A job function for handling package updates. Receives a batch of
+# packages to process, processes them, writes results to a given
+# directory and asks for more packages when done.
+#
+# Parameters:
+#
+# 1 - output directory for the package handling results
+# 2 - name of a bunch of maps variable
+function handle_package_changes_job() {
+    local output_dir=${1}; shift
+    local bunch_of_maps_var_name=${1}; shift
+    local we_are_done='' line
+    local -a reply_lines pair
+    local -i i pkg_count
+
+    local REPLY
+    while [[ -z ${we_are_done} ]]; do
+        echo "${ready_for_more_msg}"
+        read -r
+        if [[ ${REPLY} = "${we_are_done_msg}" ]]; then
+            we_are_done=x
+        elif [[ ${REPLY} =~ ^[0-9]+$ ]]; then
+            reply_lines=()
+            pkg_count=${REPLY}
+            for ((i = 0; i < pkg_count; ++i)); do
+                read -r
+                reply_lines+=( "${REPLY}" )
+            done
+            for line in "${reply_lines[@]}"; do
+                mapfile -t pair <<<"${line// /$'\n'}"
+                if [[ ${#pair[@]} -eq 2 ]]; then
+                    handle_one_package_change "${output_dir}" "${bunch_of_maps_var_name}" "${pair[@]}"
+                else
+                    echo "invalid message received: ${line@Q}, expected a pair of package names"
+                fi
+            done
+        else
+            echo "invalid message received: ${REPLY@Q}, expected a number or ${we_are_done_msg@Q}"
+        fi
+    done
+    return 0
+}
+
 function handle_one_package_change() {
     local output_dir=${1}; shift
     local -n bunch_of_maps_ref=${1}; shift
     local old_name=${1}; shift
     local new_name=${1}; shift
 
-    local warnings_dir="${output_dir}"
+    local warnings_dir="${output_dir}/warnings"
     local updates_dir="${output_dir}/updates"
 
     local pkg_to_tags_mvm_var_name=${bunch_of_maps_ref[BOM_PKG_TO_TAGS_MVM_IDX]}
@@ -2293,8 +2381,6 @@ function handle_package_changes() {
     hpc_package_sources_map=()
     read_package_sources hpc_package_sources_map
 
-    mkdir -p "${REPORTS_DIR}/updates"
-
     local -a old_pkgs new_pkgs
     old_pkgs=()
     new_pkgs=()
@@ -2381,6 +2467,155 @@ function handle_package_changes() {
     hpc_bunch_of_maps[BOM_NEW_PKG_SLOT_VERMINMAX_MAP_MVM_IDX]=hpc_new_pkg_slot_verminmax_map_mvm
     hpc_bunch_of_maps[BOM_PKG_SOURCES_MAP_IDX]=hpc_package_sources_map
 
+    # We will be spawning as many jobs below as there are available
+    # processors/cores. Each job has its own work directory and will
+    # be receiving packages to process in batches of five. Once all
+    # the packages are processed, their reports are aggregated into a
+    # single one.
+    local -i pkg_batch_size=5 this_batch_size pkg_idx=0 pkg_count=${#old_pkgs[@]}
+    local -a pkg_batch old_pkgs_batch new_pkgs_batch
+
+    local pkg_job_top_dir="${WORKDIR}/pkgjobdirs"
+    create_cleanup_dir "${pkg_job_top_dir}"
+
+    local -a pkg_job_state_names=()
+    local pkg_job_state_name pkg_job_name pkg_job_dir pkg_job_warnings_dir pkg_job_updates_dir
+
+    local -i job_count i
+    get_num_proc job_count
+
+    local file
+    local -a paths
+
+    # Set up environment for each job, create a job state and kick off
+    # the job.
+    for ((i = 0; i < job_count; ++i)); do
+        gen_varname pkg_job_state_name
+        pkg_job_state_declare "${pkg_job_state_name}"
+        gen_varname pkg_job_name
+        job_declare "${pkg_job_name}"
+
+        pkg_job_dir="${pkg_job_top_dir}/j${i}"
+        pkg_job_warnings_dir="${pkg_job_dir}/warnings"
+        pkg_job_updates_dir="${pkg_job_dir}/updates"
+        create_cleanup_dir "${pkg_job_dir}"
+        create_cleanup_dir "${pkg_job_warnings_dir}"
+        create_cleanup_dir "${pkg_job_updates_dir}"
+        paths=()
+        for file in developer-warnings warnings manual-work-needed; do
+            paths+=( "${pkg_job_dir}/warnings/${file}" )
+        done
+        for file in summary_stubs changelog_stubs; do
+            paths+=( "${pkg_job_dir}/updates/${file}" )
+        done
+        # TODO: That's a bit messy
+        add_cleanup "find -P ${pkg_job_updates_dir@Q} -mindepth 1 -maxdepth 1 -type d -exec rm -rf {} +"
+        add_cleanup "rm -f ${paths[*]@Q}"
+
+        job_run -m "${pkg_job_name}" handle_package_changes_job "${pkg_job_dir}" hpc_bunch_of_maps
+
+        local -n pkg_job_state_ref="${pkg_job_state_name}"
+        pkg_job_state_ref[PJS_JOB_IDX]=${pkg_job_name}
+        pkg_job_state_ref[PJS_DIR_IDX]=${pkg_job_dir}
+        unset -n pkg_job_state_ref
+        pkg_job_state_names+=( "${pkg_job_state_name}" )
+    done
+
+    # We have two job arrays, "current" and "next". When iterating the
+    # "current" array, we will be putting all still alive jobs into
+    # the "next" array, which will become "current" in the next
+    # iteration. In every iteration we collect the output and send
+    # another batch of packages to be processed by a job if it's
+    # ready. We terminate the jobs when we have run out of
+    # packages. The looping finishes when all the jobs are terminated.
+    local -i current_idx=0 next_idx=1 idx state_count=${#pkg_job_state_names[@]}
+    local -a pkg_job_state_names_0=( "${pkg_job_state_names[@]}" ) pkg_job_state_names_1=() pkg_job_output_lines
+    local pkg_job_output_line pkg_job_input_sent
+    while [[ state_count -gt 0 ]]; do
+        local -n pkg_job_state_names_ref=pkg_job_state_names_${current_idx}
+        local -n next_pkg_job_state_names_ref=pkg_job_state_names_${next_idx}
+
+        next_pkg_job_state_names_ref=()
+        for pkg_job_state_name in "${pkg_job_state_names_ref[@]}"; do
+            local -n pkg_job_state_ref=${pkg_job_state_name}
+            pkg_job_name=${pkg_job_state_ref[PJS_JOB_IDX]}
+            unset -n pkg_job_state_ref
+            if job_is_alive "${pkg_job_name}"; then
+                next_pkg_job_state_names_ref+=( "${pkg_job_state_name}" )
+            fi
+            job_get_output "${pkg_job_name}" pkg_job_output_lines
+            pkg_job_input_sent=
+            for pkg_job_output_line in "${pkg_job_output_lines[@]}"; do
+                if [[ ${pkg_job_output_line} = "${ready_for_more_msg}" ]]; then
+                    if [[ -z ${pkg_job_input_sent} ]]; then
+                        if [[ pkg_idx -ge pkg_count ]]; then
+                            job_send_input "${pkg_job_name}" "${we_are_done_msg}"
+                        else
+                            old_pkgs_batch=( "${old_pkgs[@]:pkg_idx:pkg_batch_size}" )
+                            new_pkgs_batch=( "${new_pkgs[@]:pkg_idx:pkg_batch_size}" )
+                            this_batch_size=${#old_pkgs_batch[@]}
+                            pkg_batch=( "${this_batch_size}" )
+                            for ((i = 0; i < this_batch_size; ++i)); do
+                                old_pkg=${old_pkgs_batch[i]}
+                                new_pkg=${new_pkgs_batch[i]}
+                                pkg_batch+=( "${old_pkg} ${new_pkg}" )
+                            done
+                            pkg_idx=$((pkg_idx + pkg_batch_size))
+                            job_send_input "${pkg_job_name}" "${pkg_batch[@]}"
+                        fi
+                        pkg_job_input_sent=x
+                    fi
+                else
+                    # The job already used info to print this line, so
+                    # we should just echo it, otherwise we will get
+                    # repeated prefixes ("script_name: script_name:
# something happenend")
+                    echo "${pkg_job_output_line}"
+                fi
+            done
+        done
+        state_count=${#next_pkg_job_state_names_ref[@]}
+        if [[ state_count -gt 0 ]]; then
+            sleep 0.2
+        fi
+
+        unset -n pkg_job_state_names_ref next_pkg_job_state_names_ref
+        idx=${current_idx}
+        current_idx=${next_idx}
+        next_idx=${idx}
+    done
+
+    # All the jobs are done, so here we collect all their reports and
+    # merge them into the main ones in reports directory.
+    local some_job_failed='' hpc_filename
+    local -i hpc_rv
+    truncate --size=0 "${REPORTS_DIR}/updates/summary_stubs" "${REPORTS_DIR}/updates/changelog_stubs"
+    for pkg_job_state_name in "${pkg_job_state_names[@]}"; do
+        local -n pkg_job_state_ref=${pkg_job_state_name}
+        pkg_job_name=${pkg_job_state_ref[PJS_JOB_IDX]}
+        pkg_job_dir=${pkg_job_state_ref[PJS_DIR_IDX]}
+        unset -n pkg_job_state_ref
+        job_reap "${pkg_job_name}" hpc_rv
+        if [[ hpc_rv -ne 0 ]]; then
+            some_job_failed=x
+        fi
+        for file in "${pkg_job_dir}/warnings/"*; do
+            basename_out "${file}" hpc_filename
+            cat "${file}" >>"${REPORTS_DIR}/${hpc_filename}"
+        done
+        for file in "${pkg_job_dir}/updates/"*; do
+            basename_out "${file}" hpc_filename
+            if [[ -f ${file} ]]; then
+                cat "${file}" >>"${REPORTS_DIR}/updates/${hpc_filename}"
+            elif [[ -d ${file} ]]; then
+                if [[ ! -d "${REPORTS_DIR}/updates/${hpc_filename}" ]]; then
+                    mkdir -p "${REPORTS_DIR}/updates/${hpc_filename}"
+                fi
+                mv "${file}/"* "${REPORTS_DIR}/updates/${hpc_filename}"
+            fi
+        done
+    done
+
     # The loop below goes over the pairs of old and new package
     # names. For each name there will be some checks done (like does
     # this package even exist). Each name in the pair has a set of
@@ -2398,20 +2633,16 @@ function handle_package_changes() {
     # handled by the automation - in such cases there will be a
     # "manual action needed" report.
 
-    local pkg_idx=0
-    local old_name new_name
-    while [[ ${pkg_idx} -lt ${#old_pkgs[@]} ]]; do
-        old_name=${old_pkgs["${pkg_idx}"]}
-        new_name=${new_pkgs["${pkg_idx}"]}
-
-        handle_one_package_change "${REPORTS_DIR}" hpc_bunch_of_maps "${old_name}" "${new_name}"
-    done
-
+    pkg_job_state_unset "${pkg_job_state_names[@]}"
     bunch_of_maps_unset hpc_bunch_of_maps
 
     mvm_unset hpc_new_pkg_slot_verminmax_map_mvm
     mvm_unset hpc_old_pkg_slot_verminmax_map_mvm
     mvm_unset hpc_pkg_slots_set_mvm
+
+    if [[ -n ${some_job_failed} ]]; then
+        fail "some job failed"
+    fi
 }
 
 # Gets the first item from the passed set.
@@ -3035,17 +3266,19 @@ function handle_gentoo_sync() {
     mvm_declare hgs_pkg_to_tags_mvm
     process_listings hgs_pkg_to_tags_mvm
 
+    # shellcheck source=for-shellcheck/globals
+    source "${WORKDIR}/globals"
+
     local -A hgs_renames_old_to_new_map=()
     process_profile_updates_directory hgs_renames_old_to_new_map
 
+    mkdir -p "${REPORTS_DIR}/updates"
+
     handle_package_changes hgs_renames_old_to_new_map hgs_pkg_to_tags_mvm
 
     mvm_unset hgs_pkg_to_tags_mvm
     #mvm_debug_disable hgs_pkg_to_tags_mvm
 
-    # shellcheck source=for-shellcheck/globals
-    source "${WORKDIR}/globals"
-
     local old_head new_head
     old_head=$(git -C "${OLD_STATE}" rev-parse HEAD)
     new_head=$(git -C "${NEW_STATE}" rev-parse HEAD)
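The scheduling loop added to handle_package_changes tracks live jobs with a pair of arrays that swap roles through namerefs on every iteration. Below is a stripped-down, self-contained sketch of just that bookkeeping, not of the library's job_* helpers: plain background sleeps stand in for the real jobs, kill -0 approximates job_is_alive, and all names are made up.

#!/usr/bin/env bash
# Two-array ("current"/"next") bookkeeping sketch: still-running jobs
# are copied into the other array, the roles are swapped, and the loop
# ends once no job survives an iteration.
set -euo pipefail

function demo_scheduler() {
    local -a jobs_0=() jobs_1=()
    local -i current=0 next=1 tmp t
    local pid

    # Spawn a few dummy "jobs".
    for t in 1 2 3; do
        sleep "${t}" &
        jobs_0+=( "${!}" )
    done

    while true; do
        local -n cur_ref=jobs_${current}
        local -n next_ref=jobs_${next}
        next_ref=()
        for pid in "${cur_ref[@]}"; do
            if kill -0 "${pid}" 2>/dev/null; then
                # Still running, keep it for the next round.
                next_ref+=( "${pid}" )
            else
                # Reap the finished job and note a failure if any.
                wait "${pid}" || echo "job ${pid} failed"
            fi
        done
        if [[ ${#next_ref[@]} -eq 0 ]]; then
            unset -n cur_ref next_ref
            break
        fi
        unset -n cur_ref next_ref
        tmp=${current}
        current=${next}
        next=${tmp}
        sleep 0.2
    done
    echo "all jobs finished"
}

demo_scheduler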
