Skip to content

Commit bab7664

Browse files
committed
Always send Fluent::EventTime to MultiEventStream.add (Probably fixes #63)
Sending a native Fluent timestamp instead of an integer prevents the loss of time zone information and millisecond-level precision. Currently it seems like the only cases where milliseconds and proper timezones would end up in emitted events is in cases where there was no timestamp on the input stream at all Signed-off-by: Eric Herot <[email protected]>
1 parent 57ce2d5 commit bab7664

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

lib/fluent/plugin/in_sql.rb

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,17 @@ def read_attribute_for_serialization(n)
108108
end
109109
end
110110

111+
# Make sure we always have a Fluent::EventTime object regardless of what comes in
112+
def normalized_time(tv, now)
113+
return Fluent::EventTime.from_time(tv) if tv.is_a?(Time)
114+
begin
115+
Fluent::EventTime.parse(tv.to_s)
116+
rescue
117+
$log.warn "Message contains invalid timestamp, using current time instead (#{now.inspect})"
118+
now
119+
end
120+
end
121+
111122
# emits next records and returns the last record of emitted records
112123
def emit_next_records(last_record, limit)
113124
relation = @model
@@ -123,15 +134,13 @@ def emit_next_records(last_record, limit)
123134
relation.each do |obj|
124135
record = obj.serializable_hash rescue nil
125136
if record
126-
if @time_column && tv = obj.read_attribute(@time_column)
127-
if tv.is_a?(Time)
128-
time = tv.to_i
137+
time =
138+
if @time_column && (tv = obj.read_attribute(@time_column))
139+
normalized_time(tv, now)
129140
else
130-
time = Time.parse(tv.to_s).to_i rescue now
141+
now
131142
end
132-
else
133-
time = now
134-
end
143+
135144
me.add(time, record)
136145
last_record = record
137146
end

0 commit comments

Comments
 (0)