diff --git a/x-pack/lib/config_management/elasticsearch_source.rb b/x-pack/lib/config_management/elasticsearch_source.rb index 0baf448f601..622c785a3a6 100644 --- a/x-pack/lib/config_management/elasticsearch_source.rb +++ b/x-pack/lib/config_management/elasticsearch_source.rb @@ -63,13 +63,7 @@ def pipeline_configs es_version = get_es_version fetcher = get_pipeline_fetcher(es_version) - begin - fetcher.fetch_config(es_version, pipeline_ids, client) - rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - # es-output 12.0.2 throws 404 as error, but we want to handle it as empty config - return [] if e.response_code == 404 - raise e - end + fetcher.fetch_config(es_version, pipeline_ids, client) fetcher.get_pipeline_ids.collect do |pid| get_pipeline(pid, fetcher) @@ -210,22 +204,40 @@ class SystemIndicesFetcher SYSTEM_INDICES_API_PATH = "_logstash/pipeline" def fetch_config(es_version, pipeline_ids, client) - es_supports_pipeline_wildcard_search = es_supports_pipeline_wildcard_search?(es_version) + # if we are talking with an Elasticsearch that supports wildcard search, and get + # a successful response, use it. But wildcard search has a weird quirk that it 404's + # when there are no matches, so we need to fall through to traditional client-side + # search to rule out a proxy emitting a 404. + if es_supports_pipeline_wildcard_search?(es_version) + begin + logger.trace("querying for pipelines #{pipeline_ids.join(",")} using server-side wildcard search") + @pipelines = get_response(client,"#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}") + return @pipelines + rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + raise unless e.response_code == 404 + logger.warn("got 404 requesting pipelines from Elasticsearch using wildcard search; falling back to client-side filtering") + end + end + + # client-side filtering + logger.trace("querying for pipelines #{pipeline_ids.join(",")} using client-side wildcard search") + response = get_response(client,"#{SYSTEM_INDICES_API_PATH}/") + @pipelines = get_wildcard_pipelines(pipeline_ids, response) + + rescue LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + raise ElasticsearchSource::RemoteConfigError, "Cannot load configuration for pipeline_id: #{pipeline_ids}, server returned `#{e}`" + end + + def get_response(client, path) retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management') response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) { - path = es_supports_pipeline_wildcard_search ? - "#{SYSTEM_INDICES_API_PATH}?id=#{ERB::Util.url_encode(pipeline_ids.join(","))}" : - "#{SYSTEM_INDICES_API_PATH}/" client.get(path) } - if response["error"] raise ElasticsearchSource::RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`" end - @pipelines = es_supports_pipeline_wildcard_search ? - response : - get_wildcard_pipelines(pipeline_ids, response) + response end def es_supports_pipeline_wildcard_search?(es_version) diff --git a/x-pack/spec/config_management/elasticsearch_source_spec.rb b/x-pack/spec/config_management/elasticsearch_source_spec.rb index b16916ca5e6..57bc99a23b4 100644 --- a/x-pack/spec/config_management/elasticsearch_source_spec.rb +++ b/x-pack/spec/config_management/elasticsearch_source_spec.rb @@ -208,6 +208,8 @@ let(:all_pipelines) { JSON.parse(::File.read(::File.join(::File.dirname(__FILE__), "fixtures", "pipelines.json"))) } let(:mock_logger) { double("fetcher's logger") } + let(:bad_response_code_error) { LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError } + before(:each) { allow(subject).to receive(:logger).and_return(mock_logger) } @@ -218,16 +220,36 @@ expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"}) end - it "#fetch_config from ES v8.3" do - expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone) - expect(subject.fetch_config(es_version_8_3, [pipeline_id], mock_client)).to eq(elasticsearch_response) - expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"}) - end + { + 'v8.3' => { major: 8, minor: 3}, + 'v9.0' => { major: 9, minor: 0} + }.each do |desc, es_version| + context "ES #{desc}" do + it "#fetch_config works" do + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone) + expect(subject.fetch_config(es_version, [pipeline_id], mock_client)).to eq(elasticsearch_response) + expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"}) + end - it "#fetch_config from ES v9.0" do - expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}").and_return(elasticsearch_response.clone) - expect(subject.fetch_config(es_version_9_0, [pipeline_id], mock_client)).to eq(elasticsearch_response) - expect(subject.get_single_pipeline_setting(pipeline_id)).to eq({"pipeline" => "#{config}"}) + it "#fetch_config from ES v8.3 with 404->200 is empty list" do + wildcard_search_path = "#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}" + expect(mock_client).to receive(:get).with(wildcard_search_path) + .and_raise(bad_response_code_error.new(404, wildcard_search_path, nil, '{}')) + expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/").and_return({}) + expect(subject.fetch_config(es_version, [pipeline_id], mock_client)).to eq({}) + end + + it "#fetch_config from ES v8.3 with 404->404 is error" do + # 404's on Wildcard search need to be confirmed with a 404 from the get-all endpoint + wildcard_search_path = "#{described_class::SYSTEM_INDICES_API_PATH}?id=#{pipeline_id}" + expect(mock_client).to receive(:get).with(wildcard_search_path) + .and_raise(bad_response_code_error.new(404, wildcard_search_path, nil, '{}')) + get_all_path = "#{described_class::SYSTEM_INDICES_API_PATH}/" + expect(mock_client).to receive(:get).with(get_all_path) + .and_raise(bad_response_code_error.new(404, get_all_path, nil, '{}')) + expect { subject.fetch_config(es_version, [pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) + end + end end it "#fetch_config should raise error" do @@ -335,7 +357,7 @@ expect { subject.fetch_config(empty_es_version, [pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end - it "#fetch_config should raise error when response is empty" do + it "#fetch_config should raise error when response is malformed" do expect(mock_client).to receive(:post).with("#{described_class::PIPELINE_INDEX}/_mget", {}, "{\"docs\":[{\"_id\":\"#{pipeline_id}\"},{\"_id\":\"#{another_pipeline_id}\"}]}").and_return(LogStash::Json.load("{}")) expect { subject.fetch_config(empty_es_version, [pipeline_id, another_pipeline_id], mock_client) }.to raise_error(LogStash::ConfigManagement::ElasticsearchSource::RemoteConfigError) end