Skip to content

How to work with long-running commands? #348

@asomers

Description

@asomers

Background

I'm trying to execute a long-running command on a remote server. It needs to be fed data through stdin, and it may occasionally print stuff through stdout and stderr. It will not terminate until it gets EOF on its input. I have a working program that uses Paramiko. It multiplexes over the SSH channel and another file descriptor with select, and reads from both sources in non-blocking mode. However, my attempt at using parallel-ssh exhibits a few bugs.

Describe the bug

  1. The output is duplicated. The program is supposed to print the numbers 0...1000 . But it usually prints something like 0..21 and then 0..21 again. Is there a better way to read partial output than for line in channel.stdout?
  2. Reading partial output hangs. Typically the program freezes after printing a few dozen numbers. It freezes while trying to read from channel.stdout. How can I do that in a nonblocking way?
  3. The program never terminates. I can workaround problem 2 by changing the range of numbers sent from 0...1000 to 0...10. If I do that then it prints them correctly and also prints the debug message "Sending EoF", but then it appears to hang. What happens is that select always returns that the channel is readable, but exit_code is always None so the program busy loops. Why doesn't exit_code get set?

To Reproduce

Execute this program like this: python3.9 cat-parallelssh.py my-host. To see problem 3, change the range arguments to 0, 10.

from concurrent.futures import ThreadPoolExecutor
import os
import select
import sys

from pssh.clients import SSHClient
from pssh.exceptions import Timeout
from ssh2 import error_codes

class Remote:
    def __init__(self, the_host):
        self.host = the_host
        self.client = SSHClient(self.host)

    def cat(self):
        """ Start a cat process and return the paramiko channel """
        cmd = "cat"
        channel = self.client.run_command(cmd, use_pty=True)
        return channel

class RemoteMigrator:
    def __init__(self, host):
        self.host = host
        self.remote = Remote(host)

    def migrate(self):
        channel = self.remote.cat()
        (pin, pout) = os.pipe()
        with ThreadPoolExecutor() as executor:
            fut = executor.submit(self._th_send, pin)
            status = self._feed_pipes(pout, channel)
            fut.result()
        self.sendpipe = None
        if status != 0:
            raise Exception(f"Remote: status={status}")

    def _th_send(threadname, sendpipe):
        # This must be in its own thread, because the original version invokes
        # a blocking C function to write to sendpipe.
        try:
            for i in range(0, 1000):
                os.write(sendpipe, (f"{i}\n".encode()))
        finally:
            os.close(sendpipe)

    def _feed_pipes(self, sendpipe, channel):
        BS = 128
        readfds = [channel.channel.session.sock.fileno(), sendpipe]
        try:
            while True:
                rl, wl, xl = select.select(readfds, [], [], 1)
                for readable in rl:
                    if readable == sendpipe:
                        data = os.read(sendpipe, BS)
                        try:
                            channel.stdin.write(data)
                            channel.stdin.flush()
                        except OSError as e:
                            sys.stderr.write("Error: remote terminated early: %s\n" % e)
                            break
                        if len(data) == 0:
                            # EOF indicates the send is done
                            print("Sending EoF")
                            channel.channel.send_eof()
                            readfds = [channel.channel.session.sock.fileno()]
                            break
                    else:
                        try:
                            for line in channel.stdout: # XXX Sometimes hangs
                                print(line)
                        except Timeout:
                            print("stdout timeout")
                        try:
                            for line in channel.stderr:
                                print(line, file=sys.stderr)
                        except Timeout:
                            print("stderr timeout")
                if len(rl) == 0:
                    print("select timeout")
                if channel.exit_code:
                    break
        finally:
            os.close(sendpipe)
        status = channel.exit_code
        channel.close()
        return status

migrator = RemoteMigrator(sys.argv[1])
migrator.migrate()

Expected behavior
It should print the numbers 0 through 999, inclusive, each on its own line and then terminate. For comparison, this paramiko program does just that:

from concurrent.futures import ThreadPoolExecutor
import os
import select
import sys

import paramiko

class Remote:
    def __init__(self, the_host):
        self.host = the_host
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.load_system_host_keys()
        self.client.connect(self.host)

    def cat(self):
        """ Start a cat process and return the paramiko channel """
        cmd = "cat"
        channel = self.client.get_transport().open_session()
        channel.exec_command(cmd)
        return channel

class RemoteMigrator:
    def __init__(self, host):
        self.host = host
        self.remote = Remote(host)

    def migrate(self):
        channel = self.remote.cat()
        (pin, pout) = os.pipe()
        with ThreadPoolExecutor() as executor:
            fut = executor.submit(self._th_send, pin)
            status = self._feed_pipes(pout, channel)
            fut.result()
        self.sendpipe = None
        if status != 0:
            raise Exception(f"Remote: status={status}")

    def _th_send(threadname, sendpipe):
        # This must be in its own thread, because the original version invokes
        # a blocking C function to write to sendpipe.
        try:
            for i in range(0, 1000):
                os.write(sendpipe, (f"{i}\n".encode()))
        finally:
            os.close(sendpipe)

    def _feed_pipes(self, sendpipe, channel):
        BS = 128
        readfds = [channel, sendpipe]
        try:
            while True:
                rl, wl, xl = select.select(readfds, [], [])
                for readable in rl:
                    if readable == sendpipe:
                        data = os.read(sendpipe, BS)
                        if len(data) == 0:
                            # EOF indicates the send is done
                            channel.shutdown_write()
                            readfds = [channel]
                            break
                        try:
                            channel.sendall(data)
                        except OSError as e:
                            sys.stderr.write("Error: remote terminated early: %s\n" % e)
                            break
                    if readable == channel:
                        if channel.recv_ready():
                            sys.stdout.write(channel.recv(BS).decode())
                        if channel.recv_stderr_ready():
                            sys.stderr.write(channel.recv_stderr(BS).decode())
                if channel.exit_status_ready():
                    break
            sys.stdout.write(channel.recv(BS).decode())
            sys.stderr.write(channel.recv_stderr(BS).decode())
        finally:
            os.close(sendpipe)
        status = channel.recv_exit_status()
        channel.close()
        return status

migrator = RemoteMigrator(sys.argv[1])
migrator.migrate()

Actual behaviour
It typically prints the numbers 0 through 21 twice and then hangs. The exact numbers printed varies from run to run.

Additional information
ssh2_python-0.27.0

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions