Skip to content

Commit 70654fb

Browse files
authored
Add filter_path to bulk messages (#1154)
This commit sets the `filter_path` query parameter when sending messages to Elasticsearch using the bulk API. This should significantly reduce the size of the query response from Elasticsearch, which should help reduce bandwidth usage, and improve response processing speed due to the smaller amount of JSON to deserialize. * Add query to expected results to fix integration tests * Rework PR to respect existing `filter_path` settings in custom bulk endpoints * Add doc and comment explaining filter_path addition to /_bulk Resolves: #1153
1 parent 0801b43 commit 70654fb

File tree

7 files changed

+141
-36
lines changed

7 files changed

+141
-36
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.19.0
2+
- Added `filter_path` to bulk requests to reduce the size of responses from Elasticsearch
3+
14
## 11.18.0
25
- Added request header `Elastic-Api-Version` for serverless [#1147](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1147)
36

docs/index.asciidoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,12 @@ Elasticsearch {ref}/security-api-create-api-key.html[Create API key API].
423423
===== `bulk_path`
424424

425425
* Value type is <<string,string>>
426-
* There is no default value for this setting.
426+
* The default value for this setting is `/_bulk?filter_path=errors,items.*.error,items.*.status`
427427

428428
HTTP Path to perform the _bulk requests to
429-
this defaults to a concatenation of the path parameter and "_bulk"
429+
* The default bulk path is the concatenation of the value of the `path` parameter and `/_bulk?filter_path=errors,items.*.error,items.*.status`
430+
* The `filter_path` query parameter is appended to the bulk path to reduce the payload between Logstash and Elasticsearch. However, if a custom `filter_path` query parameter is included in the `bulk_path` setting, then that value will be used.
431+
430432

431433
[id="plugins-{type}s-{plugin}-ca_trusted_fingerprint"]
432434
===== `ca_trusted_fingerprint`

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ def join_bulk_responses(bulk_responses)
177177

178178
def bulk_send(body_stream, batch_actions)
179179
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
180+
180181
response = @pool.post(@bulk_path, params, body_stream.string)
181182

182183
@bulk_response_metrics.increment(response.code.to_s)

lib/logstash/outputs/elasticsearch/http_client_builder.rb

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ def self.build(logger, hosts, params)
3333
end
3434

3535
common_options[:bulk_path] = if params["bulk_path"]
36-
dedup_slashes("/#{params["bulk_path"]}")
36+
resolve_filter_path(dedup_slashes("/#{params["bulk_path"]}"))
3737
else
38-
dedup_slashes("/#{params["path"]}/_bulk")
38+
resolve_filter_path(dedup_slashes("/#{params["path"]}/_bulk"))
3939
end
4040

4141
common_options[:sniffing_path] = if params["sniffing_path"]
@@ -197,5 +197,16 @@ def self.setup_api_key(logger, params)
197197
def self.dedup_slashes(url)
198198
url.gsub(/\/+/, "/")
199199
end
200+
201+
# Ensure the bulk URL carries a `filter_path` query parameter so that
# Elasticsearch returns a trimmed bulk response, reducing the payload
# between Logstash and Elasticsearch.
#
# A `filter_path` that is already present in the URL's query string is left
# untouched; otherwise `filter_path=errors,items.*.error,items.*.status` is
# appended.
#
# @param url [String] bulk path, optionally with a query string
# @return [String] the same path, guaranteed to contain a `filter_path` query parameter
def self.resolve_filter_path(url)
  # Only look inside the query string (after the first `?`), and only accept
  # `?` or `&` as the delimiter in front of the parameter name. A `|` or an
  # `&` that is part of the path must not count as "already set".
  return url if url.match?(/\?(?:[^#]*&)?filter_path=/)

  "#{url}#{query_param_separator(url)}filter_path=errors,items.*.error,items.*.status"
end

# Pick the delimiter needed to append one more query parameter to `url`.
#
# @param url [String] path, optionally with a query string
# @return [String] `'?'` when the URL has no query string yet, `'&'` when it
#   has one, or `''` when the URL already ends with a dangling `?` or `&`
def self.query_param_separator(url)
  _path, query = url.split('?', 2)
  return '?' if query.nil?

  # A trailing `?` or `&` already acts as the delimiter; adding another one
  # would produce `??name=` or `&&name=`.
  query.empty? || query.end_with?('&') ? '' : '&'
end
200211
end
201212
end; end; end

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.18.0'
3+
s.version = '11.19.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/outputs/index_spec.rb

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
156156
let(:config) { "not implemented" }
157157
let(:events) { event_count.times.map { event }.to_a }
158158
subject { LogStash::Outputs::ElasticSearch.new(config) }
159-
159+
let(:filter_path) { "filter_path=errors,items.*.error,items.*.status"}
160160
let(:es_url) { "http://#{get_host_port}" }
161161
let(:index_url) { "#{es_url}/#{index}" }
162162

@@ -178,7 +178,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
178178
subject.do_close
179179
end
180180

181-
shared_examples "an indexer" do |secure|
181+
shared_examples "an indexer" do |secure, expected_path|
182182
before(:each) do
183183
host_unreachable_error_class = LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError
184184
allow(host_unreachable_error_class).to receive(:new).with(any_args).and_wrap_original do |m, original, url|
@@ -212,13 +212,13 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
212212
expect(doc["_index"]).to eq(index)
213213
end
214214
end
215-
215+
216216
it "sets the correct content-type header" do
217217
expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything}
218218
if secure
219219
expected_manticore_opts = {
220-
:headers => {"Content-Type" => "application/json"},
221-
:body => anything,
220+
:headers => {"Content-Type" => "application/json"},
221+
:body => anything,
222222
:auth => {
223223
:user => user,
224224
:password => password,
@@ -230,6 +230,20 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
230230
and_call_original
231231
subject.multi_receive(events)
232232
end
233+
234+
it "sets the bulk path URL and filter path parameter correctly" do
235+
expect(subject.client.pool.adapter.client).to receive(:send).
236+
with(anything, expected_path != nil ? expected_path : anything, anything).at_least(:once).and_call_original
237+
subject.multi_receive(events)
238+
end
239+
240+
it "receives a filtered response" do
241+
expect(subject.client).to receive(:join_bulk_responses).
242+
with([{"errors"=>false, "items"=>[{"index"=>{"status"=>201}}]}]).
243+
and_call_original
244+
subject.multi_receive([event])
245+
end
246+
233247
end
234248

235249
shared_examples "PKIX path failure" do
@@ -269,6 +283,65 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
269283
it_behaves_like("an indexer")
270284
end
271285

286+
describe "an indexer with custom bulk path", :integration => true do
287+
let(:bulk_path) { "/_bulk?routing=true"}
288+
let(:config) {
289+
{
290+
"hosts" => get_host_port,
291+
"index" => index,
292+
"http_compression" => false,
293+
"bulk_path" => bulk_path
294+
}
295+
}
296+
it_behaves_like("an indexer", false) do
297+
let (:expected_path) { "#{es_url}#{bulk_path}&#{filter_path}" }
298+
end
299+
end
300+
301+
describe "an indexer with filter path as second parameter", :integration => true do
302+
let(:bulk_path) { "/_bulk?routing=true&#{filter_path}"}
303+
let(:config) {
304+
{
305+
"hosts" => get_host_port,
306+
"index" => index,
307+
"http_compression" => false,
308+
"bulk_path" => bulk_path
309+
}
310+
}
311+
it_behaves_like("an indexer", false) do
312+
let (:expected_path) { "#{es_url}/#{bulk_path}" }
313+
end
314+
end
315+
316+
describe "an indexer with filter path as first parameter", :integration => true do
317+
let(:bulk_path) { "/_bulk?#{filter_path}&routing=true"}
318+
let(:config) {
319+
{
320+
"hosts" => get_host_port,
321+
"index" => index,
322+
"http_compression" => false,
323+
"bulk_path" => bulk_path
324+
}
325+
}
326+
it_behaves_like("an indexer", false) do
327+
let (:expected_path) { "#{es_url}/#{bulk_path}" }
328+
end
329+
end
330+
331+
describe "an indexer with the standard bulk path", :integration => true do
332+
let(:config) {
333+
{
334+
"hosts" => get_host_port,
335+
"index" => index,
336+
"http_compression" => false
337+
}
338+
}
339+
it_behaves_like("an indexer", false) do
340+
let (:expected_path) { "#{es_url}/_bulk?#{filter_path}" }
341+
end
342+
343+
end
344+
272345
describe "an indexer with no type value set (default to doc)", :integration => true do
273346
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
274347
let(:config) {
@@ -296,7 +369,7 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
296369
"index" => index,
297370
"http_compression" => false
298371
}
299-
end
372+
end
300373

301374
let(:curl_opts) { "-u #{user}:#{password}" }
302375

spec/unit/http_client_builder_spec.rb

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,55 +36,70 @@
3636
end
3737
end
3838

39-
describe "healthcheck_path" do
39+
describe "bulk_path" do
40+
let (:filter_path) {"filter_path=errors,items.*.error,items.*.status"}
41+
42+
shared_examples("filter_path added to bulk path appropriately") do
43+
it "sets the bulk_path option to the expected bulk path" do
44+
expect(described_class).to receive(:create_http_client) do |options|
45+
expect(options[:bulk_path]).to eq(expected_bulk_path)
46+
end
47+
described_class.build(logger, hosts, options)
48+
end
49+
end
4050

4151
context "when setting bulk_path" do
4252
let(:bulk_path) { "/meh" }
4353
let(:options) { super().merge("bulk_path" => bulk_path) }
4454

4555
context "when using path" do
4656
let(:options) { super().merge("path" => "/path") }
47-
it "ignores the path setting" do
48-
expect(described_class).to receive(:create_http_client) do |options|
49-
expect(options[:bulk_path]).to eq(bulk_path)
50-
end
51-
described_class.build(logger, hosts, options)
52-
end
57+
let(:expected_bulk_path) { "#{bulk_path}?#{filter_path}" }
58+
59+
it_behaves_like "filter_path added to bulk path appropriately"
60+
end
61+
62+
context "when setting a filter path as first parameter" do
63+
let (:filter_path) {"filter_path=error"}
64+
let(:bulk_path) { "/meh?#{filter_path}&routing=true" }
65+
let(:expected_bulk_path) { bulk_path }
66+
67+
it_behaves_like "filter_path added to bulk path appropriately"
68+
end
69+
70+
context "when setting a filter path as second parameter" do
71+
let (:filter_path) {"filter_path=error"}
72+
let(:bulk_path) { "/meh?routing=true&#{filter_path}" }
73+
let(:expected_bulk_path) { bulk_path }
74+
75+
it_behaves_like "filter_path added to bulk path appropriately"
5376
end
77+
5478
context "when not using path" do
79+
let(:expected_bulk_path) { "#{bulk_path}?#{filter_path}"}
5580

56-
it "uses the bulk_path setting" do
57-
expect(described_class).to receive(:create_http_client) do |options|
58-
expect(options[:bulk_path]).to eq(bulk_path)
59-
end
60-
described_class.build(logger, hosts, options)
61-
end
81+
it_behaves_like "filter_path added to bulk path appropriately"
6282
end
6383
end
6484

6585
context "when not setting bulk_path" do
6686

6787
context "when using path" do
6888
let(:path) { "/meh" }
89+
let(:expected_bulk_path) { "#{path}/_bulk?#{filter_path}"}
6990
let(:options) { super().merge("path" => path) }
70-
it "sets bulk_path to path+_bulk" do
71-
expect(described_class).to receive(:create_http_client) do |options|
72-
expect(options[:bulk_path]).to eq("#{path}/_bulk")
73-
end
74-
described_class.build(logger, hosts, options)
75-
end
91+
92+
it_behaves_like "filter_path added to bulk path appropriately"
7693
end
7794

7895
context "when not using path" do
79-
it "sets the bulk_path to _bulk" do
80-
expect(described_class).to receive(:create_http_client) do |options|
81-
expect(options[:bulk_path]).to eq("/_bulk")
82-
end
83-
described_class.build(logger, hosts, options)
84-
end
96+
let(:expected_bulk_path) { "/_bulk?#{filter_path}"}
97+
98+
it_behaves_like "filter_path added to bulk path appropriately"
8599
end
86100
end
87101
end
102+
88103
describe "healthcheck_path" do
89104
context "when setting healthcheck_path" do
90105
let(:healthcheck_path) { "/meh" }

0 commit comments

Comments
 (0)