Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions docs/static/spec/openapi/logstash-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,63 @@ paths:
- content: Logstash
name: product_name

/_node/pipelines/{pipeline_name}/_reload:
post:
summary: Reload a pipeline
description: Reloads the specified pipeline.
operationId: nodeReloadPipeline
tags:
- node info
parameters:
- name: pipeline_name
in: path
required: true
schema:
type: string
description: The name of the pipeline to reload.
- $ref: "#/components/parameters/pretty"
responses:
'200':
description: Indicates a successful call
content:
application/json:
schema:
allOf:
- type: object
properties:
success:
type: boolean
description: Whether the pipeline was reloaded successfully.
failed_actions:
type: array
items:
type: object
properties:
id:
type: string
actions_type:
type: string
successful_actions:
type: array
items:
type: object
properties:
id:
type: string
action_type:
type: string
example:
ReloadPipelineExample1:
value:
success: true
failed_actions: []
successful_actions:
- id: "main"
actionn_type: "Logstash::PipelineAction::Reload"
x-metaTags:
- content: Logstash
name: product_name

/_node/plugins:
get:
summary: Get plugin info
Expand Down
25 changes: 25 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,31 @@ def stop_pipeline(pipeline_id)
converge_state_with_resolved_actions([action])
end

##
# Reload a pipeline and wait for it to fully reload.
# @param pipeline_id [String]
def reload_pipeline(pipeline_id)
pipeline = get_pipeline(pipeline_id)
return if pipeline.nil?
converge_result = @convergence_lock.synchronize do
pipeline_config = pipeline.pipeline_config
converge_state([LogStash::PipelineAction::Reload.new(pipeline_config, metric)])
end

update_metrics(converge_result)

logger.info(
"Pipelines running",
:count => running_pipelines.size,
:running_pipelines => running_pipelines.keys,
:non_running_pipelines => non_running_pipelines.keys
) if converge_result.success? && converge_result.total > 0

dispatch_events(converge_result)

converge_result
end

# Calculate the Logstash uptime in milliseconds
#
# @return [Integer] Uptime in milliseconds
Expand Down
4 changes: 4 additions & 0 deletions logstash-core/lib/logstash/api/commands/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ def pipeline(pipeline_id, options = {})
{} # empty
end

def reload_pipeline(pipeline_id)
service.agent.reload_pipeline(pipeline_id.to_sym)
end

def os
{
:name => java.lang.System.getProperty("os.name"),
Expand Down
11 changes: 11 additions & 0 deletions logstash-core/lib/logstash/api/modules/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ def node
respond_with(:pipelines => { pipeline_id => payload })
end

post "/pipelines/:id/_reload" do
pipeline_id = params["id"]
halt(404) if node.pipeline(pipeline_id).empty?
converge_result = node.reload_pipeline(pipeline_id)
respond_with({
'success' => converge_result.success?,
'failed_actions' => converge_result.failed_actions.collect { |a, r| "id: #{a.pipeline_id}, action_type: #{a.class}, message: #{r.message}" },
'successful_actions' => converge_result.successful_actions.collect { |a, r| { 'id' => a.pipeline_id, 'action_type' => a.class } }
})
end

get "/pipelines" do
opts = {:graph => as_boolean(params.fetch("graph", false)),
:vertices => as_boolean(params.fetch("vertices", false))}
Expand Down
28 changes: 28 additions & 0 deletions logstash-core/spec/logstash/agent_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,34 @@
end
end

describe "#reload_pipeline" do
let(:config_string) { "input { generator { id => 'old'} } output { }" }
let(:mock_config_pipeline) { mock_pipeline_config(:main, config_string, pipeline_settings) }
let(:source_loader) { TestSourceLoader.new(mock_config_pipeline) }
subject { described_class.new(agent_settings, source_loader) }

before(:each) do
expect(subject.converge_state_and_update).to be_a_successful_converge
expect(subject.get_pipeline('main').running?).to be_truthy
end

after(:each) do
subject.shutdown
end

context "when agent reloads the pipeline" do
it "should reload successfully", :aggregate_failures do
pipeline_before_reload = subject.get_pipeline('main')
converge_result = subject.reload_pipeline('main')
pipeline_after_reload = subject.get_pipeline('main')

expect(converge_result).to be_a_successful_converge
expect(pipeline_after_reload.running?).to be_truthy
expect(pipeline_before_reload.object_id).not_to eq(pipeline_after_reload.object_id)
end
end
end

context "#started_at" do
it "return the start time when the agent is started" do
expect(described_class::STARTED_AT).to be_kind_of(Time)
Expand Down
Loading