|
1 | 1 | require "logstash/devutils/rspec/spec_helper" |
2 | 2 | require "logstash/outputs/jdbc" |
3 | 3 | require "stud/temporary" |
| 4 | +require "java" |
4 | 5 |
|
5 | 6 | describe LogStash::Outputs::Jdbc do |
| 7 | + def fetch_log_table_rowcount |
| 8 | + # sleep for a second to let the flush happen |
| 9 | + sleep 1 |
| 10 | + |
| 11 | + stmt = @sql.createStatement() |
| 12 | + rs = stmt.executeQuery("select count(*) as total from log") |
| 13 | + count = 0 |
| 14 | + while rs.next() |
| 15 | + count = rs.getInt("total") |
| 16 | + end |
| 17 | + stmt.close() |
| 18 | + |
| 19 | + return count |
| 20 | + end |
| 21 | + |
| 22 | + let(:base_settings) { { |
| 23 | + "driver_jar_path" => @driver_jar_path, |
| 24 | + "connection_string" => @test_connection_string, |
| 25 | + "username" => ENV['SQL_USERNAME'], |
| 26 | + "password" => ENV['SQL_PASSWORD'], |
| 27 | + "statement" => [ "insert into log (message) values(?)", "message" ], |
| 28 | + "max_pool_size" => 1, |
| 29 | + "flush_size" => 1, |
| 30 | + "max_flush_exceptions" => 1 |
| 31 | + } } |
| 32 | + let(:test_settings) { {} } |
| 33 | + let(:plugin) { LogStash::Outputs::Jdbc.new(base_settings.merge(test_settings)) } |
| 34 | + let(:event_fields) { { "message" => "This is a message!" } } |
| 35 | + let(:event) { LogStash::Event.new(event_fields) } |
| 36 | + |
| 37 | + before(:all) do |
| 38 | + @driver_jar_path = File.absolute_path(ENV['SQL_JAR']) |
| 39 | + @test_db_path = File.join(Stud::Temporary.directory, "test.db") |
| 40 | + @test_connection_string = "jdbc:sqlite:#{@test_db_path}" |
| 41 | + |
| 42 | + require @driver_jar_path |
| 43 | + |
| 44 | + @sql = java.sql.DriverManager.get_connection(@test_connection_string, ENV['SQL_USERNAME'].to_s, ENV['SQL_PASSWORD'].to_s) |
| 45 | + stmt = @sql.createStatement() |
| 46 | + stmt.executeUpdate("CREATE table log (host text, timestamp datetime, message text);") |
| 47 | + stmt.close() |
| 48 | + end |
| 49 | + |
| 50 | + before(:each) do |
| 51 | + stmt = @sql.createStatement() |
| 52 | + stmt.executeUpdate("delete from log") |
| 53 | + stmt.close() |
| 54 | + end |
| 55 | + |
| 56 | + after(:all) do |
| 57 | + File.unlink(@test_db_path) |
| 58 | + Dir.rmdir(File.dirname(@test_db_path)) |
| 59 | + end |
| 60 | + |
| 61 | + describe "safe statement" do |
| 62 | + it "should register without errors" do |
| 63 | + expect { plugin.register }.to_not raise_error |
| 64 | + end |
| 65 | + |
| 66 | + it "receive event, without error" do |
| 67 | + plugin.register |
| 68 | + expect { plugin.receive(event) }.to_not raise_error |
| 69 | + |
| 70 | + expect(fetch_log_table_rowcount).to eq(1) |
| 71 | + end |
| 72 | + |
| 73 | + end |
| 74 | + |
| 75 | + describe "unsafe statement" do |
| 76 | + let(:event_fields) { |
| 77 | + { "message" => "This is a message!", "table" => "log" } |
| 78 | + } |
| 79 | + let(:test_settings) { { |
| 80 | + "statement" => [ "insert into %{table} (message) values(?)", "message" ], |
| 81 | + "unsafe_statement" => true |
| 82 | + } } |
| 83 | + |
| 84 | + it "should register without errors" do |
| 85 | + expect { plugin.register }.to_not raise_error |
| 86 | + end |
6 | 87 |
|
7 | | - it "should register without errors" do |
8 | | - plugin = LogStash::Plugin.lookup("output", "jdbc").new({}) |
9 | | - expect { plugin.register }.to_not raise_error |
| 88 | + it "receive event, without error" do |
| 89 | + plugin.register |
| 90 | + plugin.receive(event) |
| 91 | + expect(fetch_log_table_rowcount).to eq(1) |
| 92 | + end |
10 | 93 |
|
11 | 94 | end |
12 | | - |
13 | 95 | end |
0 commit comments