Skip to content
This repository was archived by the owner on Sep 17, 2019. It is now read-only.

Commit a6c669c

Browse files
committed
Adds unsafe_statement support
1 parent e615829 commit a6c669c

File tree

2 files changed

+79
-46
lines changed

2 files changed

+79
-46
lines changed

lib/logstash/outputs/jdbc.rb

Lines changed: 78 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ class LogStash::Outputs::Jdbc < LogStash::Outputs::Base
3030
# [ "insert into table (message) values(?)", "%{message}" ]
3131
config :statement, :validate => :array, :required => true
3232

33+
# If this is an unsafe statement, use event.sprintf
34+
# This also has potential performance penalties due to having to create a
35+
# new statement for each event, rather than adding to the batch and issuing
36+
# multiple inserts in 1 go
37+
config :unsafe_statement, :validate => :boolean, :default => false
38+
3339
# Number of connections in the pool to maintain
3440
config :max_pool_size, :validate => :number, :default => 5
3541

@@ -72,7 +78,7 @@ def register
7278

7379
@pool = Java::ComZaxxerHikari::HikariDataSource.new
7480
@pool.setJdbcUrl(@connection_string)
75-
81+
7682
@pool.setUsername(@username) if @username
7783
@pool.setPassword(@password) if @password
7884

@@ -105,50 +111,10 @@ def receive(event)
105111
end
106112

107113
def flush(events, teardown=false)
108-
connection = @pool.getConnection()
109-
110-
statement = connection.prepareStatement(@statement[0])
111-
112-
events.each do |event|
113-
next if @statement.length < 2
114-
115-
@statement[1..-1].each_with_index do |i, idx|
116-
case event[i]
117-
when Time, LogStash::Timestamp
118-
# Most reliable solution, cross JDBC driver
119-
statement.setString(idx + 1, event[i].iso8601())
120-
when Fixnum, Integer
121-
statement.setInt(idx + 1, event[i])
122-
when Float
123-
statement.setFloat(idx + 1, event[i])
124-
when String
125-
statement.setString(idx + 1, event[i])
126-
when true
127-
statement.setBoolean(idx + 1, true)
128-
when false
129-
statement.setBoolean(idx + 1, false)
130-
else
131-
statement.setString(idx + 1, event.sprintf(i))
132-
end
133-
end
134-
135-
statement.addBatch()
136-
end
137-
138-
begin
139-
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
140-
statement.executeBatch()
141-
statement.close()
142-
rescue => e
143-
# Raising an exception will incur a retry from Stud::Buffer.
144-
# Since the exceutebatch failed this should mean any events failed to be
145-
# inserted will be re-run. We're going to log it for the lols anyway.
146-
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
147-
if e.getNextException() != nil
148-
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
149-
end
150-
ensure
151-
connection.close();
114+
if @unsafe_statement == true
115+
unsafe_flush(events, teardown)
116+
else
117+
safe_flush(events, teardown)
152118
end
153119
end
154120

@@ -204,4 +170,71 @@ def load_jar_files!
204170
require jar
205171
end
206172
end
173+
174+
def safe_flush(events, teardown=false)
175+
connection = @pool.getConnection()
176+
177+
statement = connection.prepareStatement(@statement[0])
178+
179+
events.each do |event|
180+
next if @statement.length < 2
181+
statement = add_statement_event_params(statement, event)
182+
183+
statement.addBatch()
184+
end
185+
186+
begin
187+
@logger.debug("JDBC - Sending SQL", :sql => statement.toString())
188+
statement.executeBatch()
189+
statement.close()
190+
rescue => e
191+
# Raising an exception will incur a retry from Stud::Buffer.
192+
# Since the exceutebatch failed this should mean any events failed to be
193+
# inserted will be re-run. We're going to log it for the lols anyway.
194+
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e)
195+
if e.getNextException() != nil
196+
@logger.warn("JDBC - Exception. Will automatically retry", :exception => e.getNextException())
197+
end
198+
ensure
199+
connection.close();
200+
end
201+
end
202+
203+
def unsafe_flush(events, teardown=false)
204+
connection = @pool.getConnection()
205+
206+
events.each do |event|
207+
statement = connection.prepareStatement(event.sprintf(@statement[0]))
208+
209+
statement = add_statement_event_params(statement, event) if @statement.length > 1
210+
211+
statement.execute()
212+
statement.close()
213+
connection.close()
214+
end
215+
end
216+
217+
def add_statement_event_params(statement, event)
218+
@statement[1..-1].each_with_index do |i, idx|
219+
case event[i]
220+
when Time, LogStash::Timestamp
221+
# Most reliable solution, cross JDBC driver
222+
statement.setString(idx + 1, event[i].iso8601())
223+
when Fixnum, Integer
224+
statement.setInt(idx + 1, event[i])
225+
when Float
226+
statement.setFloat(idx + 1, event[i])
227+
when String
228+
statement.setString(idx + 1, event[i])
229+
when true
230+
statement.setBoolean(idx + 1, true)
231+
when false
232+
statement.setBoolean(idx + 1, false)
233+
else
234+
statement.setString(idx + 1, event.sprintf(i))
235+
end
236+
end
237+
238+
statement
239+
end
207240
end # class LogStash::Outputs::jdbc

logstash-output-jdbc.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-jdbc'
3-
s.version = "0.2.0.rc1"
3+
s.version = "0.2.0.rc2"
44
s.licenses = [ "Apache License (2.0)" ]
55
s.summary = "This plugin allows you to output to SQL, via JDBC"
66
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)