Skip to content
4 changes: 4 additions & 0 deletions lib/posix/spawn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ class MaximumOutputExceeded < StandardError
class TimeoutExceeded < StandardError
end

# Exception raised when output streaming is aborted early.
class Aborted < StandardError
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could maybe be CallerAborted or something more specific?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with either.

end

private

# Turns the various varargs incantations supported by Process::spawn into a
Expand Down
40 changes: 35 additions & 5 deletions lib/posix/spawn/child.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,31 @@ def initialize(*args)
@options[:pgroup] = true
end
@options.delete(:chdir) if @options[:chdir].nil?

@out, @err = "", ""

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to use String.new to avoid breaking when someone passes --enable-frozen-string-literal.


@stdout_stream = Proc.new do |chunk|
@out << chunk

true
end

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stdout_stream = @out.method(:<<)

:trollface: 🚲 🏠

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I love it! Being able to use method was one of the main reasons for going back to the proc-based API ;)


@stderr_stream = Proc.new do |chunk|
@err << chunk

true
end

if streams = @options.delete(:streams)
if streams[:stdout]
@stdout_stream = streams[:stdout]
end

if streams[:stderr]
@stderr_stream = streams[:stderr]
end
end

exec! if !@options.delete(:noexec)
end

Expand Down Expand Up @@ -223,6 +248,10 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil)
slice_method = input.respond_to?(:byteslice) ? :byteslice : :slice
t = timeout

streams = {stdout => @stdout_stream, stderr => @stderr_stream}

bytes_seen = 0
chunk_buffer = ""
Copy link
Collaborator Author

@brianmario brianmario Jan 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This buffer is reused by readpartial below. Internally, so far as I can tell, it will be resized to BUFSIZE on the first call to readpartial and then that underlying buffer will be reused from then on out.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the issue with appending to strings that have been mentioned, we might want to consider having #readpartial allocate a new string and give ownership of it to the stream, but since we're now re-using this buffer, it's probably already more efficient than what we had before, so we can probably leave it until we actually find a perf issue we can trace back to this specifically.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another "" that might be better as String.new here.

while readers.any? || writers.any?
ready = IO.select(readers, writers, readers + writers, t)
raise TimeoutExceeded if ready.nil?
Expand All @@ -244,9 +273,12 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil)

# read from stdout and stderr streams
ready[0].each do |fd|
buf = (fd == stdout) ? @out : @err
begin
buf << fd.readpartial(BUFSIZE)
fd.readpartial(BUFSIZE, chunk_buffer)

raise Aborted unless streams[fd].call(chunk_buffer)

bytes_seen += chunk_buffer.bytesize
rescue Errno::EAGAIN, Errno::EINTR
rescue EOFError
readers.delete(fd)
Expand All @@ -262,12 +294,10 @@ def read_and_write(input, stdin, stdout, stderr, timeout=nil, max=nil)
end

# maybe we've hit our max output
if max && ready[0].any? && (@out.size + @err.size) > max
if max && ready[0].any? && bytes_seen > max
raise MaximumOutputExceeded
end
end

[@out, @err]
end

# Wait for the child process to exit
Expand Down
57 changes: 56 additions & 1 deletion test/test_child.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def test_max_with_stubborn_child_pgroup_kill

def test_max_with_partial_output
p = Child.build('yes', :max => 100_000)
assert_nil p.out
assert_empty p.out
assert_raises MaximumOutputExceeded do
p.exec!
end
Expand Down Expand Up @@ -224,6 +224,61 @@ def test_utf8_input_long
assert p.success?
end

def test_streaming_stdout
stdout_buff = ""
stdout_stream = Proc.new do |chunk|
stdout_buff << chunk
end

input = "hello!"
p = Child.new('cat', :input => input, :streams => {
:stdout => stdout_stream
})

assert p.success?
assert_equal input, stdout_buff
end

def test_streaming_stderr
stderr_buff = ""
stderr_stream = Proc.new do |chunk|
stderr_buff << chunk
end

p = Child.new('ls', '-?', :streams => {
:stderr => stderr_stream
})

refute p.success?
assert stderr_buff.size > 0
end

def test_streaming_stdout_aborted
stdout_stream = Proc.new do |chunk|
false
end

input = "hello!"
assert_raises POSIX::Spawn::Aborted do
p = Child.new('cat', :input => input, :streams => {
:stdout => stdout_stream
})
end
end

def test_streaming_stderr_aborted
stderr_stream = Proc.new do |chunk|
false
end

input = "hello!"
assert_raises POSIX::Spawn::Aborted do
p = Child.new('ls', '-?', :streams => {
:stderr => stderr_stream
})
end
end

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no tests that involve reading more than one BUFSIZE worth of output, or reading from both stdout and stderr. Those might be worthwhile additions.

Copy link

@mclark mclark Jan 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add a test for passing in a minimal custom object, just to ensure the interface contract is maintained. May be a good time to use a spy.
No longer applicable as we are using Procs now.

##
# Assertion Helpers

Expand Down