Skip to content

Commit 9979f31

Browse files
authored
Merge pull request #89 from fluent/full-v1-api
out_sql: Use v1 API completely
2 parents 003b986 + 2e79d4a commit 9979f31

File tree

1 file changed

+14
-27
lines changed

1 file changed

+14
-27
lines changed

lib/fluent/plugin/out_sql.rb

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
require "fluent/plugin/output"
22

3+
require 'active_record'
4+
require 'activerecord-import'
5+
36
module Fluent::Plugin
47
class SQLOutput < Output
58
Fluent::Plugin.register_output('sql', self)
69

7-
DEFAULT_BUFFER_TYPE = "memory"
8-
910
helpers :inject, :compat_parameters, :event_emitter
1011

1112
desc 'RDBMS host'
@@ -33,7 +34,7 @@ class SQLOutput < Output
3334
config_param :timeout, :integer, default: 5000
3435

3536
config_section :buffer do
36-
config_set_default :@type, DEFAULT_BUFFER_TYPE
37+
config_set_default :chunk_keys, ["tag"]
3738
end
3839

3940
attr_accessor :tables
@@ -86,11 +87,12 @@ def init(base_model)
8687
# @model.column_names
8788
end
8889

89-
def import(chunk)
90+
def import(chunk, output)
91+
tag = chunk.metadata.tag
9092
records = []
91-
chunk.msgpack_each { |tag, time, data|
93+
chunk.msgpack_each { |time, data|
9294
begin
93-
# format process should be moved to emit / format after supports error stream.
95+
data = output.inject_values_to_record(tag, time, data)
9496
records << @model.new(@format_proc.call(data))
9597
rescue => e
9698
args = {error: e, table: @table, record: Yajl.dump(data)}
@@ -105,7 +107,7 @@ def import(chunk)
105107
@log.warn "Got deterministic error. Fallback to one-by-one import", error: e
106108
one_by_one_import(records)
107109
else
108-
$log.warn "Got deterministic error. Fallback is disabled", error: e
110+
@log.warn "Got deterministic error. Fallback is disabled", error: e
109111
raise e
110112
end
111113
end
@@ -147,8 +149,6 @@ def parse_column_mapping(column_mapping_conf)
147149

148150
def initialize
149151
super
150-
require 'active_record'
151-
require 'activerecord-import'
152152
end
153153

154154
def configure(conf)
@@ -174,7 +174,6 @@ def configure(conf)
174174
@tables << te
175175
end
176176
}
177-
@only_default = @tables.empty?
178177

179178
if @default_table.nil?
180179
raise Fluent::ConfigError, "There is no default table. <table> is required in sql output"
@@ -215,19 +214,6 @@ def shutdown
215214
super
216215
end
217216

218-
def emit(tag, es, chain)
219-
if @only_default
220-
super(tag, es, chain)
221-
else
222-
super(tag, es, chain, format_tag(tag))
223-
end
224-
end
225-
226-
def format(tag, time, record)
227-
record = inject_values_to_record(tag, time, record)
228-
[tag, time, record].to_msgpack
229-
end
230-
231217
def formatted_to_msgpack_binary
232218
true
233219
end
@@ -236,11 +222,12 @@ def write(chunk)
236222
ActiveRecord::Base.connection_pool.with_connection do
237223

238224
@tables.each { |table|
239-
if table.pattern.match(chunk.key)
240-
return table.import(chunk)
225+
tag = format_tag(chunk.metadata.tag)
226+
if table.pattern.match(tag)
227+
return table.import(chunk, self)
241228
end
242229
}
243-
@default_table.import(chunk)
230+
@default_table.import(chunk, self)
244231
end
245232
end
246233

@@ -259,7 +246,7 @@ def init_table(te, base_model)
259246
end
260247

261248
def format_tag(tag)
262-
if @remove_tag_prefix
249+
if tag && @remove_tag_prefix
263250
tag.gsub(@remove_tag_prefix, '')
264251
else
265252
tag

0 commit comments

Comments
 (0)