1414
1515import paramiko .client
1616import re
17+ import select
1718
1819from cstar .output import err , debug , msg
1920from cstar .exceptions import BadSSHHost , BadEnvironmentVariable , NoHostsSpecified
@@ -105,6 +106,49 @@ def run_job(self, file, jobid, timeout=None, env={}):
105106 """ % (self .escape (dir ),)
106107
107108 stdin , stdout , stderr = self .client .exec_command (cmd , timeout = timeout )
109+ # get the shared channel for stdout/stderr/stdin
110+ channel = stdout .channel
111+
112+ stdin .close ()
113+ # indicate that we're not going to write to that channel anymore
114+ channel .shutdown_write ()
115+
116+ # read stdout/stderr in order to prevent read block hangs
117+ stdout_chunks = []
118+ stdout_chunks .append (stdout .channel .recv (len (stdout .channel .in_buffer )))
119+ # chunked read to prevent stalls
120+ while not channel .closed or channel .recv_ready () or channel .recv_stderr_ready ():
121+ # stop if channel was closed prematurely, and there is no data in the buffers.
122+ got_chunk = False
123+ readq , _ , _ = select .select ([stdout .channel ], [], [], timeout )
124+ for c in readq :
125+ if c .recv_ready ():
126+ stdout_chunks .append (stdout .channel .recv (len (c .in_buffer )))
127+ got_chunk = True
128+ if c .recv_stderr_ready ():
129+ # make sure to read stderr to prevent stall
130+ stderr .channel .recv_stderr (len (c .in_stderr_buffer ))
131+ got_chunk = True
132+ '''
133+ 1) make sure that there are at least 2 cycles with no data in the input buffers in order to not exit too early (i.e. cat on a >200k file).
134+ 2) if no data arrived in the last loop, check if we already received the exit code
135+ 3) check if input buffers are empty
136+ 4) exit the loop
137+ '''
138+ if not got_chunk \
139+ and stdout .channel .exit_status_ready () \
140+ and not stderr .channel .recv_stderr_ready () \
141+ and not stdout .channel .recv_ready ():
142+ # indicate that we're not going to read from this channel anymore
143+ stdout .channel .shutdown_read ()
144+ # close the channel
145+ stdout .channel .close ()
146+ break # exit as remote side is finished and our bufferes are empty
147+
148+ # close all the pseudofiles
149+ stdout .close ()
150+ stderr .close ()
151+
108152 stdout .channel .recv_exit_status ()
109153 real_output = self .read_file (dir + "/stdout" )
110154 real_error = self .read_file (dir + "/stderr" )
@@ -120,11 +164,55 @@ def run(self, argv):
120164 try :
121165 self ._connect ()
122166 cmd = " " .join (self .escape (s ) for s in argv )
123-
124167 stdin , stdout , stderr = self .client .exec_command (cmd )
168+ # get the shared channel for stdout/stderr/stdin
169+ channel = stdout .channel
170+
171+ # we do not need stdin.
172+ stdin .close ()
173+ # indicate that we're not going to write to that channel anymore
174+ channel .shutdown_write ()
175+
176+ # read stdout/stderr in order to prevent read block hangs
177+ stdout_chunks = []
178+ stderr_chunks = []
179+ stdout_chunks .append (stdout .channel .recv (len (stdout .channel .in_buffer )))
180+ # chunked read to prevent stalls
181+ while not channel .closed or channel .recv_ready () or channel .recv_stderr_ready ():
182+ # stop if channel was closed prematurely, and there is no data in the buffers.
183+ got_chunk = False
184+ readq , _ , _ = select .select ([stdout .channel ], [], [], 30 )
185+ for c in readq :
186+ if c .recv_ready ():
187+ stdout_chunks .append (stdout .channel .recv (len (c .in_buffer )))
188+ got_chunk = True
189+ if c .recv_stderr_ready ():
190+ # make sure to read stderr to prevent stall
191+ stderr_chunks .append (stderr .channel .recv_stderr (len (c .in_stderr_buffer )))
192+ got_chunk = True
193+ '''
194+ 1) make sure that there are at least 2 cycles with no data in the input buffers in order to not exit too early (i.e. cat on a >200k file).
195+ 2) if no data arrived in the last loop, check if we already received the exit code
196+ 3) check if input buffers are empty
197+ 4) exit the loop
198+ '''
199+ if not got_chunk \
200+ and stdout .channel .exit_status_ready () \
201+ and not stderr .channel .recv_stderr_ready () \
202+ and not stdout .channel .recv_ready ():
203+ # indicate that we're not going to read from this channel anymore
204+ stdout .channel .shutdown_read ()
205+ # close the channel
206+ stdout .channel .close ()
207+ break # exit as remote side is finished and our bufferes are empty
208+
209+ # close all the pseudofiles
210+ stdout .close ()
211+ stderr .close ()
212+
213+ out = b'' .join (stdout_chunks )
214+ error = b'' .join (stderr_chunks )
125215 status = stdout .channel .recv_exit_status ()
126- out = stdout .read ()
127- error = stderr .read ()
128216 if status != 0 :
129217 err ("Command %s failed with status %d on host %s" % (cmd , status , self .hostname ))
130218 else :
0 commit comments