Skip to content

Commit e98ac5c

Browse files
committed
Clean up the parallel class
1 parent b3403d1 commit e98ac5c

File tree

1 file changed

+54
-36
lines changed

1 file changed

+54
-36
lines changed

lib/octocatalog-diff/util/parallel.rb

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
# frozen_string_literal: true
22

3-
# Parallelize process executation.
3+
# A class to parallelize process executation.
4+
# This is a utility class to execute tasks in parallel, with our own forking implementation
5+
# that passes through logs and reliably handles errors. If parallel processing has been disabled,
6+
# this instead executes the tasks serially, but provides the same API as the parallel tasks.
47

58
require 'stringio'
69

710
module OctocatalogDiff
811
module Util
9-
# This is a utility class to execute tasks in parallel, using the 'parallel' gem.
10-
# If parallel processing has been disabled, this instead executes the tasks serially,
11-
# but provides the same API as the parallel tasks.
1212
class Parallel
13-
# This class is called for a task that didn't complete.
13+
# This exception is called for a task that didn't complete.
1414
class IncompleteTask < RuntimeError; end
1515

16+
# --------------------------------------
1617
# This class represents a parallel task. It requires a method reference, which will be executed with
1718
# any supplied arguments. It can optionally take a text description and a validator function.
19+
# --------------------------------------
1820
class Task
1921
attr_reader :description
2022
attr_accessor :args
@@ -37,10 +39,12 @@ def validate(result, logger = Logger.new(StringIO.new))
3739
end
3840
end
3941

42+
# --------------------------------------
4043
# This class represents the result from a parallel task. The status is set to true (success), false (error),
4144
# or nil (task was killed before it could complete). The exception (for failure) and output object (for success)
4245
# are readable attributes. The validity of the results, determined by executing the 'validate' method of the Task,
4346
# is available to be set and fetched.
47+
# --------------------------------------
4448
class Result
4549
attr_reader :output, :args
4650
attr_accessor :status, :exception
@@ -53,65 +57,66 @@ def initialize(opts = {})
5357
end
5458
end
5559

60+
# --------------------------------------
61+
# Static methods in the class
62+
# --------------------------------------
63+
5664
# Entry point for parallel processing. By default this will perform parallel processing,
5765
# but it will also accept an option to do serial processing instead.
5866
# @param task_array [Array<Parallel::Task>] Tasks to run
5967
# @param logger [Logger] Optional logger object
6068
# @param parallelized [Boolean] True for parallel processing, false for serial processing
69+
# @param raise_exception [Boolean] True to raise exception immediately if one occurs; false to return exception in results
6170
# @return [Array<Parallel::Result>] Parallel results (same order as tasks)
62-
#
63-
# Note: Parallelization throws intermittent errors under travis CI, so it will be disabled by
64-
# default for integration tests.
6571
def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false)
6672
# Create a throwaway logger object if one is not given
6773
logger ||= Logger.new(StringIO.new)
6874

69-
# Validate input - we need an array. If the array is empty then return an empty array right away.
75+
# Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
76+
# return an empty array right away.
7077
raise ArgumentError, "run_tasks() argument must be array, not #{task_array.class}" unless task_array.is_a?(Array)
7178
return [] if task_array.empty?
7279

73-
# Make sure each element in the array is a OctocatalogDiff::Util::Parallel::Task
74-
task_array.each do |x|
75-
next if x.is_a?(OctocatalogDiff::Util::Parallel::Task)
76-
raise ArgumentError, "Element #{x.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{x.class}"
80+
invalid_inputs = task_array.reject { |task| task.is_a?(OctocatalogDiff::Util::Parallel::Task) }
81+
if invalid_inputs.any?
82+
ele = invalid_inputs.first
83+
raise ArgumentError, "Element #{ele.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{ele.class}"
7784
end
7885

79-
result = task_array.map do |x|
80-
Result.new(exception: IncompleteTask.new('Killed'), args: x.args)
81-
end
86+
# Initialize the result array. For now all entries in the array indicate that the task was killed.
87+
# Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
88+
# the task was killed.
89+
result = task_array.map { |x| Result.new(exception: IncompleteTask.new('Killed'), args: x.args) }
8290
logger.debug "Initialized parallel task result array: size=#{result.size}"
8391

84-
exception = if parallelized
85-
run_tasks_parallel(result, task_array, logger)
86-
else
87-
run_tasks_serial(result, task_array, logger)
88-
end
89-
92+
# Execute as per the requested method (serial or parallel) and handle results.
93+
exception = parallelized ? run_tasks_parallel(result, task_array, logger) : run_tasks_serial(result, task_array, logger)
9094
raise exception if exception && raise_exception
9195
result
9296
end
9397

94-
# Use the parallel gem to run each task in the task array. Under the hood this is forking a process for
95-
# each task, and serializing/deserializing the arguments and the outputs.
98+
# Utility method! Not intended to be called from outside this class.
99+
# ---
100+
# Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child
101+
# process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result)
102+
# into a serialized data file. Once children are forked this method waits for their return, deserializing
103+
# the output from each data file and updating the `result` array with actual results.
96104
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
97105
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
98106
# @param logger [Logger] Logger
107+
# @return [Exception] First exception encountered by a child process; returns nil if no exceptions encountered.
99108
def self.run_tasks_parallel(result, task_array, logger)
100109
pidmap = {}
101110
ipc_tempdir = Dir.mktmpdir
102111

112+
# Child process forking
103113
task_array.each_with_index do |task, index|
104114
# simplecov doesn't see this because it's forked
105-
# Kernel.exit! avoids at_exit calls possibly set up by rspec tests
106115
# :nocov:
107116
this_pid = fork do
108-
begin
109-
task_result = execute_task(task, logger)
110-
File.open(File.join(ipc_tempdir, "#{Process.pid}.yaml"), 'w') { |f| f.write Marshal.dump(task_result) }
111-
Kernel.exit! 0
112-
rescue
113-
Kernel.exit! 255
114-
end
117+
task_result = execute_task(task, logger)
118+
File.open(File.join(ipc_tempdir, "#{Process.pid}.dat"), 'w') { |f| f.write Marshal.dump(task_result) }
119+
Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
115120
end
116121
# :nocov:
117122

@@ -120,16 +125,16 @@ def self.run_tasks_parallel(result, task_array, logger)
120125
logger.reopen if logger.respond_to?(:reopen)
121126
end
122127

128+
# Waiting for children and handling results
123129
while pidmap.any?
124-
# Wait for exits
125130
this_pid, exit_obj = Process.wait2(0)
126131
next unless this_pid && pidmap.key?(this_pid)
127132
index = pidmap[this_pid][:index]
128133
exitstatus = exit_obj.exitstatus
129134
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
130135
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
131136

132-
input = File.read(File.join(ipc_tempdir, "#{this_pid}.yaml"))
137+
input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
133138
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
134139
time_delta = Time.now - pidmap[this_pid][:start_time]
135140
pidmap.delete(this_pid)
@@ -139,7 +144,11 @@ def self.run_tasks_parallel(result, task_array, logger)
139144
next if result[index].status
140145
return result[index].exception
141146
end
142-
nil
147+
148+
logger.debug 'All child processes completed with no exceptions raised'
149+
150+
# Cleanup: Kill any child processes that are still running, and clean the temporary directory
151+
# where data files were stored.
143152
ensure
144153
pidmap.each do |pid, _pid_data|
145154
begin
@@ -161,6 +170,8 @@ def self.run_tasks_parallel(result, task_array, logger)
161170
end
162171
end
163172

173+
# Utility method! Not intended to be called from outside this class.
174+
# ---
164175
# Perform the tasks in serial.
165176
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
166177
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
@@ -177,7 +188,11 @@ def self.run_tasks_serial(result, task_array, logger)
177188
nil
178189
end
179190

180-
# Process a single task.
191+
# Utility method! Not intended to be called from outside this class.
192+
# ---
193+
# Process a single task. Called by run_tasks_parallel / run_tasks_serial.
194+
# This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object
195+
# itself, and not raise them.
181196
# @param task [OctocatalogDiff::Util::Parallel::Task] Task object
182197
# @param logger [Logger] Logger
183198
# @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result
@@ -188,13 +203,16 @@ def self.execute_task(task, logger)
188203
result = Result.new(output: output, status: true, args: task.args)
189204
rescue => exc
190205
logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
206+
# Immediately return without running the validation, since this already failed.
191207
return Result.new(exception: exc, status: false, args: task.args)
192208
end
193209

194210
begin
195211
if task.validate(output, logger)
196212
logger.debug("Success #{task.description}")
197213
else
214+
# Preferably the validator method raised its own exception. However if it
215+
# simply returned false, raise our own exception here.
198216
raise "Failed #{task.description} validation (unspecified error)"
199217
end
200218
rescue => exc

0 commit comments

Comments
 (0)