Skip to content

Commit edc0b4d

Browse files
authored
Merge pull request #97 from gas-buddy/eherot/preserve_millisecond_precision
Preserve millisecond precision and time zones.
2 parents f0ebf58 + a9b6aa6 commit edc0b4d

File tree

4 files changed

+145
-12
lines changed

4 files changed

+145
-12
lines changed

lib/fluent/plugin/in_sql.rb

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,16 @@ class TableElement
6060
config_param :time_column, :string, default: nil
6161
config_param :primary_key, :string, default: nil
6262

63+
attr_reader :log
64+
6365
def configure(conf)
6466
super
6567
end
6668

67-
def init(tag_prefix, base_model, router)
69+
def init(tag_prefix, base_model, router, log)
6870
@router = router
6971
@tag = "#{tag_prefix}.#{@tag}" if tag_prefix
72+
@log = log
7073

7174
# creates a model for this table
7275
table_name = @table
@@ -108,6 +111,17 @@ def read_attribute_for_serialization(n)
108111
end
109112
end
110113

114+
# Make sure we always have a Fluent::EventTime object regardless of what comes in
115+
def normalized_time(tv, now)
116+
return Fluent::EventTime.from_time(tv) if tv.is_a?(Time)
117+
begin
118+
Fluent::EventTime.parse(tv.to_s)
119+
rescue
120+
log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})"
121+
now
122+
end
123+
end
124+
111125
# emits next records and returns the last record of emitted records
112126
def emit_next_records(last_record, limit)
113127
relation = @model
@@ -123,15 +137,13 @@ def emit_next_records(last_record, limit)
123137
relation.each do |obj|
124138
record = obj.serializable_hash rescue nil
125139
if record
126-
if @time_column && tv = obj.read_attribute(@time_column)
127-
if tv.is_a?(Time)
128-
time = tv.to_i
140+
time =
141+
if @time_column && (tv = obj.read_attribute(@time_column))
142+
normalized_time(tv, now)
129143
else
130-
time = Time.parse(tv.to_s).to_i rescue now
144+
now
131145
end
132-
else
133-
time = now
134-
end
146+
135147
me.add(time, record)
136148
last_record = record
137149
end
@@ -217,7 +229,7 @@ def start
217229
# ignore tables if TableElement#init failed
218230
@tables.reject! do |te|
219231
begin
220-
te.init(@tag_prefix, @base_model, router)
232+
te.init(@tag_prefix, @base_model, router, log)
221233
log.info "Selecting '#{te.table}' table"
222234
false
223235
rescue => e

test/fixtures/schema.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,12 @@
2020
t.datetime "created_at", null: false
2121
t.datetime "updated_at", null: false
2222
end
23+
24+
create_table "messages_custom_time", force: :cascade do |t|
25+
t.string "message"
26+
t.datetime "created_at", null: false
27+
t.datetime "updated_at", null: false
28+
t.string "custom_time"
29+
end
2330
end
2431

test/plugin/test_in_sql.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,9 @@ def test_message
8282
[d.events[2][1], "message 3"],
8383
]
8484
actual = [
85-
[Time.parse(d.events[0][2]["updated_at"]).to_i, d.events[0][2]["message"]],
86-
[Time.parse(d.events[1][2]["updated_at"]).to_i, d.events[1][2]["message"]],
87-
[Time.parse(d.events[2][2]["updated_at"]).to_i, d.events[2][2]["message"]],
85+
[Fluent::EventTime.parse(d.events[0][2]["updated_at"]), d.events[0][2]["message"]],
86+
[Fluent::EventTime.parse(d.events[1][2]["updated_at"]), d.events[1][2]["message"]],
87+
[Fluent::EventTime.parse(d.events[2][2]["updated_at"]), d.events[2][2]["message"]],
8888
]
8989
assert_equal(expected, actual)
9090
end
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
require "helper"
2+
require "fluent/test/driver/input"
3+
4+
class SqlInputCustomTimeTest < Test::Unit::TestCase
5+
def setup
6+
Fluent::Test.setup
7+
end
8+
9+
def teardown
10+
end
11+
12+
CONFIG = %[
13+
adapter postgresql
14+
host localhost
15+
port 5432
16+
database fluentd_test
17+
18+
username fluentd
19+
password fluentd
20+
21+
schema_search_path public
22+
23+
tag_prefix db
24+
25+
<table>
26+
table messages_custom_time
27+
tag logs
28+
update_column updated_at
29+
time_column custom_time
30+
</table>
31+
]
32+
33+
def create_driver(conf = CONFIG)
34+
Fluent::Test::Driver::Input.new(Fluent::Plugin::SQLInput).configure(conf)
35+
end
36+
37+
def test_configure
38+
d = create_driver
39+
expected = {
40+
host: "localhost",
41+
port: 5432,
42+
adapter: "postgresql",
43+
database: "fluentd_test",
44+
username: "fluentd",
45+
password: "fluentd",
46+
schema_search_path: "public",
47+
tag_prefix: "db"
48+
}
49+
actual = {
50+
host: d.instance.host,
51+
port: d.instance.port,
52+
adapter: d.instance.adapter,
53+
database: d.instance.database,
54+
username: d.instance.username,
55+
password: d.instance.password,
56+
schema_search_path: d.instance.schema_search_path,
57+
tag_prefix: d.instance.tag_prefix
58+
}
59+
assert_equal(expected, actual)
60+
tables = d.instance.instance_variable_get(:@tables)
61+
assert_equal(1, tables.size)
62+
messages_custom_time = tables.first
63+
assert_equal("messages_custom_time", messages_custom_time.table)
64+
assert_equal("logs", messages_custom_time.tag)
65+
end
66+
67+
def test_message
68+
d = create_driver(CONFIG + "select_interval 1")
69+
70+
start_time = Fluent::EventTime.now
71+
72+
# Create one message with a valid timestamp containing milliseconds and a time zone
73+
Message.create!(message: "message 1", custom_time: '2020-08-27 15:00:16.100758000 -0400')
74+
75+
# Create one message without a timestamp so that we can test auto-creation
76+
Message.create!(message: "message 2 (no timestamp)", custom_time: nil)
77+
78+
# Create one message with an unparseable timestamp so that we can check that a valid
79+
# one is auto-generated.
80+
Message.create!(message: "message 3 (bad timestamp)", custom_time: 'foo')
81+
82+
d.end_if do
83+
d.record_count >= 3
84+
end
85+
d.run(timeout: 5)
86+
87+
assert_equal("db.logs", d.events[0][0])
88+
expected = [
89+
[d.events[0][1], "message 1"],
90+
[d.events[1][1], "message 2 (no timestamp)"],
91+
[d.events[2][1], "message 3 (bad timestamp)"],
92+
]
93+
94+
actual = [
95+
[Fluent::EventTime.parse(d.events[0][2]["custom_time"]), d.events[0][2]["message"]],
96+
d.events[1][2]["message"],
97+
d.events[2][2]["message"],
98+
]
99+
100+
assert_equal(expected[0], actual[0])
101+
102+
# Messages 2 and 3 should have the same messages but (usually) a slightly later
103+
# timestamps because they are generated by the input plugin instead of the test
104+
# code
105+
[1,2].each do |i|
106+
assert_equal(expected[i][1], actual[i])
107+
assert_operator(expected[i][0], :>=, start_time)
108+
end
109+
end
110+
111+
class Message < ActiveRecord::Base
112+
self.table_name = "messages_custom_time"
113+
end
114+
end

0 commit comments

Comments
 (0)