Skip to content

Commit bc900c7

Browse files
andrewvcjordansissel
authored andcommitted
Refactored code sharing to play nicer with java plugin
Fixes #302
1 parent 06ea707 commit bc900c7

File tree

3 files changed

+49
-42
lines changed

3 files changed

+49
-42
lines changed

lib/logstash/outputs/elasticsearch.rb

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
6262

6363
config_name "elasticsearch"
6464

65+
# The Elasticsearch action to perform. Valid actions are:
66+
#
67+
# - index: indexes a document (an event from Logstash).
68+
# - delete: deletes a document by id (An id is required for this action)
69+
# - create: indexes a document, fails if a document by that id already exists in the index.
70+
# - update: updates a document by id. Update has a special case where you can upsert -- update a
71+
# document if not already present. See the `upsert` option
72+
#
73+
# For more details on actions, check out the http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
74+
config :action, :validate => %w(index delete create update), :default => "index"
75+
6576
# Username to authenticate to a secure Elasticsearch cluster
6677
config :user, :validate => :string
6778
# Password to authenticate to a secure Elasticsearch cluster
@@ -106,27 +117,12 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
106117
# How long to wait, in seconds, between sniffing attempts
107118
config :sniffing_delay, :validate => :number, :default => 5
108119

109-
# Set max retry for each event. The total time spent blocked on retries will be
110-
# (max_retries * retry_max_interval). This may vary a bit if Elasticsearch is very slow to respond
111-
config :max_retries, :validate => :number, :default => 3
112-
113-
# Set max interval between bulk retries.
114-
config :retry_max_interval, :validate => :number, :default => 2
115-
116-
# DEPRECATED This setting no longer does anything. If you need to change the number of retries in flight
117-
# try increasing the total number of workers to better handle this.
118-
config :retry_max_items, :validate => :number, :default => 500, :deprecated => true
119-
120120
# Set the address of a forward HTTP proxy.
121121
# Can be either a string, such as `http://localhost:123` or a hash in the form
122122
# of `{host: 'proxy.org' port: 80 scheme: 'http'}`.
123123
# Note, this is NOT a SOCKS proxy, but a plain HTTP proxy
124124
config :proxy
125125

126-
# Enable `doc_as_upsert` for update mode.
127-
# Create a new document with source if `document_id` doesn't exist in Elasticsearch
128-
config :doc_as_upsert, :validate => :boolean, :default => false
129-
130126
# Set the timeout for network operations and requests sent Elasticsearch. If
131127
# a timeout occurs, the request will be retried.
132128
config :timeout, :validate => :number
@@ -135,6 +131,12 @@ def build_client
135131
@client = ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
136132
end
137133

134+
def close
135+
@stopping.make_true
136+
@client.stop_sniffing!
137+
@buffer.stop
138+
end
139+
138140
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
139141

140142
@@plugins.each do |plugin|

lib/logstash/outputs/elasticsearch/common.rb

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,25 @@ def register
1919
end
2020

2121
def receive(event)
22+
@buffer << event_action_tuple(event)
23+
end
24+
25+
# Receive an array of events and immediately attempt to index them (no buffering)
26+
def multi_receive(events)
27+
retrying_submit(events.map {|e| event_action_tuple(e) })
28+
end
29+
30+
# Convert the event into a 3-tuple of action, params, and event
31+
def event_action_tuple(event)
2232
params = event_action_params(event)
2333
action = event.sprintf(@action)
24-
25-
@buffer << [action, params, event]
34+
[action, params, event]
2635
end
2736

2837
def flush
2938
@buffer.flush
3039
end
3140

32-
def close
33-
@stopping.make_true
34-
@client.stop_sniffing!
35-
@buffer.stop
36-
end
37-
38-
3941
def setup_hosts
4042
@hosts = Array(@hosts)
4143
if @hosts.empty?
@@ -61,7 +63,7 @@ def retrying_submit(actions)
6163
submit_actions = actions
6264

6365
while submit_actions && submit_actions.length > 0 && retries_left > 0
64-
return if !submit_actions # If everything's a success we move along
66+
return if !submit_actions || submit_actions.empty? # If everything's a success we move along
6567
# We retry with whatever is didn't succeed
6668
begin
6769
submit_actions = submit(submit_actions)
@@ -76,7 +78,7 @@ def retrying_submit(actions)
7678
end
7779

7880
def submit(actions)
79-
es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
81+
es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash]}
8082

8183
bulk_response = safe_bulk(es_actions,actions)
8284

lib/logstash/outputs/elasticsearch/common_configs.rb

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def self.included(mod)
1111
# The index type to write events to. Generally you should try to write only
1212
# similar events to the same 'type'. String expansion `%{foo}` works here.
1313
#
14-
# Deprecated in favor of `document_type` field.
14+
# Deprecated in favor of `docoument_type` field.
1515
mod.config :index_type, :validate => :string, :obsolete => "Please use the 'document_type' setting instead. It has the same effect, but is more appropriately named."
1616

1717
# The document type to write events to. Generally you should try to write only
@@ -62,8 +62,7 @@ def self.included(mod)
6262
# `["127.0.0.1:9200","127.0.0.2:9200"]`
6363
# It is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `hosts` list
6464
# to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes in Elasticsearch.
65-
66-
mod.config :hosts, :validate => :array
65+
mod.config :hosts, :validate => :array, :default => ["127.0.0.1"]
6766

6867
mod.config :host, :obsolete => "Please use the 'hosts' setting instead. You can specify multiple entries separated by comma in 'host:port' format."
6968

@@ -91,20 +90,24 @@ def self.included(mod)
9190
# near-real-time.
9291
mod.config :idle_flush_time, :validate => :number, :default => 1
9392

94-
# The Elasticsearch action to perform. Valid actions are:
95-
#
96-
# - index: indexes a document (an event from Logstash).
97-
# - delete: deletes a document by id (An id is required for this action)
98-
# - create: indexes a document, fails if a document by that id already exists in the index.
99-
# - update: updates a document by id. Update has a special case where you can upsert -- update a
100-
# document if not already present. See the `upsert` option
101-
#
102-
# For more details on actions, check out the http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
103-
mod.config :action, :validate => %w(index delete create update), :default => "index"
104-
105-
# Set upsert content for update mode.
93+
# Set upsert content for update mode.s
10694
# Create a new document with this parameter as json string if `document_id` doesn't exists
10795
mod.config :upsert, :validate => :string, :default => ""
96+
97+
# Enable `doc_as_upsert` for update mode.
98+
# Create a new document with source if `document_id` doesn't exist in Elasticsearch
99+
mod.config :doc_as_upsert, :validate => :boolean, :default => false
100+
101+
# Set max retry for each event. The total time spent blocked on retries will be
102+
# (max_retries * retry_max_interval). This may vary a bit if Elasticsearch is very slow to respond
103+
mod.config :max_retries, :validate => :number, :default => 3
104+
105+
# Set max interval between bulk retries.
106+
mod.config :retry_max_interval, :validate => :number, :default => 2
107+
108+
# DEPRECATED This setting no longer does anything. If you need to change the number of retries in flight
109+
# try increasing the total number of workers to better handle this.
110+
mod.config :retry_max_items, :validate => :number, :default => 500, :deprecated => true
108111
end
109112
end
110-
end; end; end
113+
end end end

0 commit comments

Comments
 (0)