diff --git a/README.md b/README.md
index 9a3b6fc..983a9d6 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,10 @@ This plugin will help to gathering status log from these status api.
* [Monitor Status Code](#monitoring-http-status-code-only)
* [Override User Agent](#override-user-agent)
* [HTTP Basic Auth](#http-basic-auth)
+ * [HTTP Login with Payload](#http-login-with-payload)
+ * [HTTP Pull with Payload](#http-pull-with-payload)
+ * [Multiple Events](#multiple-events)
+ * [Encapsulated items](#encapsulated-items)
* [HTTP Proxy](#http-proxy)
* [Logging Response Header](#logging-http-response-header)
* [Custom Request Header](#custom-request-header)
@@ -161,6 +165,95 @@ You can use `user`, `password` options to provide authentication information.
# 2017-05-17 21:41:47.872951000 +0900 status: {"url":"http://yourinfrastructure/api/status.json","status":200,"message":{ ... }}
```
+### HTTP login with payload
+
+If your infrastructure use cookies to manage the login session, and the login path is different to query path, you can use `login_path`, `login_payload`, and `path` options to provide authentication information.
+
+For example, login url is https://localhost:8080/login, query url is https://localhost:8080/search
+
+```
+
+ @type http_pull
+
+ tag status
+ url https://localhost:8080
+ path search
+ interval 1s
+
+ login_path login
+ login_payload {"username":"tester","password":"drowssaP"}
+ verify_ssl false
+
+ format json
+
+```
+
+### HTTP pull with payload
+
+You can send json format `payload` to togather with the query.
+
+```
+
+ @type http_pull
+
+ tag status
+ url https://localhost:8080/search
+ payload {"max-results": 1500}
+
+ interval 1s
+
+```
+
+### Multiple events
+
+If the server returns multiple events per request, for example
+
+```
+[{"message": "message 1"},
+ {"message": "message 2"}]
+```
+This can be handled by specify option `multi_event true`
+
+```
+
+ @type http_pull
+
+ tag status
+ url https://localhost:8080/search
+ multi_event true
+
+ interval 1s
+
+```
+
+### Encapsulated items
+
+If the expected items are encapsulated in json structure, for example,
+
+```
+{"meta": {"server": "localhost"},
+ "items": [
+ {"message": "message 1"},
+ {"message": "message 2"}
+ ]
+}
+```
+
+You can fetch the messages by setting up option `event_key`,
+
+```
+
+ @type http_pull
+
+ tag status
+ url https://localhost:8080/search
+ multi_event true
+ event_key items
+
+ interval 1s
+
+```
+
### HTTP proxy
You can send your requests via proxy server.
@@ -344,6 +437,31 @@ The user for basic auth
The password for basic auth
+### Authentication Session Configuration
+
+#### login_path (string) (optional, default: nil)
+
+The subpath of the login url.
+
+#### login_payload (hash) (optional, default: nil)
+
+The payload send for authentication.
+Note: login_path and login_payload has to be both nil or both not-nil.
+
+### Request Configuration
+
+#### payload (hash) (optional, default: nil)
+
+The query payload sent to server.
+
+#### multi_event (bool) (optional, default: false)
+
+Whether the response contains multiple events.
+
+#### event_key (string) (optional, default: nil)
+
+The key of the expected items in a json format response.
+
### Req/Resp Header Configuration
#### response_header (section) (optional, default: nil)
diff --git a/lib/fluent/plugin/in_http_pull.rb b/lib/fluent/plugin/in_http_pull.rb
index 6568320..9440ef4 100644
--- a/lib/fluent/plugin/in_http_pull.rb
+++ b/lib/fluent/plugin/in_http_pull.rb
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+require "fluent/log"
require "fluent/plugin/input"
require "rest-client"
@@ -33,6 +34,18 @@ def initialize
desc 'The url of monitoring target'
config_param :url, :string
+ desc 'The path of monitoring target'
+ config_param :path, :string, default: nil
+
+ desc 'Payload to query target'
+ config_param :payload, :hash, default: nil
+
+ desc 'Message key'
+ config_param :event_key, :string, default: nil
+
+ desc 'Response contains multiple events'
+ config_param :multi_event, :bool, default: false
+
desc 'The interval time between periodic request'
config_param :interval, :time
@@ -59,6 +72,13 @@ def initialize
desc 'password of basic auth'
config_param :password, :string, default: nil, secret: true
+ # login session
+ desc 'login path'
+ config_param :login_path, :string, default: nil
+
+ desc 'login payload'
+ config_param :login_payload, :hash, default: nil
+
# req/res header options
config_section :response_header, param_name: :response_headers, multi: true do
desc 'The name of header to cature from response'
@@ -88,6 +108,9 @@ def configure(conf)
compat_parameters_convert(conf, :parser)
super
+ if (@login_path && !@login_payload ) || (!@login_path && @login_payload)
+ raise Fluent::ConfigError, "login_path and login_payload should be both set or unset"
+ end
@parser = parser_create unless @status_only
@_request_headers = {
"Content-Type" => "application/x-www-form-urlencoded",
@@ -100,6 +123,7 @@ def configure(conf)
end.to_h)
@http_method = :head if @status_only
+ @cookies = {}
end
def start
@@ -111,23 +135,36 @@ def start
def on_timer
body = nil
record = nil
-
+ record_time = Engine.now
+ emit_stream = Fluent::MultiEventStream.new
+ base_site = RestClient::Resource.new(@url, request_options)
+ login_site = @login_path ? base_site[@login_path] : base_site
+ query_site = @path ? base_site[@path] : base_site
begin
- res = RestClient::Request.execute request_options
- record, body = get_record(res)
-
+ get_session_cookie(login_site)
rescue StandardError => err
- record = { "url" => @url, "error" => err.message }
- if err.respond_to? :http_code
- record["status"] = err.http_code || 0
+ record = log_error(login_site.url, err)
+ emit_stream.add(record_time, record)
+ router.emit_stream(@tag, emit_stream)
+ return
+ end
+ begin
+ if @payload
+ res = query_site.method(@http_method).call(@payload.to_json, :cookies=>@cookies)
else
- record["status"] = 0
+ res = query_site.method(@http_method).call(:cookie=>@cookies)
end
+
+ record, body = get_record(res, query_site.url)
+ process_events(record, body, emit_stream)
+ rescue StandardError => err
+ record = log_error(query_site.url, err)
+ # reset session cookie if it is expired
+ @cookies = {} if not @cookies.empty? and record["status"] == 401
+ emit_stream.add(record_time, record)
end
- record_time = Engine.now
- record = parse(record, body)
- router.emit(@tag, record_time, record)
+ router.emit_stream(@tag, emit_stream)
end
def shutdown
@@ -135,8 +172,19 @@ def shutdown
end
private
+ def log_error(url, err)
+ record = { "url" => url, "error" => err.message }
+ if err.respond_to? :http_code
+ record["status"] = err.http_code || 0
+ else
+ record["status"] = 0
+ end
+ log.error(record)
+ return record
+ end
+
def request_options
- options = { method: @http_method, url: @url, timeout: @timeout, headers: @_request_headers }
+ options = { timeout: @timeout, headers: @_request_headers }
options[:proxy] = @proxy if @proxy
options[:user] = @user if @user
@@ -151,9 +199,21 @@ def request_options
return options
end
- def get_record(response)
+ def get_session_cookie(resource)
+ @cookies = {} if @cookies.nil?
+ return unless @login_path and @login_payload and @cookies.empty?
+ login_response = resource.post(@login_payload.to_json)
+ if login_response.code != 200
+ raise RestClient::ExceptionWithResponse.new(nil, login_response.code)
+ else
+ @cookies = login_response.cookie_jar
+ end
+
+ end
+
+ def get_record(response, url)
body = response.body
- record = { "url" => @url, "status" => response.code }
+ record = { "url" => url, "status" => response.code }
record["header"] = {} unless @response_headers.empty?
@response_headers.each do |section|
name = section["header"]
@@ -161,20 +221,37 @@ def get_record(response)
record["header"][name] = response.headers[symbolize_name]
end
-
return record, body
end
- def parse(record, body)
- if !@status_only && body != nil
- @parser.parse(body) do |time, message|
- record["message"] = message
- record_time = time
+ def process_events(record, body, es)
+
+ return es.add(Engine.now, record) if @status_only or body == nil
+
+ # consume errors produced by parser by logging it
+ begin
+ @parser.parse(body) do |time, events|
+
+ events = events[@event_key] if @multi_event and @event_key || []
+
+ # if @event_key not found, events will be converted to empty Array.
+ log.warning("event_key '#{@event_key}' not found") if events == []
+
+ # if each query result is a record, covert it to array.
+ events = [ events ] unless @multi_event
+
+ events.each do |event|
+ item = record.dup
+ item['message'] = event
+ es.add(time, item)
+ end
end
+ rescue StandardError => err
+ log.error("Failed to process result with error: #{err}")
+ log.debug("Failed to parse #{body}")
end
-
- return record
end
+
end
end
end
diff --git a/test/helper/stub_server.rb b/test/helper/stub_server.rb
index 24ab0d2..2527067 100644
--- a/test/helper/stub_server.rb
+++ b/test/helper/stub_server.rb
@@ -30,7 +30,12 @@ def initialize(port=3939, ssl_enable=false)
@server.mount_proc '/custom_header', &method(:custom_header)
@server.mount_proc '/method_post', &method(:method_post)
+
+ @server.mount_proc '/login', &method(:login)
+ @server.mount_proc '/session_events', &method(:session_events)
+
@server.mount '/method_delete', DeleteService
+
end
def start
@@ -131,4 +136,21 @@ def method_post(req, res)
res.body = '{ "status": "OK" }'
end
end
+
+ def login(req, res)
+ if req.body and JSON.parse(req.body) == {"username"=>"admin", "password"=>"pwd"}
+ res.status = 200
+ res['Content-Type'] = 'application/json'
+ res.cookies.push WEBrick::Cookie.new("session", "1")
+ else
+ res.status = 401
+ end
+ end
+
+ def session_events(req, res)
+ res.status = 200
+ res['Content-Type'] = 'application/json'
+ res.body = '{"ListMeta":{},"items":[{"kind":"Event","meta":{"name":"1","uuid":"c51d9e82"}},{"kind":"Event","meta":{"name":"2","uuid":"b1b5686d"}}]}'
+ end
+
end
diff --git a/test/plugin/test_in_http_pull.rb b/test/plugin/test_in_http_pull.rb
index 8b84607..b8725be 100644
--- a/test/plugin/test_in_http_pull.rb
+++ b/test/plugin/test_in_http_pull.rb
@@ -1,5 +1,5 @@
require "helper"
-require "fluent/plugin/in_http_pull.rb"
+require "fluent/plugin/in_http_pull"
require 'ostruct'
diff --git a/test/plugin/test_in_http_pull_auth.rb b/test/plugin/test_in_http_pull_auth.rb
index 876df9c..ec2c505 100644
--- a/test/plugin/test_in_http_pull_auth.rb
+++ b/test/plugin/test_in_http_pull_auth.rb
@@ -79,6 +79,7 @@ class HttpPullInputTestAuth < Test::Unit::TestCase
assert_equal("test", tag)
assert_equal("http://localhost:3939/protected", record["url"])
+ puts "tag=#{tag}, time=#{time}, record=#{record}, time_class=#{time.class}"
assert(time.is_a?(Fluent::EventTime))
assert_equal(401, record["status"])
diff --git a/test/plugin/test_in_http_pull_session_event.rb b/test/plugin/test_in_http_pull_session_event.rb
new file mode 100644
index 0000000..961f5ba
--- /dev/null
+++ b/test/plugin/test_in_http_pull_session_event.rb
@@ -0,0 +1,103 @@
+require "helper"
+require "fluent/plugin/in_http_pull.rb"
+
+require 'ostruct'
+
+class HttpPullInputTestMultiEvent < Test::Unit::TestCase
+ @stub_server = nil
+
+ setup do
+ @stub_server = StubServer.new
+ @stub_server.start
+ end
+
+ teardown do
+ @stub_server.shutdown
+ end
+
+ sub_test_case "multi event" do
+ TEST_NO_LOGIN_PAYLOAD = %[
+ tag test
+ url http://localhost:3939
+ path session_events
+ event_key items
+ multi_event true
+
+ login_path login
+
+ interval 3s
+ format json
+ http_method post
+ ]
+
+ TEST_LOGIN_FAILURE = %[
+ tag test
+ url http://localhost:3939
+ path session_events
+ event_key items
+ multi_event true
+
+ login_path login
+ login_payload {"username": "admin","password": "wrong"}
+
+ interval 3s
+ format json
+ http_method post
+ ]
+ TEST_LOGIN_MULTI_EVENT = %[
+ tag test
+ url http://localhost:3939
+ path session_events
+ event_key items
+ multi_event true
+
+ login_path login
+ login_payload {"username": "admin","password": "pwd"}
+
+ interval 3s
+ format json
+ http_method post
+ ]
+
+ test 'login failed no login payload' do
+
+ assert_raise do
+ create_driver TEST_NO_LOGIN_PAYLOAD
+ end
+ end
+
+ test 'login failed wrong credential' do
+
+ d = create_driver TEST_LOGIN_FAILURE
+ d.run(timeout: 5) do
+ sleep 4
+ end
+ assert_equal(1, d.events.length)
+ d.events.each do |tag, time, record|
+ assert_equal(401, record['status'])
+ end
+ end
+
+ test 'session multi events' do
+ d = create_driver TEST_LOGIN_MULTI_EVENT
+ d.run(timeout: 5) do
+ sleep 4
+ end
+
+ assert_equal(2, d.events.length)
+
+ uuids = []
+ d.events.each do |tag, time, record|
+ uuids.push(record['message']['meta']['uuid'])
+ end
+ assert_equal(["c51d9e82","b1b5686d"], uuids)
+ end
+
+ end
+
+ private
+
+ def create_driver(conf)
+ Fluent::Test::Driver::Input.new(Fluent::Plugin::HttpPullInput).configure(conf)
+ end
+end