Skip to content

Commit 126bd4d

Browse files
authored
Add support for date columns to jdbc static and streaming filters (#171)
Added support for SQL DATE columns to the jdbc static and streaming filters. Before this changes, querying data from tables with DATE columns would produce a missing converter error caused by the RubyDate fields.
1 parent c0061a6 commit 126bd4d

File tree

10 files changed

+179
-40
lines changed

10 files changed

+179
-40
lines changed

.ci/setup.sql

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,10 @@ create database jdbc_streaming_db;
66
create table reference_table (
77
ip VARCHAR(50) NOT NULL,
88
name VARCHAR(50) NOT NULL,
9-
location VARCHAR(50) NOT NULL
9+
location VARCHAR(50) NOT NULL,
10+
entry_date DATE NOT NULL,
11+
entry_time TIME NOT NULL,
12+
timestamp TIMESTAMP NOT NULL
1013
);
1114

1215

@@ -23,7 +26,10 @@ DO $$
2326
INSERT INTO reference_table
2427
VALUES ((SELECT FORMAT(ipTemplate, counter)),
2528
(SELECT FORMAT(nameTemplate, counter)),
26-
(SELECT FORMAT(locationTemplate, counter)));
29+
(SELECT FORMAT(locationTemplate, counter)),
30+
'2003-02-01',
31+
'10:05:00',
32+
'2003-02-01 01:02:03');
2733
counter = counter + 1;
2834
END LOOP;
2935
END $$;
@@ -37,13 +43,16 @@ create database jdbc_static_db;
3743
create table reference_table (
3844
ip VARCHAR(50) NOT NULL,
3945
name VARCHAR(50) NOT NULL,
40-
location VARCHAR(50) NOT NULL
46+
location VARCHAR(50) NOT NULL,
47+
entry_date DATE NOT NULL,
48+
entry_time TIME NOT NULL,
49+
timestamp TIMESTAMP NOT NULL
4150
);
4251

4352

44-
INSERT INTO reference_table VALUES ('10.1.1.1', 'ldn-server-1', 'LDN-2-3-4');
45-
INSERT INTO reference_table VALUES ('10.2.1.1', 'nyc-server-1', 'NYC-5-2-8');
46-
INSERT INTO reference_table VALUES ('10.3.1.1', 'mv-server-1', 'MV-9-6-4');
53+
INSERT INTO reference_table VALUES ('10.1.1.1', 'ldn-server-1', 'LDN-2-3-4','2003-02-01', '10:05:00', '2003-02-01 01:02:03');
54+
INSERT INTO reference_table VALUES ('10.2.1.1', 'nyc-server-1', 'NYC-5-2-8','2003-02-01', '10:05:00', '2003-02-01 01:02:03');
55+
INSERT INTO reference_table VALUES ('10.3.1.1', 'mv-server-1', 'MV-9-6-4', '2003-02-01', '10:05:00', '2003-02-01 01:02:03');
4756

4857
create DATABASE jdbc_input_db;
4958

@@ -52,8 +61,11 @@ create DATABASE jdbc_input_db;
5261
CREATE TABLE employee (
5362
emp_no integer NOT NULL,
5463
first_name VARCHAR (50) NOT NULL,
55-
last_name VARCHAR (50) NOT NULL
64+
last_name VARCHAR (50) NOT NULL,
65+
entry_date DATE NOT NULL,
66+
entry_time TIME NOT NULL,
67+
timestamp TIMESTAMP NOT NULL
5668
);
5769

58-
INSERT INTO employee VALUES (1, 'David', 'Blenkinsop');
59-
INSERT INTO employee VALUES (2, 'Mark', 'Guckenheimer');
70+
INSERT INTO employee VALUES (1, 'David', 'Blenkinsop', '2003-02-01', '10:05:00', '2003-02-01 01:02:03');
71+
INSERT INTO employee VALUES (2, 'Mark', 'Guckenheimer', '2003-02-01', '10:05:00','2003-02-01 01:02:03');

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.5.0
2+
- Feat: add support for SQL `DATE` columns to jdbc static and streaming filters [#171](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/171)
3+
14
## 5.4.11
25
- Fixes an issue in which any one instance of a JDBC input plugin using `jdbc_default_timezone` changes the behaviour of plugin instances that do _not_ use `jdbc_default_timezone`, ensuring that timezone offsets remain consistent for each instance of the plugin _as configured_ [#151](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/151)
36
- Fixes an exception that could occur while reloading `jdbc_static` databases when the underlying connection to the remote has been broken [#165](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/165)

lib/logstash/filters/jdbc/lookup.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
# encoding: utf-8
22
require_relative "lookup_result"
33
require "logstash/util/loggable"
4+
require "logstash/plugin_mixins/jdbc/value_handler"
45

56
module LogStash module Filters module Jdbc
67
class Lookup
78
include LogStash::Util::Loggable
9+
include LogStash::PluginMixins::Jdbc::ValueHandler
810

911
class Sprintfier
1012
def initialize(param)
@@ -134,15 +136,13 @@ def tag_default(event)
134136

135137
def load_data_from_local(local, query, params, result)
136138
local.fetch(query, params).each do |row|
137-
stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
138-
result.push(stringified)
139+
result.push(extract_values_from(row))
139140
end
140141
end
141142

142143
def load_data_from_prepared(_local, _query, params, result)
143144
@prepared_statement.call(params).each do |row|
144-
stringified = row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
145-
result.push(stringified)
145+
result.push(extract_values_from(row))
146146
end
147147
end
148148

lib/logstash/plugin_mixins/jdbc/jdbc.rb

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
require_relative "value_tracking"
77
require_relative "timezone_proxy"
88
require_relative "statement_handler"
9+
require_relative "value_handler"
910

1011
java_import java.util.concurrent.locks.ReentrantLock
1112

1213
# Tentative of abstracting JDBC logic to a mixin
1314
# for potential reuse in other plugins (input/output)
1415
module LogStash module PluginMixins module Jdbc
1516
module Jdbc
17+
include LogStash::PluginMixins::Jdbc::ValueHandler
1618
# This method is called when someone includes this module
1719
def self.included(base)
1820
# Add these methods to the 'base' given.
@@ -252,25 +254,6 @@ def get_column_value(row)
252254
row[@tracking_column.to_sym]
253255
end
254256
end
255-
256-
private
257-
#Stringify row keys and decorate values when necessary
258-
def extract_values_from(row)
259-
Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }]
260-
end
261-
262-
private
263-
def decorate_value(value)
264-
case value
265-
when Time
266-
# transform it to LogStash::Timestamp as required by LS
267-
LogStash::Timestamp.new(value)
268-
when Date, DateTime
269-
LogStash::Timestamp.new(value.to_time)
270-
else
271-
value
272-
end
273-
end
274257
end
275258
end end end
276259

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# encoding: utf-8
2+
require "time"
3+
require "date"
4+
5+
module LogStash module PluginMixins module Jdbc
6+
# Provides functions to extract the row's values, ensuring column types
7+
# are properly decorated to become coercible to a LogStash::Event.
8+
module ValueHandler
9+
# Stringify the row keys and decorate values when necessary
10+
def extract_values_from(row)
11+
Hash[row.map { |k, v| [k.to_s, decorate_value(v)] }]
12+
end
13+
14+
# Decorate the value so it can be used as a LogStash::Event field
15+
def decorate_value(value)
16+
case value
17+
when Date, DateTime
18+
value.to_time
19+
else
20+
value
21+
end
22+
end
23+
end
24+
end end end

lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
# encoding: utf-8
22
require "logstash/util/loggable"
3+
require "logstash/plugin_mixins/jdbc/value_handler"
34

45
module LogStash module PluginMixins module JdbcStreaming
56
# so as to not clash with the class of the same name and function in the jdbc input
67
# this is in the `module JdbcStreaming` namespace
78
# this duplication can be removed in a universal plugin
89

910
class StatementHandler
11+
include LogStash::PluginMixins::Jdbc::ValueHandler
12+
1013
def self.build_statement_handler(plugin)
1114
klass = plugin.use_prepared_statements ? PreparedStatementHandler : NormalStatementHandler
1215
klass.new(plugin)
@@ -86,7 +89,7 @@ def cache_lookup(db, event)
8689
def execute_extract_records(db, params, result)
8790
dataset = db[statement, params] # returns a Sequel dataset
8891
dataset.all do |row|
89-
result.push row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} # Stringify row keys
92+
result.push extract_values_from(row)
9093
end
9194
end
9295

@@ -113,7 +116,7 @@ def cache_lookup(db, event)
113116
def execute_extract_records(db, params, result)
114117
records = db.call(name, params) # returns an array of hashes
115118
records.each do |row|
116-
result.push row.inject({}){|hash,(k,v)| hash[k.to_s] = v; hash} #Stringify row keys
119+
result.push extract_values_from(row)
117120
end
118121
end
119122

logstash-integration-jdbc.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-jdbc'
3-
s.version = '5.4.11'
3+
s.version = '5.5.0'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Integration with JDBC - input and filter plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/filters/integration/jdbc_static_spec.rb

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,45 @@ module LogStash module Filters
116116
plugin.filter(event)
117117
expect(event.get("server")).to eq([{"ip"=>"10.3.1.1", "name"=>"mv-server-1", "location"=>"MV-9-6-4"}])
118118
end
119+
120+
context 'and record with temporal columns' do
121+
let(:loader_statement) { "SELECT ip, name, location, entry_date, entry_time, timestamp FROM reference_table" }
122+
let(:local_db_objects) do
123+
[
124+
{
125+
"name" => "servers",
126+
"columns" => [
127+
%w[ip varchar(64)],
128+
%w[name varchar(64)],
129+
%w[location varchar(64)],
130+
%w[entry_date date],
131+
%w[entry_time time],
132+
%w[timestamp timestamp]
133+
]
134+
},
135+
]
136+
end
137+
138+
before(:each) { plugin.register }
139+
140+
subject { event.get("server").first }
141+
142+
it "maps the DATE to a Logstash Timestamp" do
143+
plugin.filter(event)
144+
expect(subject['entry_date']).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1)))
145+
end
146+
147+
it "maps the TIME field to a Logstash Timestamp" do
148+
plugin.filter(event)
149+
now = DateTime.now
150+
expect(subject['entry_time']).to eq(LogStash::Timestamp.new(Time.new(now.year, now.month, now.day, 10, 5, 0)))
151+
end
152+
153+
it "maps the TIMESTAMP to a Logstash Timestamp" do
154+
plugin.filter(event)
155+
expect(subject['timestamp']).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1, 1, 2, 3)))
156+
end
157+
end
119158
end
120159

121160
context "under normal conditions when index_columns is not specified" do

spec/filters/integration/jdbcstreaming_spec.rb

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,32 @@ class TestJdbcStreaming < JdbcStreaming
6363
end
6464
end
6565

66-
describe "In Prepared Statement mode, found record - uses row" do
66+
describe 'found record with temporal columns' do
67+
let(:idx) { 200 }
68+
let(:statement) { "SELECT entry_date, entry_time, timestamp FROM reference_table WHERE ip = :ip" }
69+
70+
before(:each) { plugin.register }
71+
72+
subject { event.get("server").first }
73+
74+
it "maps the DATE to a Logstash Timestamp" do
75+
plugin.filter(event)
76+
expect(subject['entry_date']).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1)))
77+
end
78+
79+
it "maps the TIME field to a Logstash Timestamp" do
80+
plugin.filter(event)
81+
now = DateTime.now
82+
expect(subject['entry_time']).to eq(LogStash::Timestamp.new(Time.new(now.year, now.month, now.day, 10, 5, 0)))
83+
end
84+
85+
it "maps the TIMESTAMP to a Logstash Timestamp" do
86+
plugin.filter(event)
87+
expect(subject['timestamp']).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1, 1, 2, 3)))
88+
end
89+
end
90+
91+
context 'prepared statement mode' do
6792
let(:idx) { 200 }
6893
let(:statement) { "SELECT name, location FROM reference_table WHERE ip = ?" }
6994
let(:settings) do
@@ -82,10 +107,37 @@ class TestJdbcStreaming < JdbcStreaming
82107
"sequel_opts" => {"pool_timeout" => 600}
83108
}
84109
end
85-
it "fills in the target" do
86-
plugin.filter(event)
87-
expect(event.get("server")).to eq([{"name" => "ldn-server-#{idx}", "location" => "LDN-#{idx}-2-3"}])
88-
expect((event.get("tags") || []) & ["lookup_failed", "default_used_instead"]).to be_empty
110+
111+
describe "found record - uses row" do
112+
it "fills in the target" do
113+
plugin.filter(event)
114+
expect(event.get("server")).to eq([{"name" => "ldn-server-#{idx}", "location" => "LDN-#{idx}-2-3"}])
115+
expect((event.get("tags") || []) & ["lookup_failed", "default_used_instead"]).to be_empty
116+
end
117+
end
118+
119+
describe 'found record with temporal columns' do
120+
let(:statement) { "SELECT entry_date, entry_time, timestamp FROM reference_table WHERE ip = ?" }
121+
122+
before(:each) { plugin.register }
123+
124+
subject { event.get("server").first }
125+
126+
it "maps the DATE to a Logstash Timestamp" do
127+
plugin.filter(event)
128+
expect(subject['entry_date']).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1)))
129+
end
130+
131+
it "maps the TIME field to a Logstash Timestamp" do
132+
plugin.filter(event)
133+
now = DateTime.now
134+
expect(subject['entry_time']).to eq(LogStash::Timestamp.new(Time.new(now.year, now.month, now.day, 10, 5, 0)))
135+
end
136+
137+
it "maps the TIMESTAMP to a Logstash Timestamp" do
138+
plugin.filter(event)
139+
expect(subject['timestamp']).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1, 1, 2, 3)))
140+
end
89141
end
90142
end
91143

spec/inputs/integration/integ_spec.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,29 @@
6666
expect(event.get('first_name')).to eq('David')
6767
end
6868
end
69+
70+
context 'with temporal columns' do
71+
let(:settings) do
72+
super().merge("statement" => 'SELECT ENTRY_DATE, ENTRY_TIME, TIMESTAMP FROM "employee" WHERE EMP_NO = 2')
73+
end
74+
75+
before(:each) { plugin.run(queue) }
76+
77+
subject(:event) { queue.pop }
78+
79+
it "maps the DATE to a Logstash Timestamp" do
80+
expect(event.get('entry_date')).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1)))
81+
end
82+
83+
it "maps the TIME field to a Logstash Timestamp" do
84+
now = DateTime.now
85+
expect(event.get('entry_time')).to eq(LogStash::Timestamp.new(Time.new(now.year, now.month, now.day, 10, 5, 0)))
86+
end
87+
88+
it "maps the TIMESTAMP to a Logstash Timestamp" do
89+
expect(event.get('timestamp')).to eq(LogStash::Timestamp.new(Time.new(2003, 2, 1, 1, 2, 3)))
90+
end
91+
end
6992
end
7093

7194
context "when supplying a non-existent library" do

0 commit comments

Comments
 (0)