Skip to content

Commit ca981ba

Browse files
committed
Updated custom_size_based_buffer.rb
Removed the temp-file buffer and added bounded flush retries (up to 5 attempts, 1 second apart) to reduce the risk of data loss
1 parent b7aba1a commit ca981ba

File tree

3 files changed

+15
-106
lines changed

3 files changed

+15
-106
lines changed

lib/logstash/outputs/kusto.rb

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
7777
default :codec, 'json_lines'
7878

7979
def register
80-
# Set buffer_file to a valid file path
81-
buffer_file = File.join(Dir.pwd, 'buffer', 'kusto_buffer.dat')
82-
83-
# Ensure the buffer directory exists
84-
FileUtils.mkdir_p(File.dirname(buffer_file))
8580
# Initialize the custom buffer with size and interval
86-
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, buffer_file) do |events|
81+
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
8782
flush_buffer(events)
8883
end
8984

@@ -146,9 +141,9 @@ def flush_buffer(events)
146141
begin
147142
@ingestor.upload_async(events.join)
148143
rescue => e
149-
# Log the error and continue
150144
@logger.error("Error during flush: #{e.message}")
151145
@logger.error(e.backtrace.join("\n"))
146+
raise e # Exception is raised to trigger the rescue block in buffer_flush
152147
end
153148
end
154149

Lines changed: 13 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
require 'logger'
22
require 'thread'
3-
require 'fileutils'
43

54
module LogStash
65
module Outputs
76
class CustomSizeBasedBuffer
8-
def initialize(max_size_mb, max_interval, buffer_file, &flush_callback)
9-
raise ArgumentError, "buffer_file cannot be nil" if buffer_file.nil?
10-
7+
def initialize(max_size_mb, max_interval, &flush_callback)
118
@buffer_config = {
129
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
1310
max_interval: max_interval,
14-
buffer_file: buffer_file,
1511
logger: Logger.new(STDOUT)
1612
}
1713
@buffer_state = {
@@ -29,7 +25,6 @@ def initialize(max_size_mb, max_interval, buffer_file, &flush_callback)
2925
@shutdown = false
3026
@pending_mutex = Mutex.new
3127
@flush_mutex = Mutex.new
32-
load_buffer_from_file
3328
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
3429
end
3530

@@ -51,7 +46,6 @@ def shutdown
5146
@shutdown = true
5247
@buffer_state[:timer].kill
5348
buffer_flush(final: true)
54-
clear_file_buffer
5549
end
5650

5751
private
@@ -73,6 +67,8 @@ def buffer_flush(options = {})
7367
end
7468

7569
items_flushed = 0
70+
max_retries = 5
71+
retries = 0
7672

7773
begin
7874
outgoing_items = []
@@ -101,17 +97,20 @@ def buffer_flush(options = {})
10197
end
10298

10399
begin
100+
@buffer_config[:logger].info("Attempting to flush #{outgoing_items.size} items to the network")
104101
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
105-
clear_flushed_buffer_states(outgoing_items) unless ::File.zero?(@buffer_config[:buffer_file]) # Clear the flushed items from the file
102+
@buffer_config[:logger].info("Successfully flushed #{outgoing_items.size} items to the network")
106103
rescue => e
107104
@buffer_config[:logger].error("Flush failed: #{e.message}")
108-
# Save the items to the file buffer in case of failure
109-
@pending_mutex.synchronize do
110-
@buffer_state[:pending_items] = outgoing_items + @buffer_state[:pending_items]
111-
@buffer_state[:pending_size] += outgoing_size
112-
save_buffer_to_file
105+
@buffer_config[:logger].error(e.backtrace.join("\n"))
106+
retries += 1
107+
if retries <= max_retries
108+
sleep 1
109+
retry
110+
else
111+
@buffer_config[:logger].error("Max retries reached. Data loss may occur.")
112+
raise e
113113
end
114-
raise e
115114
end
116115

117116
@buffer_state[:last_flush] = Time.now.to_i
@@ -129,78 +128,6 @@ def buffer_initialize
129128
@buffer_state[:pending_items] = []
130129
@buffer_state[:pending_size] = 0
131130
end
132-
133-
def clear_flushed_buffer_states(flushed_items)
134-
remaining_buffer_states = []
135-
::File.foreach(@buffer_config[:buffer_file]) do |line|
136-
begin
137-
buffer_state = Marshal.load(line)
138-
buffer_state[:pending_items] -= flushed_items
139-
buffer_state[:pending_size] = buffer_state[:pending_items].sum(&:bytesize)
140-
remaining_buffer_states << buffer_state unless buffer_state[:pending_items].empty?
141-
rescue ArgumentError => e
142-
@buffer_config[:logger].error("Failed to load buffer state: #{e.message}")
143-
next
144-
end
145-
end
146-
147-
::File.open(@buffer_config[:buffer_file], 'w') do |file|
148-
remaining_buffer_states.each do |state|
149-
file.write(Marshal.dump(state) + "\n")
150-
end
151-
end
152-
end
153-
154-
def save_buffer_to_file
155-
buffer_state_copy = @buffer_state.dup
156-
buffer_state_copy.delete(:timer) # Exclude the Thread object from serialization
157-
158-
::FileUtils.mkdir_p(::File.dirname(@buffer_config[:buffer_file])) # Ensure directory exists
159-
::File.open(@buffer_config[:buffer_file], 'a') do |file|
160-
file.write(Marshal.dump(buffer_state_copy) + "\n")
161-
end
162-
@buffer_config[:logger].info("Saved buffer state to file")
163-
end
164-
165-
def load_buffer_from_file
166-
::FileUtils.mkdir_p(::File.dirname(@buffer_config[:buffer_file])) # Ensure directory exists
167-
::File.open(@buffer_config[:buffer_file], 'a') {} # Create the file if it doesn't exist
168-
169-
if ::File.file?(@buffer_config[:buffer_file]) && !::File.zero?(@buffer_config[:buffer_file])
170-
begin
171-
@pending_mutex.synchronize do
172-
buffer_states = []
173-
::File.foreach(@buffer_config[:buffer_file]) do |line|
174-
buffer_states << Marshal.load(line)
175-
end
176-
@buffer_state = buffer_states.reduce do |acc, state|
177-
acc[:pending_items].concat(state[:pending_items])
178-
acc[:pending_size] += state[:pending_size]
179-
acc
180-
end
181-
@buffer_state[:timer] = Thread.new do
182-
loop do
183-
sleep(@buffer_config[:max_interval])
184-
buffer_flush(force: true)
185-
end
186-
end
187-
# Ensure the buffer does not flush immediately upon loading
188-
@buffer_state[:last_flush] = Time.now.to_i
189-
end
190-
@buffer_config[:logger].info("Loaded buffer state from file")
191-
rescue => e
192-
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
193-
buffer_initialize
194-
end
195-
else
196-
buffer_initialize
197-
end
198-
end
199-
200-
def clear_file_buffer
201-
::File.open(@buffer_config[:buffer_file], 'w') {} # Truncate the file
202-
@buffer_config[:logger].info("File buffer cleared on shutdown")
203-
end
204131
end
205132
end
206133
end

spec/outputs/kusto_spec.rb

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,6 @@
9191
end
9292

9393
describe '#flush_buffer' do
94-
it 'handles errors during buffer flushing' do
95-
RSpec.configuration.reporter.message("Running test: handles errors during buffer flushing")
96-
kusto = described_class.new(options)
97-
kusto.register
98-
99-
allow(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).and_raise(StandardError.new("Test error"))
100-
events = [LogStash::Event.new("message" => "test1")]
101-
encoded_events = events.map { |e| [e, e.to_json] }
102-
kusto.multi_receive_encoded(encoded_events)
103-
104-
expect { kusto.flush_buffer(encoded_events) }.not_to raise_error
105-
RSpec.configuration.reporter.message("Completed test: handles errors during buffer flushing")
106-
end
10794

10895
it 'flushes the buffer when max_size is reached' do
10996
RSpec.configuration.reporter.message("Running test: flushes the buffer when max_size is reached")

0 commit comments

Comments
 (0)