Skip to content

Commit 7984888

Browse files
committed
Merge branch 'kpaulisse-simplify-parallel' into kpaulisse-mega-merge
2 parents c0bf900 + 3b87832 commit 7984888

21 files changed

+425
-326
lines changed

.rubocop.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ Lint/EndAlignment:
5252
Style/FileName:
5353
Enabled: false
5454

55+
# Sometimes it's cleaner without this
56+
Style/Documentation:
57+
Enabled: false
58+
5559
# To fix later
5660
Style/PercentLiteralDelimiters:
5761
Enabled: false

lib/octocatalog-diff/catalog-util/bootstrap.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,11 @@ def self.bootstrap_directory_parallelizer(options, logger)
7474
if result.status
7575
logger.debug("Success bootstrap_directory for #{result.args[:tag]}")
7676
else
77+
# Believed to be a bug condition, since error should have already been raised if this happens.
78+
# :nocov:
7779
errmsg = "Failed bootstrap_directory for #{result.args[:tag]}: #{result.exception.class} #{result.exception.message}"
7880
raise OctocatalogDiff::Errors::BootstrapError, errmsg
81+
# :nocov:
7982
end
8083
end
8184
end

lib/octocatalog-diff/catalog-util/fileresources.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,12 @@ def self._convert_file_resources(resources, compilation_dir, environment = 'prod
8686
resources.map! do |resource|
8787
if resource_convertible?(resource)
8888
path = file_path(resource['parameters']['source'], modulepaths)
89-
raise Errno::ENOENT, "Unable to resolve '#{resource['parameters']['source']}'!" if path.nil?
89+
if path.nil?
90+
# Pass this through as a wrapped exception, because it's more likely to be something wrong
91+
# in the catalog itself than it is to be a broken setup of octocatalog-diff.
92+
message = "Errno::ENOENT: Unable to resolve '#{resource['parameters']['source']}'!"
93+
raise OctocatalogDiff::Errors::CatalogError, message
94+
end
9095

9196
if File.file?(path)
9297
# If the file is found, read its content. If the content is all ASCII, substitute it into

lib/octocatalog-diff/util/catalogs.rb

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,15 @@ def build_catalog_parallelizer
9999
# :nocov:
100100
end
101101

102+
# If catalogs failed to compile, report that. Prefer to display an actual failure message rather
103+
# than a generic incomplete parallel task message if there is a more specific message present.
104+
failures = parallel_catalogs.reject(&:status)
105+
if failures.any?
106+
f = failures.reject { |r| r.exception.is_a?(OctocatalogDiff::Util::Parallel::IncompleteTask) }.first
107+
f ||= failures.first
108+
raise f.exception
109+
end
110+
102111
# Construct result hash. Will eventually be in the format
103112
# { :from => OctocatalogDiff::Catalog, :to => OctocatalogDiff::Catalog }
104113

@@ -203,10 +212,12 @@ def add_parallel_result(result, parallel_catalog_obj, key_task_tuple)
203212
end
204213
else
205214
# Something unhandled went wrong, and an exception was thrown. Reveal a generic message.
215+
# :nocov:
206216
msg = parallel_catalog_obj.exception.message
207217
message = "Catalog for '#{key}' (#{branch}) failed to compile with #{parallel_catalog_obj.exception.class}: #{msg}"
208218
message += "\n" + parallel_catalog_obj.exception.backtrace.map { |x| " #{x}" }.join("\n") if @options[:debug]
209219
raise OctocatalogDiff::Errors::CatalogError, message
220+
# :nocov:
210221
end
211222
end
212223

@@ -227,15 +238,18 @@ def build_catalog(opts, logger = @logger)
227238
catalog
228239
end
229240

230-
# Validate a catalog in the parallel execution
241+
# The catalog validator method can indicate failure one of two ways:
242+
# - Raise an exception (this is preferred, since it gives a specific error message)
243+
# - Return false (supported but discouraged, since it only surfaces a generic error)
231244
# @param catalog [OctocatalogDiff::Catalog] Catalog object
232245
# @param logger [Logger] Logger object (presently unused)
233246
# @param args [Hash] Additional arguments set specifically for validator
234-
# @return [Boolean] true if catalog is valid, false otherwise
247+
# @return [Boolean] Return true if catalog is valid, false otherwise
235248
def catalog_validator(catalog = nil, _logger = @logger, args = {})
236-
return false unless catalog.is_a?(OctocatalogDiff::Catalog)
237-
catalog.validate_references if args[:task] == :to
238-
catalog.valid?
249+
raise ArgumentError, "Expects a catalog, got #{catalog.class}" unless catalog.is_a?(OctocatalogDiff::Catalog)
250+
raise OctocatalogDiff::Errors::CatalogError, "Catalog failed: #{catalog.error_message}" unless catalog.valid?
251+
catalog.validate_references if args[:task] == :to # Raises exception for broken references
252+
true
239253
end
240254
end
241255
end

lib/octocatalog-diff/util/parallel.rb

Lines changed: 138 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,22 @@
11
# frozen_string_literal: true
22

3-
# Helper to use the 'parallel' gem to perform tasks
3+
# A class to parallelize process executation.
4+
# This is a utility class to execute tasks in parallel, with our own forking implementation
5+
# that passes through logs and reliably handles errors. If parallel processing has been disabled,
6+
# this instead executes the tasks serially, but provides the same API as the parallel tasks.
47

5-
require 'parallel'
68
require 'stringio'
79

810
module OctocatalogDiff
911
module Util
10-
# This is a utility class to execute tasks in parallel, using the 'parallel' gem.
11-
# If parallel processing has been disabled, this instead executes the tasks serially,
12-
# but provides the same API as the parallel tasks.
1312
class Parallel
13+
# This exception is called for a task that didn't complete.
14+
class IncompleteTask < RuntimeError; end
15+
16+
# --------------------------------------
1417
# This class represents a parallel task. It requires a method reference, which will be executed with
1518
# any supplied arguments. It can optionally take a text description and a validator function.
19+
# --------------------------------------
1620
class Task
1721
attr_reader :description
1822
attr_accessor :args
@@ -35,10 +39,12 @@ def validate(result, logger = Logger.new(StringIO.new))
3539
end
3640
end
3741

42+
# --------------------------------------
3843
# This class represents the result from a parallel task. The status is set to true (success), false (error),
3944
# or nil (task was killed before it could complete). The exception (for failure) and output object (for success)
4045
# are readable attributes. The validity of the results, determined by executing the 'validate' method of the Task,
4146
# is available to be set and fetched.
47+
# --------------------------------------
4248
class Result
4349
attr_reader :output, :args
4450
attr_accessor :status, :exception
@@ -51,121 +57,170 @@ def initialize(opts = {})
5157
end
5258
end
5359

60+
# --------------------------------------
61+
# Static methods in the class
62+
# --------------------------------------
63+
5464
# Entry point for parallel processing. By default this will perform parallel processing,
5565
# but it will also accept an option to do serial processing instead.
5666
# @param task_array [Array<Parallel::Task>] Tasks to run
5767
# @param logger [Logger] Optional logger object
5868
# @param parallelized [Boolean] True for parallel processing, false for serial processing
69+
# @param raise_exception [Boolean] True to raise exception immediately if one occurs; false to return exception in results
5970
# @return [Array<Parallel::Result>] Parallel results (same order as tasks)
60-
def self.run_tasks(task_array, logger = nil, parallelized = true)
71+
def self.run_tasks(task_array, logger = nil, parallelized = true, raise_exception = false)
6172
# Create a throwaway logger object if one is not given
6273
logger ||= Logger.new(StringIO.new)
6374

64-
# Validate input - we need an array. If the array is empty then return an empty array right away.
75+
# Validate input - we need an array of OctocatalogDiff::Util::Parallel::Task. If the array is empty then
76+
# return an empty array right away.
6577
raise ArgumentError, "run_tasks() argument must be array, not #{task_array.class}" unless task_array.is_a?(Array)
6678
return [] if task_array.empty?
6779

68-
# Make sure each element in the array is a OctocatalogDiff::Util::Parallel::Task
69-
task_array.each do |x|
70-
next if x.is_a?(OctocatalogDiff::Util::Parallel::Task)
71-
raise ArgumentError, "Element #{x.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{x.class}"
80+
invalid_inputs = task_array.reject { |task| task.is_a?(OctocatalogDiff::Util::Parallel::Task) }
81+
if invalid_inputs.any?
82+
ele = invalid_inputs.first
83+
raise ArgumentError, "Element #{ele.inspect} must be a OctocatalogDiff::Util::Parallel::Task, not a #{ele.class}"
7284
end
7385

74-
# Actually do the processing - choose here between parallel and serial
75-
parallelized ? run_tasks_parallel(task_array, logger) : run_tasks_serial(task_array, logger)
86+
# Initialize the result array. For now all entries in the array indicate that the task was killed.
87+
# Actual statuses will replace this initial status. If the initial status wasn't replaced, then indeed,
88+
# the task was killed.
89+
result = task_array.map { |x| Result.new(exception: IncompleteTask.new('Killed'), args: x.args) }
90+
logger.debug "Initialized parallel task result array: size=#{result.size}"
91+
92+
# Execute as per the requested method (serial or parallel) and handle results.
93+
exception = parallelized ? run_tasks_parallel(result, task_array, logger) : run_tasks_serial(result, task_array, logger)
94+
raise exception if exception && raise_exception
95+
result
7696
end
7797

78-
# Use the parallel gem to run each task in the task array. Under the hood this is forking a process for
79-
# each task, and serializing/deserializing the arguments and the outputs.
98+
# Utility method! Not intended to be called from outside this class.
99+
# ---
100+
# Use a forking strategy to run tasks in parallel. Each task in the array is forked in a child
101+
# process, and when that task completes it writes its result (OctocatalogDiff::Util::Parallel::Result)
102+
# into a serialized data file. Once children are forked this method waits for their return, deserializing
103+
# the output from each data file and updating the `result` array with actual results.
104+
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
80105
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
81106
# @param logger [Logger] Logger
82-
# @return [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
83-
def self.run_tasks_parallel(task_array, logger)
84-
# Create an empty array of results. The status is nil and the exception is pre-populated. If the code
85-
# runs successfully and doesn't get killed, all of these default values will be overwritten. If the code
86-
# gets killed before the task finishes, this exception will remain.
87-
result = task_array.map do |x|
88-
Result.new(exception: ::Parallel::Kill.new('Killed'), args: x.args)
107+
# @return [Exception] First exception encountered by a child process; returns nil if no exceptions encountered.
108+
def self.run_tasks_parallel(result, task_array, logger)
109+
pidmap = {}
110+
ipc_tempdir = Dir.mktmpdir
111+
112+
# Child process forking
113+
task_array.each_with_index do |task, index|
114+
# simplecov doesn't see this because it's forked
115+
# :nocov:
116+
this_pid = fork do
117+
task_result = execute_task(task, logger)
118+
File.open(File.join(ipc_tempdir, "#{Process.pid}.dat"), 'w') { |f| f.write Marshal.dump(task_result) }
119+
Kernel.exit! 0 # Kernel.exit! avoids at_exit from parents being triggered by children exiting
120+
end
121+
# :nocov:
122+
123+
pidmap[this_pid] = { index: index, start_time: Time.now }
124+
logger.debug "Launched pid=#{this_pid} for index=#{index}"
125+
logger.reopen if logger.respond_to?(:reopen)
89126
end
90-
logger.debug "Initialized parallel task result array: size=#{result.size}"
91127

92-
# Do parallel processing
93-
::Parallel.each(task_array,
94-
finish: lambda do |item, i, parallel_result|
95-
# Set the result array element to the result
96-
result[i] = parallel_result
97-
98-
# Kill all other parallel tasks if this task failed by throwing an exception
99-
raise ::Parallel::Kill unless parallel_result.exception.nil?
100-
101-
# Run the validator to determine if the result is in fact valid. The validator
102-
# returns true or false. If true, set the 'valid' attribute in the result. If
103-
# false, kill all other parallel tasks.
104-
if item.validate(parallel_result.output, logger)
105-
logger.debug("Success #{item.description}")
106-
else
107-
logger.warn("Failed #{item.description}")
108-
result[i].status = false
109-
raise ::Parallel::Kill
110-
end
111-
end) do |ele|
112-
# simplecov does not detect that this code runs because it's forked, but this is
113-
# tested extensively in the parallel_spec.rb spec file.
114-
# :nocov:
128+
# Waiting for children and handling results
129+
while pidmap.any?
130+
this_pid, exit_obj = Process.wait2(0)
131+
next unless this_pid && pidmap.key?(this_pid)
132+
index = pidmap[this_pid][:index]
133+
exitstatus = exit_obj.exitstatus
134+
raise "PID=#{this_pid} exited abnormally: #{exit_obj.inspect}" if exitstatus.nil?
135+
raise "PID=#{this_pid} exited with status #{exitstatus}" unless exitstatus.zero?
136+
137+
input = File.read(File.join(ipc_tempdir, "#{this_pid}.dat"))
138+
result[index] = Marshal.load(input) # rubocop:disable Security/MarshalLoad
139+
time_delta = Time.now - pidmap[this_pid][:start_time]
140+
pidmap.delete(this_pid)
141+
142+
logger.debug "PID=#{this_pid} completed in #{time_delta} seconds, #{input.length} bytes"
143+
144+
next if result[index].status
145+
return result[index].exception
146+
end
147+
148+
logger.debug 'All child processes completed with no exceptions raised'
149+
150+
# Cleanup: Kill any child processes that are still running, and clean the temporary directory
151+
# where data files were stored.
152+
ensure
153+
pidmap.each do |pid, _pid_data|
115154
begin
116-
logger.debug("Begin #{ele.description}")
117-
output = ele.execute(logger)
118-
logger.debug("Success #{ele.description}")
119-
Result.new(output: output, status: true, args: ele.args)
120-
rescue => exc
121-
logger.debug("Failed #{ele.description}: #{exc.class} #{exc.message}")
122-
Result.new(exception: exc, status: false, args: ele.args)
155+
Process.kill('TERM', pid)
156+
rescue Errno::ESRCH # rubocop:disable Lint/HandleExceptions
157+
# If the process doesn't exist, that's fine.
123158
end
124-
# :nocov:
125159
end
126160

127-
# Return result
128-
result
161+
retries = 0
162+
while File.directory?(ipc_tempdir) && retries < 10
163+
retries += 1
164+
begin
165+
FileUtils.remove_entry_secure ipc_tempdir
166+
rescue Errno::ENOTEMPTY, Errno::ENOENT # rubocop:disable Lint/HandleExceptions
167+
# Errno::ENOTEMPTY will trigger a retry because the directory exists
168+
# Errno::ENOENT will break the loop because the directory won't exist next time it's checked
169+
end
170+
end
129171
end
130172

173+
# Utility method! Not intended to be called from outside this class.
174+
# ---
131175
# Perform the tasks in serial.
176+
# @param result [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
132177
# @param task_array [Array<OctocatalogDiff::Util::Parallel::Task>] Tasks to perform
133178
# @param logger [Logger] Logger
134-
# @return [Array<OctocatalogDiff::Util::Parallel::Result>] Parallel task results
135-
def self.run_tasks_serial(task_array, logger)
136-
# Create an empty array of results. The status is nil and the exception is pre-populated. If the code
137-
# runs successfully, all of these default values will be overwritten. If a predecessor task fails, all
138-
# later task will have the defined exception.
139-
result = task_array.map do |x|
140-
Result.new(exception: ::RuntimeError.new('Cancellation - A prior task failed'), args: x.args)
141-
end
142-
179+
def self.run_tasks_serial(result, task_array, logger)
143180
# Perform the tasks 1 by 1 - each successful task will replace an element in the 'result' array,
144181
# whereas a failed task will replace the current element with an exception, and all later tasks
145182
# will not be replaced (thereby being populated with the cancellation error).
146-
task_counter = 0
147-
task_array.each do |ele|
148-
begin
149-
logger.debug("Begin #{ele.description}")
150-
output = ele.execute(logger)
151-
result[task_counter] = Result.new(output: output, status: true, args: ele.args)
152-
rescue => exc
153-
logger.debug("Failed #{ele.description}: #{exc.class} #{exc.message}")
154-
result[task_counter] = Result.new(exception: exc, status: false, args: ele.args)
155-
end
183+
task_array.each_with_index do |ele, task_counter|
184+
result[task_counter] = execute_task(ele, logger)
185+
next if result[task_counter].status
186+
return result[task_counter].exception
187+
end
188+
nil
189+
end
156190

157-
if ele.validate(output, logger)
158-
logger.debug("Success #{ele.description}")
191+
# Utility method! Not intended to be called from outside this class.
192+
# ---
193+
# Process a single task. Called by run_tasks_parallel / run_tasks_serial.
194+
# This method will report all exceptions in the OctocatalogDiff::Util::Parallel::Result object
195+
# itself, and not raise them.
196+
# @param task [OctocatalogDiff::Util::Parallel::Task] Task object
197+
# @param logger [Logger] Logger
198+
# @return [OctocatalogDiff::Util::Parallel::Result] Parallel task result
199+
def self.execute_task(task, logger)
200+
begin
201+
logger.debug("Begin #{task.description}")
202+
output = task.execute(logger)
203+
result = Result.new(output: output, status: true, args: task.args)
204+
rescue => exc
205+
logger.debug("Failed #{task.description}: #{exc.class} #{exc.message}")
206+
# Immediately return without running the validation, since this already failed.
207+
return Result.new(exception: exc, status: false, args: task.args)
208+
end
209+
210+
begin
211+
if task.validate(output, logger)
212+
logger.debug("Success #{task.description}")
159213
else
160-
logger.warn("Failed #{ele.description}")
161-
result[task_counter].status = false
214+
# Preferably the validator method raised its own exception. However if it
215+
# simply returned false, raise our own exception here.
216+
raise "Failed #{task.description} validation (unspecified error)"
162217
end
163-
164-
break unless result[task_counter].status
165-
task_counter += 1
218+
rescue => exc
219+
logger.warn("Failed #{task.description} validation: #{exc.class} #{exc.message}")
220+
result.status = false
221+
result.exception = exc
166222
end
167223

168-
# Return the result
169224
result
170225
end
171226
end

octocatalog-diff.gemspec

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ EOF
2626
s.add_runtime_dependency 'diffy', '>= 3.1.0'
2727
s.add_runtime_dependency 'httparty', '>= 0.11.0'
2828
s.add_runtime_dependency 'hashdiff', '>= 0.3.0'
29-
s.add_runtime_dependency 'parallel', '>= 1.11.1'
3029
s.add_runtime_dependency 'rugged', '>= 0.25.0b2'
3130

3231
s.add_development_dependency 'rspec', '~> 3.4.0'

0 commit comments

Comments
 (0)