Skip to content

Commit 26c0e74

Browse files
committed
Increase the plugin's resiliency when working with the file system (#15)
* Handle ruby's ENOENT exception in upload * Recover existing files before starting workers * Recover existing files only if directory exists
1 parent 15f72cb commit 26c0e74

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

lib/logstash/outputs/kusto.rb

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ def register
134134

135135
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, database, table, mapping, delete_temp_files, @logger, executor)
136136

137+
# send existing files
138+
recover_past_files if recovery
139+
140+
@last_stale_cleanup_cycle = Time.now
141+
137142
@flush_interval = @flush_interval.to_i
138143
if @flush_interval > 0
139144
@flusher = Interval.start(@flush_interval, -> { flush_pending_files })
@@ -142,11 +147,6 @@ def register
142147
if (@stale_cleanup_type == 'interval') && (@stale_cleanup_interval > 0)
143148
@cleaner = Interval.start(stale_cleanup_interval, -> { close_stale_files })
144149
end
145-
146-
@last_stale_cleanup_cycle = Time.now
147-
148-
# send existing files
149-
recover_past_files if recovery
150150
end
151151

152152
private
@@ -360,9 +360,11 @@ def recover_past_files
360360
pattern_start = @path.index('%') || path_last_char
361361
last_folder_before_pattern = @path.rindex('/', pattern_start) || path_last_char
362362
new_path = path[0..last_folder_before_pattern]
363-
@logger.info("Going to recover old files in path #{@new_path}")
364-
363+
365364
begin
365+
return unless Dir.exist?(new_path)
366+
@logger.info("Going to recover old files in path #{@new_path}")
367+
366368
old_files = Find.find(new_path).select { |p| /.*\.kusto$/ =~ p }
367369
@logger.info("Found #{old_files.length} old file(s), sending them now...")
368370

lib/logstash/outputs/kusto/ingestor.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ def upload(path, delete_on_success)
9696
File.delete(path) if delete_on_success
9797

9898
@logger.debug("File #{path} sent to kusto.")
99+
rescue Errno::ENOENT => e
100+
@logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
99101
rescue Java::JavaNioFile::NoSuchFileException => e
100102
@logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace)
101103
rescue => e

0 commit comments

Comments
 (0)