Skip to content

Commit ecb097c

Browse files
daipomWatson1978
andcommitted
Zero downtime restart
Add a new feature: Zero downtime update/reload 1. The supervisor receives SIGUSR2. 2. Spawn a new supervisor. 3. Take over shared sockets. 4. Launch new workers, and stop old processes in parallel. * Launch new workers with source-only mode * Limit to zero_downtime_restart_ready? input plugin * Send SIGTERM to the old supervisor after 10s delay from 3. 5. The old supervisor stops and sends SIGWINCH to the new one. 6. The new workers run fully. Problem to solve: Updating Fluentd or reloading a config causes downtime. Plugins that receive data as a server, such as `in_udp`, `in_tcp`, and `in_syslog`, cannot receive data during this time. This means that the data sent by a client is lost during this time unless the client has a re-sending feature. This makes updating Fluentd or reloading a config difficult in some cases. Note: need these feature * #4661 * treasure-data/serverengine#146 Co-authored-by: Shizuo Fujita <[email protected]> Signed-off-by: Daijiro Fukuda <[email protected]>
1 parent 38c63f3 commit ecb097c

File tree

7 files changed

+170
-27
lines changed

7 files changed

+170
-27
lines changed

lib/fluent/engine.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def initialize
5151

5252
attr_reader :root_agent, :system_config, :supervisor_mode
5353

54-
def init(system_config, supervisor_mode: false)
54+
def init(system_config, supervisor_mode: false, start_in_parallel: false)
5555
@system_config = system_config
5656
@supervisor_mode = supervisor_mode
5757

@@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false)
6060

6161
@log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil?
6262

63-
@root_agent = RootAgent.new(log: log, system_config: @system_config)
63+
@root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel)
6464

6565
self
6666
end

lib/fluent/plugin/in_syslog.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,10 @@ def multi_workers_ready?
156156
true
157157
end
158158

159+
def zero_downtime_restart_ready?
160+
true
161+
end
162+
159163
def start
160164
super
161165

lib/fluent/plugin/in_tcp.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ def multi_workers_ready?
101101
true
102102
end
103103

104+
def zero_downtime_restart_ready?
105+
true
106+
end
107+
104108
def start
105109
super
106110

lib/fluent/plugin/in_udp.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,10 @@ def multi_workers_ready?
6565
true
6666
end
6767

68+
def zero_downtime_restart_ready?
69+
true
70+
end
71+
6872
def start
6973
super
7074

lib/fluent/plugin/input.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ def metric_callback(es)
7070
def multi_workers_ready?
7171
false
7272
end
73+
74+
def zero_downtime_restart_ready?
75+
false
76+
end
7377
end
7478
end
7579
end

lib/fluent/root_agent.rb

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,15 +48,43 @@ module Fluent
4848
class RootAgent < Agent
4949
ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label
5050

51-
def initialize(log:, system_config: SystemConfig.new)
51+
class SourceOnlyMode
52+
DISABELD = 0
53+
NORMAL = 1
54+
ONLY_ZERO_DOWNTIME_RESTART_READY = 2
55+
56+
def initialize(with_source_only, start_in_parallel)
57+
if start_in_parallel
58+
@mode = ONLY_ZERO_DOWNTIME_RESTART_READY
59+
elsif with_source_only
60+
@mode = NORMAL
61+
else
62+
@mode = DISABELD
63+
end
64+
end
65+
66+
def enabled?
67+
@mode != DISABELD
68+
end
69+
70+
def only_zero_downtime_restart_ready?
71+
@mode == ONLY_ZERO_DOWNTIME_RESTART_READY
72+
end
73+
74+
def disable!
75+
@mode = DISABELD
76+
end
77+
end
78+
79+
def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false)
5280
super(log: log)
5381

5482
@labels = {}
5583
@inputs = []
5684
@suppress_emit_error_log_interval = 0
5785
@next_emit_error_log_time = nil
5886
@without_source = system_config.without_source || false
59-
@with_source_only = system_config.with_source_only || false
87+
@source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel)
6088
@source_only_buffer_agent = nil
6189
@enable_input_metrics = system_config.enable_input_metrics || false
6290

@@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new)
6795
attr_reader :labels
6896

6997
def source_only_router
70-
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
98+
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled?
7199
@source_only_buffer_agent.event_router
72100
end
73101

@@ -154,7 +182,7 @@ def configure(conf)
154182

155183
super
156184

157-
setup_source_only_buffer_agent if @with_source_only
185+
setup_source_only_buffer_agent if @source_only_mode.enabled?
158186

159187
# initialize <source> elements
160188
if @without_source
@@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent
187215
end
188216

189217
def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
218+
only_zero_downtime_restart_ready = false
219+
190220
unless kind_or_agent_list
191-
if @with_source_only
221+
if @source_only_mode.enabled?
192222
kind_or_agent_list = [:input, @source_only_buffer_agent]
223+
only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready?
193224
elsif @source_only_buffer_agent
194225
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
195226
kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
@@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
214245
end
215246
display_kind = (kind == :output_with_router ? :output : kind)
216247
list.each do |instance|
248+
if only_zero_downtime_restart_ready
249+
next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready?
250+
end
217251
yield instance, display_kind
218252
end
219253
end
@@ -257,7 +291,7 @@ def flush!
257291
end
258292

259293
def cancel_source_only!
260-
unless @with_source_only
294+
unless @source_only_mode.enabled?
261295
log.info "do nothing for canceling with-source-only because the current mode is not with-source-only."
262296
return
263297
end
@@ -285,7 +319,7 @@ def cancel_source_only!
285319
setup_source_only_buffer_agent(flush: true)
286320
start(kind_or_agent_list: [@source_only_buffer_agent])
287321

288-
@with_source_only = false
322+
@source_only_mode.disable!
289323
end
290324

291325
def shutdown(kind_or_agent_list: nil)
@@ -378,7 +412,7 @@ def add_source(type, conf)
378412
# See also 'fluentd/plugin/input.rb'
379413
input.context_router = @event_router
380414
input.configure(conf)
381-
input.event_emitter_apply_source_only if @with_source_only
415+
input.event_emitter_apply_source_only if @source_only_mode.enabled?
382416
if @enable_input_metrics
383417
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
384418
end

lib/fluent/supervisor.rb

Lines changed: 110 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ def before_run
4343
@rpc_endpoint = nil
4444
@rpc_server = nil
4545
@counter = nil
46+
@socket_manager_server = nil
47+
@starting_new_supervisor_without_downtime = false
48+
@new_supervisor_pid = nil
49+
start_in_parallel = ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
4650

4751
@fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
4852
ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir
@@ -65,18 +69,31 @@ def before_run
6569

6670
if config[:disable_shared_socket]
6771
$log.info "shared socket for multiple workers is disabled"
72+
elsif start_in_parallel
73+
begin
74+
raise "[BUG] SERVERENGINE_SOCKETMANAGER_PATH env var must exist when starting in parallel" unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
75+
@socket_manager_server = ServerEngine::SocketManager::Server.share_sockets_with_another_server(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
76+
$log.info "restart-without-downtime: took over the shared sockets", path: ENV['SERVERENGINE_SOCKETMANAGER_PATH']
77+
rescue => e
78+
$log.error "restart-without-downtime: cancel sequence because failed to take over the shared sockets", error: e
79+
raise
80+
end
6881
else
69-
server = ServerEngine::SocketManager::Server.open
70-
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
82+
@socket_manager_server = ServerEngine::SocketManager::Server.open
83+
ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = @socket_manager_server.path.to_s
7184
end
85+
86+
stop_parallel_old_supervisor_after_delay if start_in_parallel
7287
end
7388

7489
def after_run
7590
stop_windows_event_thread if Fluent.windows?
7691
stop_rpc_server if @rpc_endpoint
7792
stop_counter_server if @counter
7893
cleanup_lock_dir
79-
Fluent::Supervisor.cleanup_resources
94+
Fluent::Supervisor.cleanup_socketmanager_path unless @starting_new_supervisor_without_downtime
95+
96+
notify_new_supervisor_that_old_one_has_stopped if @starting_new_supervisor_without_downtime
8097
end
8198

8299
def cleanup_lock_dir
@@ -138,7 +155,7 @@ def run_rpc_server
138155
@rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
139156
$log.debug "fluentd RPC got /api/config.gracefulReload request"
140157
if Fluent.windows?
141-
supervisor_sigusr2_handler
158+
graceful_reload
142159
else
143160
Process.kill :USR2, Process.pid
144161
end
@@ -172,6 +189,47 @@ def stop_counter_server
172189
@counter.stop
173190
end
174191

192+
def stop_parallel_old_supervisor_after_delay
193+
# TODO if the new supervisor fails to start and this is not called,
194+
# it would be necessary to update the pid in the PID file to the old one when daemonized.
195+
196+
Thread.new do
197+
# Delay to wait the new workers to start up.
198+
# Even if it takes a long time to start the new workers and stop the old Fluentd first,
199+
# it is no problem because the socket buffer works, as long as the capacity is not exceeded.
200+
sleep 10
201+
old_pid = ENV["FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"]&.to_i
202+
if old_pid
203+
$log.info "restart-without-downtime: stop the old supervisor"
204+
Process.kill :TERM, old_pid
205+
end
206+
rescue => e
207+
$log.warn "restart-without-downtime: failed to stop the old supervisor." +
208+
" If the old one does not exist, please send SIGWINCH to this new process to start to work fully." +
209+
" If it exists, something went wrong. Please kill the old one manually.",
210+
error: e
211+
end
212+
end
213+
214+
def notify_new_supervisor_that_old_one_has_stopped
215+
if config[:pid_path]
216+
new_pid = File.read(config[:pid_path]).to_i
217+
else
218+
raise "[BUG] new_supervisor_pid is not saved" unless @new_supervisor_pid
219+
new_pid = @new_supervisor_pid
220+
end
221+
222+
$log.info "restart-without-downtime: notify the new supervisor (pid: #{new_pid}) that old one has stopped"
223+
Process.kill :WINCH, new_pid
224+
rescue => e
225+
$log.error(
226+
"restart-without-downtime: failed to notify the new supervisor." +
227+
" Please send SIGWINCH to the new supervisor process manually" +
228+
" if it does not start to work fully.",
229+
error: e
230+
)
231+
end
232+
175233
def install_supervisor_signal_handlers
176234
return if Fluent.windows?
177235

@@ -187,7 +245,11 @@ def install_supervisor_signal_handlers
187245

188246
trap :USR2 do
189247
$log.debug 'fluentd supervisor process got SIGUSR2'
190-
supervisor_sigusr2_handler
248+
if Fluent.windows?
249+
graceful_reload
250+
else
251+
restart_without_downtime
252+
end
191253
end
192254

193255
trap :WINCH do
@@ -259,7 +321,7 @@ def install_windows_event_handler
259321
when :usr1
260322
supervisor_sigusr1_handler
261323
when :usr2
262-
supervisor_sigusr2_handler
324+
graceful_reload
263325
when :cont
264326
supervisor_dump_handler_for_windows
265327
when :stop_event_thread
@@ -289,7 +351,7 @@ def supervisor_sigusr1_handler
289351
send_signal_to_workers(:USR1)
290352
end
291353

292-
def supervisor_sigusr2_handler
354+
def graceful_reload
293355
conf = nil
294356
t = Thread.new do
295357
$log.info 'Reloading new config'
@@ -317,7 +379,38 @@ def supervisor_sigusr2_handler
317379
$log.error "Failed to reload config file: #{e}"
318380
end
319381

382+
def restart_without_downtime
383+
# TODO exclusive lock
384+
385+
$log.info "start restart-without-downtime sequence"
386+
387+
if @starting_new_supervisor_without_downtime
388+
$log.warn "restart-without-downtime: canceled because it is already starting"
389+
return
390+
end
391+
if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
392+
$log.warn "restart-without-downtime: canceled because the previous sequence is still running"
393+
return
394+
end
395+
396+
@starting_new_supervisor_without_downtime = true
397+
commands = [ServerEngine.ruby_bin_path, $0] + ARGV
398+
env_to_add = {
399+
"SERVERENGINE_SOCKETMANAGER_INTERNAL_TOKEN" => ServerEngine::SocketManager::INTERNAL_TOKEN,
400+
"FLUENT_RUNNING_IN_PARALLEL_WITH_OLD" => "#{Process.pid}",
401+
}
402+
pid = Process.spawn(env_to_add, commands.join(" "))
403+
@new_supervisor_pid = pid unless config[:daemonize]
404+
rescue => e
405+
$log.error "restart-without-downtime: failed", error: e
406+
@starting_new_supervisor_without_downtime = false
407+
end
408+
320409
def cancel_source_only
410+
if ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
411+
$log.info "restart-without-downtime: done all sequences, now the new workers starts to work fully"
412+
ENV.delete("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD")
413+
end
321414
send_signal_to_workers(:WINCH)
322415
end
323416

@@ -510,12 +603,11 @@ def self.default_options
510603
}
511604
end
512605

513-
def self.cleanup_resources
514-
unless Fluent.windows?
515-
if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
516-
FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
517-
end
518-
end
606+
def self.cleanup_socketmanager_path
607+
return if Fluent.windows?
608+
return unless ENV.key?('SERVERENGINE_SOCKETMANAGER_PATH')
609+
610+
FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
519611
end
520612

521613
def initialize(cl_opt)
@@ -583,7 +675,7 @@ def run_supervisor(dry_run: false)
583675
begin
584676
ServerEngine::Privilege.change(@chuser, @chgroup)
585677
MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
586-
Fluent::Engine.init(@system_config, supervisor_mode: true)
678+
Fluent::Engine.init(@system_config, supervisor_mode: true, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"))
587679
Fluent::Engine.run_configure(@conf, dry_run: dry_run)
588680
rescue Fluent::ConfigError => e
589681
$log.error 'config error', file: @config_path, error: e
@@ -632,10 +724,10 @@ def run_worker
632724
File.umask(@chumask.to_i(8))
633725
end
634726
MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
635-
Fluent::Engine.init(@system_config)
727+
Fluent::Engine.init(@system_config, start_in_parallel: ENV.key?("FLUENT_RUNNING_IN_PARALLEL_WITH_OLD"))
636728
Fluent::Engine.run_configure(@conf)
637729
Fluent::Engine.run
638-
self.class.cleanup_resources if @standalone_worker
730+
self.class.cleanup_socketmanager_path if @standalone_worker
639731
exit 0
640732
end
641733
end
@@ -853,7 +945,8 @@ def install_main_process_signal_handlers
853945
end
854946

855947
trap :USR2 do
856-
reload_config
948+
# Do nothing
949+
# TODO consider suitable code for this
857950
end
858951

859952
trap :CONT do

0 commit comments

Comments
 (0)