Skip to content

Commit b7aba1a

Browse files
committed
Adds a temporary file buffer used during network downtime
1 parent 379da3b commit b7aba1a

File tree

4 files changed

+160
-29
lines changed

4 files changed

+160
-29
lines changed

lib/logstash/outputs/kusto.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
7777
default :codec, 'json_lines'
7878

7979
def register
80-
# Initialize the custom buffer with size and interval
81-
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
80+
# Set buffer_file to a valid file path
81+
buffer_file = File.join(Dir.pwd, 'buffer', 'kusto_buffer.dat')
82+
83+
# Ensure the buffer directory exists
84+
FileUtils.mkdir_p(File.dirname(buffer_file))
85+
# Initialize the custom buffer with size, interval, and buffer file
86+
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, buffer_file) do |events|
8287
flush_buffer(events)
8388
end
8489

lib/logstash/outputs/kusto/custom_size_based_buffer.rb

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
require 'logger'
22
require 'thread'
3+
require 'fileutils'
34

45
module LogStash
56
module Outputs
67
class CustomSizeBasedBuffer
7-
def initialize(max_size_mb, max_interval, &flush_callback)
8+
def initialize(max_size_mb, max_interval, buffer_file, &flush_callback)
9+
raise ArgumentError, "buffer_file cannot be nil" if buffer_file.nil?
10+
811
@buffer_config = {
912
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
1013
max_interval: max_interval,
14+
buffer_file: buffer_file,
1115
logger: Logger.new(STDOUT)
1216
}
1317
@buffer_state = {
@@ -25,6 +29,7 @@ def initialize(max_size_mb, max_interval, &flush_callback)
2529
@shutdown = false
2630
@pending_mutex = Mutex.new
2731
@flush_mutex = Mutex.new
32+
load_buffer_from_file
2833
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
2934
end
3035

@@ -46,6 +51,7 @@ def shutdown
4651
@shutdown = true
4752
@buffer_state[:timer].kill
4853
buffer_flush(final: true)
54+
clear_file_buffer
4955
end
5056

5157
private
@@ -69,30 +75,49 @@ def buffer_flush(options = {})
6975
items_flushed = 0
7076

7177
begin
78+
outgoing_items = []
79+
outgoing_size = 0
80+
7281
@pending_mutex.synchronize do
7382
return 0 if @buffer_state[:pending_size] == 0
7483

7584
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
7685

77-
return 0 if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
86+
if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
87+
return 0
88+
end
7889

7990
if force
8091
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
8192
elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
8293
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
94+
else
95+
@buffer_config[:logger].info("Flush triggered without specific condition")
8396
end
8497

8598
outgoing_items = @buffer_state[:pending_items].dup
8699
outgoing_size = @buffer_state[:pending_size]
87100
buffer_initialize
101+
end
88102

103+
begin
89104
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
105+
clear_flushed_buffer_states(outgoing_items) unless ::File.zero?(@buffer_config[:buffer_file]) # Clear the flushed items from the file
106+
rescue => e
107+
@buffer_config[:logger].error("Flush failed: #{e.message}")
108+
# Save the items to the file buffer in case of failure
109+
@pending_mutex.synchronize do
110+
@buffer_state[:pending_items] = outgoing_items + @buffer_state[:pending_items]
111+
@buffer_state[:pending_size] += outgoing_size
112+
save_buffer_to_file
113+
end
114+
raise e
115+
end
90116

91-
@buffer_state[:last_flush] = Time.now.to_i
92-
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
117+
@buffer_state[:last_flush] = Time.now.to_i
118+
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
93119

94-
items_flushed = outgoing_items.size
95-
end
120+
items_flushed = outgoing_items.size
96121
ensure
97122
@flush_mutex.unlock
98123
end
@@ -104,6 +129,78 @@ def buffer_initialize
104129
@buffer_state[:pending_items] = []
105130
@buffer_state[:pending_size] = 0
106131
end
132+
133+
def clear_flushed_buffer_states(flushed_items)
134+
remaining_buffer_states = []
135+
::File.foreach(@buffer_config[:buffer_file]) do |line|
136+
begin
137+
buffer_state = Marshal.load(line)
138+
buffer_state[:pending_items] -= flushed_items
139+
buffer_state[:pending_size] = buffer_state[:pending_items].sum(&:bytesize)
140+
remaining_buffer_states << buffer_state unless buffer_state[:pending_items].empty?
141+
rescue ArgumentError => e
142+
@buffer_config[:logger].error("Failed to load buffer state: #{e.message}")
143+
next
144+
end
145+
end
146+
147+
::File.open(@buffer_config[:buffer_file], 'w') do |file|
148+
remaining_buffer_states.each do |state|
149+
file.write(Marshal.dump(state) + "\n")
150+
end
151+
end
152+
end
153+
154+
def save_buffer_to_file
155+
buffer_state_copy = @buffer_state.dup
156+
buffer_state_copy.delete(:timer) # Exclude the Thread object from serialization
157+
158+
::FileUtils.mkdir_p(::File.dirname(@buffer_config[:buffer_file])) # Ensure directory exists
159+
::File.open(@buffer_config[:buffer_file], 'a') do |file|
160+
file.write(Marshal.dump(buffer_state_copy) + "\n")
161+
end
162+
@buffer_config[:logger].info("Saved buffer state to file")
163+
end
164+
165+
def load_buffer_from_file
166+
::FileUtils.mkdir_p(::File.dirname(@buffer_config[:buffer_file])) # Ensure directory exists
167+
::File.open(@buffer_config[:buffer_file], 'a') {} # Create the file if it doesn't exist
168+
169+
if ::File.file?(@buffer_config[:buffer_file]) && !::File.zero?(@buffer_config[:buffer_file])
170+
begin
171+
@pending_mutex.synchronize do
172+
buffer_states = []
173+
::File.foreach(@buffer_config[:buffer_file]) do |line|
174+
buffer_states << Marshal.load(line)
175+
end
176+
@buffer_state = buffer_states.reduce do |acc, state|
177+
acc[:pending_items].concat(state[:pending_items])
178+
acc[:pending_size] += state[:pending_size]
179+
acc
180+
end
181+
@buffer_state[:timer] = Thread.new do
182+
loop do
183+
sleep(@buffer_config[:max_interval])
184+
buffer_flush(force: true)
185+
end
186+
end
187+
# Ensure the buffer does not flush immediately upon loading
188+
@buffer_state[:last_flush] = Time.now.to_i
189+
end
190+
@buffer_config[:logger].info("Loaded buffer state from file")
191+
rescue => e
192+
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
193+
buffer_initialize
194+
end
195+
else
196+
buffer_initialize
197+
end
198+
end
199+
200+
def clear_file_buffer
201+
::File.open(@buffer_config[:buffer_file], 'w') {} # Truncate the file
202+
@buffer_config[:logger].info("File buffer cleared on shutdown")
203+
end
107204
end
108205
end
109206
end

lib/logstash/outputs/kusto/ingestor.rb

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,15 @@ def upload_async(data)
126126

127127
@workers_pool.post do
128128
LogStash::Util.set_thread_name("Kusto to ingest data")
129-
upload(data)
129+
begin
130+
upload(data)
131+
rescue => e
132+
@logger.error('Error during async upload.', exception: e.class, message: e.message, backtrace: e.backtrace)
133+
raise e
134+
end
130135
end
131136
rescue Exception => e
132-
@logger.error('StandardError.', exception: e.class, message: e.message, backtrace: e.backtrace)
137+
@logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace)
133138
raise e
134139
end
135140

@@ -150,21 +155,30 @@ def upload(data)
150155
# local_ingestion_properties.addJsonMappingName(json_mapping)
151156
# end
152157

158+
153159
if data.size > 0
154-
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
155-
@kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
156-
else
157-
@logger.warn("Data is empty and is not ingested.")
158-
end
159-
@logger.debug("Data sent to Kusto.")
160-
rescue => e
161-
# When the retry limit is reached or another error happen we will wait and retry.
160+
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
161+
@kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
162+
else
163+
@logger.warn("Data is empty and is not ingested.")
164+
end
165+
@logger.debug("Data sent to Kusto.")
166+
rescue => e
167+
# When the retry limit is reached or another error happens we will wait and retry.
162168
#
163-
# Thread might be stuck here, but I think its better than losing anything
164-
# its either a transient errors or something bad really happened.
169+
# Thread might be stuck here, but I think it's better than losing anything
170+
# it's either a transient error or something bad really happened.
165171
@logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, backtrace: e.backtrace)
166-
sleep RETRY_DELAY_SECONDS
167-
retry
172+
retry_count = 0
173+
max_retries = 5
174+
begin
175+
sleep (2 ** retry_count) * RETRY_DELAY_SECONDS
176+
retry_count += 1
177+
retry if retry_count <= max_retries
178+
rescue => retry_error
179+
@logger.error('Retrying failed.', exception: retry_error.class, message: retry_error.message, backtrace: retry_error.backtrace)
180+
raise retry_error
181+
end
168182
end
169183

170184
def stop

spec/outputs/kusto_spec.rb

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,23 @@
5050

5151
events = [LogStash::Event.new("message" => "test1"), LogStash::Event.new("message" => "test2")]
5252
encoded_events = events.map { |e| [e, e.to_json] }
53-
kusto.multi_receive_encoded(encoded_events)
5453

54+
# Temporarily disable automatic flushing for the test
5555
buffer = kusto.instance_variable_get(:@buffer)
56-
expect(buffer.instance_variable_get(:@buffer_state)[:pending_items].size).to eq(2)
56+
allow(buffer).to receive(:buffer_flush)
57+
58+
# Clear the buffer before the test
59+
buffer.instance_variable_set(:@buffer_state, { pending_items: [], pending_size: 0, last_flush: Time.now.to_i })
60+
61+
kusto.multi_receive_encoded(encoded_events)
62+
63+
pending_items = buffer.instance_variable_get(:@buffer_state)[:pending_items]
64+
RSpec.configuration.reporter.message("Pending items in buffer: #{pending_items.inspect}")
65+
66+
expect(pending_items.size).to eq(2)
5767
RSpec.configuration.reporter.message("Completed test: processes events and adds them to the buffer")
5868
end
59-
69+
6070
it 'handles errors during event processing' do
6171
RSpec.configuration.reporter.message("Running test: handles errors during event processing")
6272
kusto = described_class.new(options)
@@ -102,9 +112,14 @@
102112

103113
events = [LogStash::Event.new("message" => "test1")]
104114
encoded_events = events.map { |e| [e, e.to_json] }
105-
expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything)
115+
# Ensure upload_async is called only once
116+
expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything).once
106117
kusto.multi_receive_encoded(encoded_events)
107-
kusto.flush_buffer(encoded_events) # Pass the encoded events
118+
119+
# Trigger the buffer flush manually
120+
buffer = kusto.instance_variable_get(:@buffer)
121+
buffer.send(:buffer_flush, force: true)
122+
108123
RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_size is reached")
109124
end
110125

@@ -119,7 +134,7 @@
119134
sleep(2) # Wait for the interval to pass
120135

121136
expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything)
122-
kusto.flush_buffer(encoded_events) # Pass the encoded events
137+
kusto.flush_buffer(encoded_events) # Pass the encoded events here
123138
RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_interval is reached")
124139
end
125140

@@ -136,7 +151,7 @@
136151
sleep(2)
137152

138153
expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything)
139-
kusto.flush_buffer(encoded_events) # Pass the encoded events
154+
kusto.flush_buffer(encoded_events) # Pass the encoded events here
140155
RSpec.configuration.reporter.message("Completed test: eventually flushes without receiving additional events based on max_interval")
141156
end
142157
end

0 commit comments

Comments (0)