diff --git a/lib/smart_proxy_remote_execution_ssh.rb b/lib/smart_proxy_remote_execution_ssh.rb index c6649c5..49aa195 100644 --- a/lib/smart_proxy_remote_execution_ssh.rb +++ b/lib/smart_proxy_remote_execution_ssh.rb @@ -27,20 +27,6 @@ def validate_mode! unless Plugin::MODES.include? Plugin.settings.mode raise "Mode has to be one of #{Plugin::MODES.join(', ')}, given #{Plugin.settings.mode}" end - - if Plugin.settings.async_ssh - Plugin.logger.warn('Option async_ssh is deprecated, use ssh-async mode instead.') - - case Plugin.settings.mode - when :ssh - Plugin.logger.warn('Deprecated option async_ssh used together with ssh mode, switching mode to ssh-async.') - Plugin.settings.mode = :'ssh-async' - when :'ssh-async' - # This is a noop - else - Plugin.logger.warn('Deprecated option async_ssh used together with incompatible mode, ignoring.') - end - end end def validate_mqtt_settings! @@ -97,11 +83,11 @@ def validate_ssh_log_level! end def requires_configured_ssh? - %i[ssh ssh-async].include?(Plugin.settings.mode) || Plugin.settings.cockpit_integration + Plugin.settings.mode == :ssh || Plugin.settings.cockpit_integration end def validate_socket_path! - return unless Plugin.settings.mode == :'ssh' || Plugin.settings.mode == :'ssh-async' + return unless Plugin.settings.mode == :'ssh' socket_path = File.expand_path(Plugin.settings.socket_working_dir) raise "Socket path #{socket_path} is too long" if socket_path.length > Plugin::SOCKET_PATH_MAX_LENGTH diff --git a/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb b/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb index 16415e0..aa5e427 100644 --- a/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb +++ b/lib/smart_proxy_remote_execution_ssh/actions/run_script.rb @@ -7,7 +7,7 @@ class RunScript < ::Dynflow::Action def plan(*args) mode = Proxy::RemoteExecution::Ssh::Plugin.settings.mode case mode - when :ssh, :'ssh-async' + when :ssh plan_action(ScriptRunner, *args) when :pull, :'pull-mqtt' plan_action(PullScript, *args) diff --git a/lib/smart_proxy_remote_execution_ssh/async_scripts/control.sh b/lib/smart_proxy_remote_execution_ssh/async_scripts/control.sh deleted file mode 100644 index ff673ea..0000000 --- a/lib/smart_proxy_remote_execution_ssh/async_scripts/control.sh +++ /dev/null @@ -1,110 +0,0 @@ -#!/bin/sh -# -# Control script for the remote execution jobs. -# -# The initial script calls `$CONTROL_SCRIPT init-script-finish` once the original script exits. -# In automatic mode, the exit code is sent back to the proxy on `init-script-finish`. -# -# What the script provides is also a manual mode, where the author of the rex script can take -# full control of the job lifecycle. This allows keeping the marked as running even when -# the initial script finishes. -# -# The manual mode is turned on by calling `$CONTROL_SCRIPT manual-control`. After calling this, -# one can call `echo message | $CONTROL_SCRIPT update` to send output to the remote execution jobs -# and `$CONTROL_SCRIPT finish 0` once finished (with 0 as exit code) to send output to the remote execution jobs -# and `$CONTROL_SCRIPT finish 0` once finished (with 0 as exit code) -BASE_DIR="$(dirname "$(readlink -f "$0")")" - -if ! command -v curl >/dev/null; then - echo 'curl is required' >&2 - exit 1 -fi - -# send the callback data to proxy -update() { - "$BASE_DIR/retrieve.sh" push_update -} - -# wait for named pipe $1 to retrieve data. If $2 is provided, it serves as timeout -# in seconds on how long to wait when reading. -wait_for_pipe() { - pipe_path=$1 - if [ -n "$2" ]; then - timeout="-t $2" - fi - if read $timeout <>"$pipe_path"; then - rm "$pipe_path" - return 0 - else - return 1 - fi -} - -# function run in background, when receiving update data via STDIN. -periodic_update() { - interval=1 - # reading some data from periodic_update_control signals we're done - while ! wait_for_pipe "$BASE_DIR/periodic_update_control" "$interval"; do - update - done - # one more update before we finish - update - # signal the main process that we are finished - echo > "$BASE_DIR/periodic_update_finished" -} - -# signal the periodic_update process that the main process is finishing -periodic_update_finish() { - if [ -e "$BASE_DIR/periodic_update_control" ]; then - echo > "$BASE_DIR/periodic_update_control" - fi -} - -ACTION=${1:-finish} - -case "$ACTION" in - init-script-finish) - if ! [ -e "$BASE_DIR/manual_mode" ]; then - # make the exit code of initialization script the exit code of the whole job - cp init_exit_code exit_code - update - fi - ;; - finish) - # take exit code passed via the command line, with fallback - # to the exit code of the initialization script - exit_code=${2:-$(cat "$BASE_DIR/init_exit_code")} - echo $exit_code > "$BASE_DIR/exit_code" - update - if [ -e "$BASE_DIR/manual_mode" ]; then - rm "$BASE_DIR/manual_mode" - fi - ;; - update) - # read data from input when redirected though a pipe - if ! [ -t 0 ]; then - # couple of named pipes to coordinate the main process with the periodic_update - mkfifo "$BASE_DIR/periodic_update_control" - mkfifo "$BASE_DIR/periodic_update_finished" - trap "periodic_update_finish" EXIT - # run periodic update as separate process to keep sending updates in output to server - periodic_update & - # redirect the input into output - tee -a "$BASE_DIR/output" - periodic_update_finish - # ensure the periodic update finished before we return - wait_for_pipe "$BASE_DIR/periodic_update_finished" - else - update - fi - ;; - # mark the script to be in manual mode: this means the script author needs to use `update` and `finish` - # commands to send output to the remote execution job or mark it as finished. - manual-mode) - touch "$BASE_DIR/manual_mode" - ;; - *) - echo "Unknown action $ACTION" - exit 1 - ;; -esac diff --git a/lib/smart_proxy_remote_execution_ssh/async_scripts/retrieve.sh b/lib/smart_proxy_remote_execution_ssh/async_scripts/retrieve.sh deleted file mode 100644 index 4ef56dc..0000000 --- a/lib/smart_proxy_remote_execution_ssh/async_scripts/retrieve.sh +++ /dev/null @@ -1,151 +0,0 @@ -#!/bin/sh - -if ! pgrep --help 2>/dev/null >/dev/null; then - echo DONE 1 - echo "pgrep is required" >&2 - exit 1 -fi - -BASE_DIR="$(dirname "$(readlink -f "$0")")" - -# load the data required for generating the callback -. "$BASE_DIR/env.sh" -URL_PREFIX="$CALLBACK_HOST/dynflow/tasks/$TASK_ID" -AUTH="$TASK_ID:$OTP" -CURL="curl --silent --show-error --fail --max-time 10" - -MY_LOCK_FILE="$BASE_DIR/retrieve_lock.$$" -MY_PID=$$ -echo $MY_PID >"$MY_LOCK_FILE" -LOCK_FILE="$BASE_DIR/retrieve_lock" -TMP_OUTPUT_FILE="$BASE_DIR/tmp_output" - -RUN_TIMEOUT=30 # for how long can the script hold the lock -WAIT_TIMEOUT=60 # for how long the script is trying to acquire the lock -START_TIME=$(date +%s) - -fail() { - echo RUNNING - echo "$1" - exit 1 -} - -acquire_lock() { - # try to acquire lock by creating the file (ln should be atomic an fail in case - # another process succeeded first). We also check the content of the lock file, - # in case our process won when competing over the lock while invalidating - # the lock on timeout. - ln "$MY_LOCK_FILE" "$LOCK_FILE" 2>/dev/null || [ "$(head -n1 "$LOCK_FILE")" = "$MY_PID" ] - return $? -} - -# acquiring the lock before proceeding, to ensure only one instance of the script is running -while ! acquire_lock; do - # we failed to create retrieve_lock - assuming there is already another retrieve script running - current_pid=$(head -n1 "$LOCK_FILE") - if [ -z "$current_pid" ]; then - continue - fi - # check whether the lock is not too old (compared to $RUN_TIMEOUT) and try to kill - # if it is, so that we don't have a stalled processes here - lock_lines_count=$(wc -l < "$LOCK_FILE") - current_lock_time=$(stat --format "%Y" "$LOCK_FILE") - current_time=$(date +%s) - - if [ "$(( current_time - START_TIME ))" -gt "$WAIT_TIMEOUT" ]; then - # We were waiting for the lock for too long - just give up - fail "Wait time exceeded $WAIT_TIMEOUT" - elif [ "$(( current_time - current_lock_time ))" -gt "$RUN_TIMEOUT" ]; then - # The previous lock it hold for too long - re-acquiring procedure - if [ "$lock_lines_count" -gt 1 ]; then - # there were multiple processes waiting for lock without resolution - # longer than the $RUN_TIMEOUT - we reset the lock file and let processes - # to compete - echo "RETRY" > "$LOCK_FILE" - fi - if [ "$current_pid" != "RETRY" ]; then - # try to kill the currently stalled process - kill -9 "$current_pid" 2>/dev/null - fi - # try to add our process as one candidate - echo $MY_PID >> "$LOCK_FILE" - if [ "$( head -n2 "$LOCK_FILE" | tail -n1 )" = "$MY_PID" ]; then - # our process won the competition for the new lock: it is the first pid - # after the original one in the lock file - take ownership of the lock - # next iteration only this process will get through - echo $MY_PID >"$LOCK_FILE" - fi - else - # still waiting for the original owner to finish - sleep 1 - fi -done - -release_lock() { - rm "$MY_LOCK_FILE" - rm "$LOCK_FILE" -} -# ensure the release the lock at exit -trap "release_lock" EXIT - -# make sure we clear previous tmp output file -if [ -e "$TMP_OUTPUT_FILE" ]; then - rm "$TMP_OUTPUT_FILE" -fi - -pid=$(cat "$BASE_DIR/pid") -[ -f "$BASE_DIR/position" ] || echo 1 > "$BASE_DIR/position" -position=$(cat "$BASE_DIR/position") - -prepare_output() { - if [ -e "$BASE_DIR/manual_mode" ] || ([ -n "$pid" ] && pgrep -P "$pid" >/dev/null 2>&1); then - echo RUNNING - else - echo "DONE $(cat "$BASE_DIR/exit_code" 2>/dev/null)" - fi - [ -f "$BASE_DIR/output" ] || exit 0 - tail --bytes "+${position}" "$BASE_DIR/output" > "$TMP_OUTPUT_FILE" - cat "$TMP_OUTPUT_FILE" -} - -# prepare the callback payload -payload() { - if [ -n "$1" ]; then - exit_code="$1" - else - exit_code=null - fi - - if [ -e "$BASE_DIR/manual_mode" ]; then - manual_mode=true - output=$(prepare_output | base64 -w0) - else - manual_mode=false - fi - - echo "{ \"exit_code\": $exit_code,"\ - " \"step_id\": \"$STEP_ID\","\ - " \"manual_mode\": $manual_mode,"\ - " \"output\": \"$output\" }" -} - -if [ "$1" = "push_update" ]; then - if [ -e "$BASE_DIR/exit_code" ]; then - exit_code="$(cat "$BASE_DIR/exit_code")" - action="done" - else - exit_code="" - action="update" - fi - $CURL -X POST -d "$(payload $exit_code)" -u "$AUTH" "$URL_PREFIX"/$action 2>>"$BASE_DIR/curl_stderr" - success=$? -else - prepare_output - success=$? -fi - -if [ "$success" = 0 ] && [ -e "$TMP_OUTPUT_FILE" ]; then - # in case the retrieval was successful, move the position of the cursor to be read next time - bytes=$(wc --bytes < "$TMP_OUTPUT_FILE") - expr "${position}" + "${bytes}" > "$BASE_DIR/position" -fi diff --git a/lib/smart_proxy_remote_execution_ssh/plugin.rb b/lib/smart_proxy_remote_execution_ssh/plugin.rb index 21ab35c..00b6c49 100644 --- a/lib/smart_proxy_remote_execution_ssh/plugin.rb +++ b/lib/smart_proxy_remote_execution_ssh/plugin.rb @@ -1,7 +1,7 @@ module Proxy::RemoteExecution::Ssh class Plugin < Proxy::Plugin SSH_LOG_LEVELS = %w[debug info error fatal].freeze - MODES = %i[ssh ssh-async pull pull-mqtt].freeze + MODES = %i[ssh pull pull-mqtt].freeze # Unix domain socket path length is limited to 104 (on some platforms) characters # Socket path is composed of custom path (max 49 characters) + job id (37 characters) # + offset(17 characters) + null terminator @@ -61,8 +61,6 @@ def self.simulate? def self.runner_class @runner_class ||= if simulate? Runners::FakeScriptRunner - elsif settings.mode == :'ssh-async' - Runners::PollingScriptRunner else Runners::ScriptRunner end diff --git a/lib/smart_proxy_remote_execution_ssh/runners.rb b/lib/smart_proxy_remote_execution_ssh/runners.rb index 5837162..3d57a7d 100644 --- a/lib/smart_proxy_remote_execution_ssh/runners.rb +++ b/lib/smart_proxy_remote_execution_ssh/runners.rb @@ -1,7 +1,6 @@ module Proxy::RemoteExecution::Ssh module Runners require 'smart_proxy_remote_execution_ssh/runners/script_runner' - require 'smart_proxy_remote_execution_ssh/runners/polling_script_runner' require 'smart_proxy_remote_execution_ssh/runners/fake_script_runner' end end diff --git a/lib/smart_proxy_remote_execution_ssh/runners/polling_script_runner.rb b/lib/smart_proxy_remote_execution_ssh/runners/polling_script_runner.rb deleted file mode 100644 index 88c8134..0000000 --- a/lib/smart_proxy_remote_execution_ssh/runners/polling_script_runner.rb +++ /dev/null @@ -1,147 +0,0 @@ -require 'base64' - -module Proxy::RemoteExecution::Ssh::Runners - class PollingScriptRunner < ScriptRunner - DEFAULT_REFRESH_INTERVAL = 60 - - def self.load_script(name) - script_dir = File.expand_path('../async_scripts', __dir__) - File.read(File.join(script_dir, name)) - end - - # The script that controls the flow of the job, able to initiate update or - # finish on the task, or take over the control over script lifecycle - CONTROL_SCRIPT = load_script('control.sh') - - # The script always outputs at least one line - # First line of the output either has to begin with - # "RUNNING" or "DONE $EXITCODE" - # The following lines are treated as regular output - RETRIEVE_SCRIPT = load_script('retrieve.sh') - - def initialize(options, user_method, suspended_action: nil) - super(options, user_method, suspended_action: suspended_action) - @callback_host = options[:callback_host] - @task_id = options[:uuid] - @step_id = options[:step_id] - @otp = Proxy::Dynflow::OtpManager.generate_otp(@task_id) - end - - def prepare_start - super - @base_dir = File.dirname @remote_script - upload_control_scripts - end - - def initialization_script - close_stdin = '/dev/null 2>/dev/null' - main_script = "(#{@remote_script_wrapper} #{@remote_script} #{close_stdin} 2>&1; echo $?>#{@base_dir}/init_exit_code) >#{@base_dir}/output" - control_script_finish = "#{@control_script_path} init-script-finish" - <<-SCRIPT.gsub(/^ +\| /, '') - | export CONTROL_SCRIPT="#{@control_script_path}" - | #{"chown #{@user_method.effective_user} #{@base_dir}" if @user_method.cli_command_prefix} - | #{@user_method.cli_command_prefix} sh -c '#{main_script}; #{control_script_finish}' #{close_fds} & - SCRIPT - end - - def trigger(*args) - run_sync(*args) - end - - def refresh - @connection.establish! unless @connection.connected? - begin - pm = run_sync("#{@user_method.cli_command_prefix} #{@retrieval_script}") - process_retrieved_data(pm.stdout.to_s.chomp, pm.stderr.to_s.chomp) - rescue StandardError => e - @logger.info("Error while connecting to the remote host on refresh: #{e.message}") - end - ensure - destroy_session - end - - def kill - run_sync("pkill -P $(cat #{@pid_path})") - rescue StandardError => e - publish_exception('Unexpected error', e, false) - end - - def process_retrieved_data(output, err) - return if output.nil? || output.empty? - - lines = output.lines - result = lines.shift.match(/^DONE (\d+)?/) - publish_data(lines.join, 'stdout') unless lines.empty? - publish_data(err, 'stderr') unless err.empty? - if result - exitcode = result[1] || 0 - publish_exit_status(exitcode.to_i) - cleanup - end - end - - def external_event(event) - data = event.data - if data['manual_mode'] - load_event_updates(data) - else - # getting the update from automatic mode - reaching to the host to get the latest update - return run_refresh - end - ensure - destroy_session - end - - def close - super - Proxy::Dynflow::OtpManager.drop_otp(@task_id, @otp) if @otp - end - - def upload_control_scripts - return if @control_scripts_uploaded - - cp_script_to_remote(env_script, 'env.sh') - @control_script_path = cp_script_to_remote(CONTROL_SCRIPT, 'control.sh') - @retrieval_script = cp_script_to_remote(RETRIEVE_SCRIPT, 'retrieve.sh') - @control_scripts_uploaded = true - end - - # Script setting the dynamic values to env variables: it's sourced from other control scripts - def env_script - <<-SCRIPT.gsub(/^ +\| /, '') - | CALLBACK_HOST="#{@callback_host}" - | TASK_ID="#{@task_id}" - | STEP_ID="#{@step_id}" - | OTP="#{@otp}" - SCRIPT - end - - private - - # Generates updates based on the callback data from the manual mode - def load_event_updates(event_data) - continuous_output = Proxy::Dynflow::ContinuousOutput.new - if event_data.key?('output') - lines = Base64.decode64(event_data['output']).sub(/\A(RUNNING|DONE).*\n/, '') - continuous_output.add_output(lines, 'stdout') - end - cleanup if event_data['exit_code'] - new_update(continuous_output, event_data['exit_code']) - end - - def cleanup - if @cleanup_working_dirs - ensure_remote_command("rm -rf #{remote_command_dir}", - error: "Unable to remove working directory #{remote_command_dir} on remote system, exit code: %{exit_code}") - end - end - - def destroy_session - if @connection.connected? - @logger.debug("Closing session with #{@ssh_user}@#{@host}") - close_session - end - end - end -end diff --git a/settings.d/remote_execution_ssh.yml.example b/settings.d/remote_execution_ssh.yml.example index a30bdd3..17c835b 100644 --- a/settings.d/remote_execution_ssh.yml.example +++ b/settings.d/remote_execution_ssh.yml.example @@ -8,13 +8,12 @@ # :cockpit_integration: true -# Mode of operation, one of ssh, ssh-async, pull, pull-mqtt +# Mode of operation, one of ssh, pull, pull-mqtt :mode: ssh # Defines how often (in seconds) should the runner check # for new data leave empty to use the runner's default -# (1 second for regular, 60 seconds with async_ssh enabled) -# :runner_refresh_interval: +# :runner_refresh_interval: 1 # Defines the verbosity of logging coming from ssh command # one of :debug, :info, :error, :fatal