diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index d10f366044..f1f903aaad 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -231,76 +231,90 @@ def flush! flushing_threads.each{|t| t.join } end + class ShutdownSequence + attr_reader :method, :checker + def initialize(method, checker, is_safe) + @method = method + @checker = checker + @is_safe = is_safe + end + + def safe? + @is_safe + end + end + + SHUTDOWN_SEQUENCES = [ + ShutdownSequence.new(:stop, :stopped?, true), + # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation + ShutdownSequence.new(:shutdown, :shutdown?, false), + ShutdownSequence.new(:after_shutdown, :after_shutdown?, true), + ShutdownSequence.new(:close, :closed?, false), + ShutdownSequence.new(:terminate, :terminated?, true), + ] + def shutdown # Fluentd's shutdown sequence is stop, before_shutdown, shutdown, after_shutdown, close, terminate for plugins # These method callers does `rescue Exception` to call methods of shutdown sequence as far as possible # if plugin methods does something like infinite recursive call, `exit`, unregistering signal handlers or others. # Plugins should be separated and be in sandbox to protect data in each plugins/buffers. - lifecycle_safe_sequence = ->(method, checker) { - lifecycle do |instance, kind| - begin - log.debug "calling #{method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.__send__(method) unless instance.__send__(checker) - rescue Exception => e - log.warn "unexpected error while calling #{method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e - log.warn_backtrace + SHUTDOWN_SEQUENCES.each do |sequence| + if sequence.safe? + lifecycle do |instance, kind| + execute_shutdown_sequence(sequence, instance, kind) end + next end - } - lifecycle_unsafe_sequence = ->(method, checker) { - operation = case method - when :shutdown then "shutting down" - when :close then "closing" - else - raise "BUG: unknown method name '#{method}'" - end operation_threads = [] callback = ->(){ - operation_threads.each{|t| t.join } + operation_threads.each { |t| t.join } operation_threads.clear + # TODO: サーバープロセスにワーカーが停止したことをシグナル送信。もう少しマシな方法はないのか? + Process.kill 34, Process.ppid } lifecycle(kind_callback: callback) do |instance, kind| t = Thread.new do Thread.current.abort_on_exception = true - begin - if method == :shutdown - # To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence. - # The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown - operation = "preparing shutdown" # for logging - log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - begin - instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?) - rescue Exception => e - log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e - log.warn_backtrace - end - operation = "shutting down" - log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.__send__(:shutdown) unless instance.__send__(:shutdown?) - else - log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id - instance.__send__(method) unless instance.__send__(checker) - end - rescue Exception => e - log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e - log.warn_backtrace - end + execute_shutdown_sequence(sequence, instance, kind) end operation_threads << t end - } - - lifecycle_safe_sequence.call(:stop, :stopped?) + end + end - # before_shutdown does force_flush for output plugins: it should block, so it's unsafe operation - lifecycle_unsafe_sequence.call(:shutdown, :shutdown?) + def execute_shutdown_sequence(sequence, instance, kind) + unless sequence.method == :shutdown + begin + log.debug "calling #{sequence.method} on #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id + instance.__send__(sequence.method) unless instance.__send__(sequence.checker) + rescue Exception => e + log.warn "unexpected error while calling #{sequence.method} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + log.warn_backtrace + end - lifecycle_safe_sequence.call(:after_shutdown, :after_shutdown?) + return + end - lifecycle_unsafe_sequence.call(:close, :closed?) + # To avoid Input#shutdown and Output#before_shutdown mismatch problem, combine before_shutdown and shutdown call in one sequence. + # The problem is in_tail flushes buffered multiline in shutdown but output's flush_at_shutdown is invoked in before_shutdown + begin + operation = "preparing shutdown" # for logging + log.debug "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id + instance.__send__(:before_shutdown) unless instance.__send__(:before_shutdown?) + rescue Exception => e + log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + log.warn_backtrace + end - lifecycle_safe_sequence.call(:terminate, :terminated?) + begin + operation = "shutting down" + log.info "#{operation} #{kind} plugin", type: Plugin.lookup_type_from_class(instance.class), plugin_id: instance.plugin_id + instance.__send__(:shutdown) unless instance.__send__(sequence.checker) + rescue Exception => e + log.warn "unexpected error while #{operation} on #{kind} plugin", plugin: instance.class, plugin_id: instance.plugin_id, error: e + log.warn_backtrace + end end def suppress_interval(interval_time) diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb index d565abf600..e5d6f7270c 100644 --- a/lib/fluent/supervisor.rb +++ b/lib/fluent/supervisor.rb @@ -43,6 +43,7 @@ def before_run @rpc_endpoint = nil @rpc_server = nil @counter = nil + @socket_manager_server = nil @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-") ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir @@ -66,8 +67,12 @@ def before_run if config[:disable_shared_socket] $log.info "shared socket for multiple workers is disabled" else - server = ServerEngine::SocketManager::Server.open - ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s + if ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH') + @socket_manager_server = ServerEngine::SocketManager::Server.take_over_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + else + @socket_manager_server = ServerEngine::SocketManager::Server.open + ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s + end end end @@ -138,7 +143,7 @@ def run_rpc_server @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res| $log.debug "fluentd RPC got /api/config.gracefulReload request" if Fluent.windows? - supervisor_sigusr2_handler + graceful_reload else Process.kill :USR2, Process.pid end @@ -186,8 +191,34 @@ def install_supervisor_signal_handlers end trap :USR2 do + # ダウンタイム無し更新 $log.debug 'fluentd supervisor process got SIGUSR2' - supervisor_sigusr2_handler + + # TODO: Worker を終了させた際に ServerEngine によって自動的に再起動されないようにする + scale_down + + # ダウンタイム無し更新、worker をすべて停止 + send_signal_to_workers(:TERM) + + @prepared_zero_downtime_updating = true + end + + trap 34 do + # ワーカーが完全停止したら 34 シグナルを受信する + if @prepared_zero_downtime_updating + # ダウンタイム無し更新の処理途中なら残りを実施する + + @prepared_zero_downtime_updating = false + + # 新しいプロセスを起動して更新を反映する + start_new_supervisor + + # TODO: 少なくとも worker を1つでも起動できるように戻さないと自身が死ななかった + scale_up + + # 古いプロセス(自分自身)を止める + Process.kill :TERM, Process.pid + end end end @@ -254,7 +285,7 @@ def install_windows_event_handler when :usr1 supervisor_sigusr1_handler when :usr2 - supervisor_sigusr2_handler + graceful_reload when :cont supervisor_dump_handler_for_windows when :stop_event_thread @@ -284,7 +315,7 @@ def supervisor_sigusr1_handler send_signal_to_workers(:USR1) end - def supervisor_sigusr2_handler + def graceful_reload conf = nil t = Thread.new do $log.info 'Reloading new config' @@ -312,6 +343,14 @@ def supervisor_sigusr2_handler $log.error "Failed to reload config file: #{e}" end + def start_new_supervisor + commands = [ServerEngine.ruby_bin_path, $0] + ARGV + env_to_add = {"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN} + Process.spawn(env_to_add, commands.join(" ")) + rescue => e + $log.error "Failed to start a new supervisor: #{e}" + end + def supervisor_dump_handler_for_windows # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file, # and it is implemented before the implementation of the function for Windows. @@ -332,6 +371,13 @@ def supervisor_dump_handler_for_windows $log.error "failed to dump: #{e}" end + def scale_down + self.scale_workers(0) + end + def scale_up + self.scale_workers(1) + end + def kill_worker if config[:worker_pid] pids = config[:worker_pid].clone @@ -500,11 +546,12 @@ def self.default_options end def self.cleanup_resources - unless Fluent.windows? - if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') - FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) - end - end + # TODO: 2回目の USR2 シグナル発火時に SERVERENGINE_SOCKETMANAGER_PATH のファイルが見つからずに起動に失敗するためコメントアウト + # unless Fluent.windows? + # if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH') + # FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH']) + # end + # end end def initialize(cl_opt)