Skip to content

Commit 317033b

Browse files
authored
Merge pull request #206 from isaiahfrantz/parallel_conflict_with_diffy
Fix multi-node list with parallel mode
2 parents e81de2d + 49c6c9b commit 317033b

File tree

2 files changed

+79
-16
lines changed

2 files changed

+79
-16
lines changed

lib/octocatalog-diff/util/parallel.rb

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -129,22 +129,26 @@ def self.run_tasks_parallel(result, task_array, logger)
129129

130130
# Waiting for children and handling results
131131
while pidmap.any?
132-
this_pid, exit_obj = Process.wait2(0)
133-
next unless this_pid && pidmap.key?(this_pid)
134-
index = pidmap[this_pid][:index]
135-
exitstatus = exit_obj.exitstatus
136-
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
137-
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
138-
139-
input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
140-
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
141-
time_delta = Time.now - pidmap[this_pid][:start_time]
142-
pidmap.delete(this_pid)
143-
144-
logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"
145-
146-
next if result[index].status
147-
return result[index].exception
132+
pidmap.each do |pid|
133+
status = Process.waitpid2(pid[0], Process::WNOHANG)
134+
next if status.nil?
135+
this_pid, exit_obj = status
136+
next unless this_pid && pidmap.key?(this_pid)
137+
index = pidmap[this_pid][:index]
138+
exitstatus = exit_obj.exitstatus
139+
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
140+
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
141+
142+
input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
143+
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
144+
time_delta = Time.now - pidmap[this_pid][:start_time]
145+
pidmap.delete(this_pid)
146+
147+
logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"
148+
149+
next if result[index].status
150+
return result[index].exception
151+
end
148152
end
149153

150154
logger.debug 'All child processes completed with no exceptions raised'

spec/octocatalog-diff/tests/util/parallel_spec.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,65 @@
1616
end
1717

1818
context 'with parallel processing' do
19+
it 'should only Process.wait() its own children' do
20+
class Foo
21+
def one(arg, _logger = nil)
22+
'one ' + arg
23+
end
24+
25+
def two(arg, _logger = nil)
26+
'two ' + arg
27+
end
28+
29+
def dont_wait_me_bro(sleep_for = 1)
30+
# do we need a rescue block here?
31+
pid = fork do
32+
sleep sleep_for
33+
Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
34+
end
35+
pid
36+
end
37+
38+
def wait_on_me(pid)
39+
status = nil
40+
# just in case status never equals anything
41+
count = 100
42+
while status.nil? || count > 0
43+
count -= 1
44+
status = Process.waitpid2(pid, Process::WNOHANG)
45+
end
46+
status
47+
end
48+
end
49+
50+
c = Foo.new
51+
# start my non-parallel process first
52+
just_a_guy = c.dont_wait_me_bro
53+
54+
one = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:one), args: 'abc', description: 'test1')
55+
two = OctocatalogDiff::Util::Parallel::Task.new(method: c.method(:two), args: 'def', description: 'test2')
56+
result = OctocatalogDiff::Util::Parallel.run_tasks([one, two], nil, true)
57+
expect(result).to be_a_kind_of(Array)
58+
expect(result.size).to eq(2)
59+
60+
one_result = result[0]
61+
expect(one_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
62+
expect(one_result.status).to eq(true)
63+
expect(one_result.exception).to eq(nil)
64+
expect(one_result.output).to match(/^one abc/)
65+
66+
two_result = result[1]
67+
expect(two_result).to be_a_kind_of(OctocatalogDiff::Util::Parallel::Result)
68+
expect(two_result.status).to eq(true)
69+
expect(two_result.exception).to eq(nil)
70+
expect(two_result.output).to match(/^two def/)
71+
72+
# just_a_guy should still be need to be waited
73+
result = c.wait_on_me(just_a_guy)
74+
expect(result).to be_a_kind_of(Array)
75+
# test result and check for error conditions
76+
end
77+
1978
it 'should parallelize and return task results' do
2079
class Foo
2180
def one(arg, _logger = nil)

0 commit comments

Comments
 (0)