diff --git a/lib/logstash/outputs/logger.rb b/lib/logstash/outputs/logger.rb new file mode 100644 index 0000000..d503aed --- /dev/null +++ b/lib/logstash/outputs/logger.rb @@ -0,0 +1,296 @@ +# encoding: utf-8 + +require "stomp" + +# == Example STOMP call back logger class. +# +# Optional callback methods: +# +# * on_connecting: connection starting +# * on_connected: successful connect +# * on_connectfail: unsuccessful connect (will usually be retried) +# * on_disconnect: successful disconnect +# +# * on_miscerr: on miscellaneous xmit/recv errors +# +# * on_publish: publish called +# * on_subscribe: subscribe called +# * on_unsubscribe: unsubscribe called +# +# * on_begin: begin called +# * on_ack: ack called +# * on_nack: nack called +# * on_commit: commit called +# * on_abort: abort called +# +# * on_receive: receive called and successful +# +# * on_ssl_connecting: SSL connection starting +# * on_ssl_connected: successful SSL connect +# * on_ssl_connectfail: unsuccessful SSL connect (will usually be retried) +# +# * on_hbread_fail: unsuccessful Heartbeat read +# * on_hbwrite_fail: unsuccessful Heartbeat write +# * on_hbfire: on any send or receive heartbeat +# +# All methods are optional, at the user's requirements. +# +# If a method is not provided, it is not called (of course.) +# +# IMPORTANT NOTE: in general, call back logging methods *SHOULD* not raise exceptions, +# otherwise the underlying STOMP connection may fail in mysterious ways. +# +# There are two useful exceptions to this rule for: +# +# * on_connectfail +# * on_ssl_connectfail +# +# These two methods can raise a Stomp::Errors::LoggerConnectionError. If this +# exception is raised, it is passed up the chain to the caller. +# +# Callback parameters: are a copy of the @parameters instance variable for +# the Stomp::Connection. +# +class StompLogger < Stomp::NullLogger + + # Initialize a new callback logger instance. + def initialize(logger = nil) + @log = logger || super() + @log.info("Logger initialization complete.") + end + + def marshal_dump + [] + end + + def marshal_load(array) + _init + end + + # Log connecting events + def on_connecting(parms) + begin + @log.debug "Connecting: #{info(parms)}" + rescue + @log.debug "Connecting oops" + end + end + + # Log connected events + def on_connected(parms) + begin + @log.debug "Connected: #{info(parms)}" + rescue + @log.debug "Connected oops" + end + end + + # Log connectfail events + def on_connectfail(parms) + begin + @log.debug "Connect Fail #{info(parms)}" + rescue + @log.debug "Connect Fail oops" + end +=begin + # An example LoggerConnectionError raise + @log.debug "Connect Fail, will raise" + raise Stomp::Error::LoggerConnectionError.new("quit from connect fail") +=end + end + + # Log disconnect events + def on_disconnect(parms) + begin + @log.debug "Disconnected #{info(parms)}" + rescue + @log.debug "Disconnected oops" + end + end + + # Log miscellaneous errors + def on_miscerr(parms, errstr) + begin + @log.debug "Miscellaneous Error #{info(parms)}" + @log.debug "Miscellaneous Error String #{errstr}" + rescue + @log.debug "Miscellaneous Error oops" + end + end + + # Log Subscribe + def on_subscribe(parms, headers) + begin + @log.debug "Subscribe Parms #{info(parms)}" + @log.debug "Subscribe Headers #{headers}" + rescue + @log.debug "Subscribe oops" + end + end + + # Log UnSubscribe + def on_unsubscribe(parms, headers) + begin + @log.debug "UnSubscribe Parms #{info(parms)}" + @log.debug "UnSubscribe Headers #{headers}" + rescue + @log.debug "UnSubscribe oops" + end + end + + # Log Publish + def on_publish(parms, message, headers) + begin + @log.debug "Publish Parms #{info(parms)}" + @log.debug "Publish Message #{message}" + @log.debug "Publish Headers #{headers}" + rescue + @log.debug "Publish oops" + end + end + + # Log Receive + def on_receive(parms, result) + begin + @log.debug "Receive Parms #{info(parms)}" + @log.debug "Receive Result #{result}" + rescue + @log.debug "Receive oops" + end + end + + # Log Begin + def on_begin(parms, headers) + begin + @log.debug "Begin Parms #{info(parms)}" + @log.debug "Begin Result #{headers}" + rescue + @log.debug "Begin oops" + end + end + + # Log Ack + def on_ack(parms, headers) + begin + @log.debug "Ack Parms #{info(parms)}" + @log.debug "Ack Result #{headers}" + rescue + @log.debug "Ack oops" + end + end + + # Log NAck + def on_nack(parms, headers) + begin + @log.debug "NAck Parms #{info(parms)}" + @log.debug "NAck Result #{headers}" + rescue + @log.debug "NAck oops" + end + end + + # Log Commit + def on_commit(parms, headers) + begin + @log.debug "Commit Parms #{info(parms)}" + @log.debug "Commit Result #{headers}" + rescue + @log.debug "Commit oops" + end + end + + # Log Abort + def on_abort(parms, headers) + begin + @log.debug "Abort Parms #{info(parms)}" + @log.debug "Abort Result #{headers}" + rescue + @log.debug "Abort oops" + end + end + + # Stomp 1.1+ - heart beat read (receive) failed. + def on_hbread_fail(parms, ticker_data = {}) + begin + @log.debug "Hbreadf Parms #{info(parms)}" + @log.debug "Hbreadf Result #{ticker_data.inspect}" + rescue + @log.debug "Hbreadf oops" + end + end + + # Stomp 1.1+ - heart beat send (transmit) failed. + def on_hbwrite_fail(parms, ticker_data = {}) + begin + @log.debug "Hbwritef Parms #{info(parms)}" + @log.debug "Hbwritef Result #{ticker_data.inspect}" + rescue + @log.debug "Hbwritef oops" + end + end + + # Log SSL connection start. + def on_ssl_connecting(parms) + begin + @log.debug "SSL Connecting Parms #{info(parms)}" + rescue + @log.debug "SSL Connecting oops" + end + end + + # Log a successful SSL connect. + def on_ssl_connected(parms) + begin + @log.debug "SSL Connected Parms #{info(parms)}" + rescue + @log.debug "SSL Connected oops" + end + end + + # Log an unsuccessful SSL connect. + def on_ssl_connectfail(parms) + begin + @log.debug "SSL Connect Fail Parms #{info(parms)}" + @log.debug "SSL Connect Fail Exception #{parms[:ssl_exception]}, #{parms[:ssl_exception].message}" + rescue + @log.debug "SSL Connect Fail oops" + end +=begin + # An example LoggerConnectionError raise + @log.debug "SSL Connect Fail, will raise" + raise Stomp::Error::LoggerConnectionError.new("quit from SSL connect") +=end + end + + # Log heart beat fires + def on_hbfire(parms, srind, firedata = {}) + begin + @log.debug "HeartBeat Fire Parms #{info(parms)}" + @log.debug "HeartBeat Fire Send/Receive #{srind}" + rescue + @log.debug "HeartBeat Fire oops" + end + end + + private + + # Example information extract. + def info(parms) + # + # Available in the parms Hash: + # parms[:cur_host] + # parms[:cur_port] + # parms[:cur_login] + # parms[:cur_passcode] + # parms[:cur_ssl] + # parms[:cur_recondelay] + # parms[:cur_parseto] + # parms[:cur_conattempts] + # parms[:openstat] + # + # For the on_ssl_connectfail callback these are also available: + # parms[:ssl_exception] + # + "Host: #{parms[:cur_host]}, Port: #{parms[:cur_port]}, Login: #{parms[:cur_login]}, Passcode: #{parms[:cur_passcode]}, ssl: #{parms[:cur_ssl]}" + end +end # of class diff --git a/lib/logstash/outputs/stomp.rb b/lib/logstash/outputs/stomp.rb index 627f8c0..9439ddd 100644 --- a/lib/logstash/outputs/stomp.rb +++ b/lib/logstash/outputs/stomp.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "logstash/outputs/base" +require "logstash/outputs/logger" require "logstash/namespace" @@ -36,55 +37,57 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base # Enable debugging output? config :debug, :validate => :boolean, :default => false - private - def connect + # this output is thread-safe + concurrency :shared + + public + def register + require "stomp" begin - @client.connect - @logger.debug("Connected to stomp server") if @client.connected? + params = { :reliable => true, + :max_reconnect_attempts => 3, + :logger => StompLogger.new(@logger), + :hosts => [ { :login => @user, + :passcode => @password.value, + :host => @host, + :port => @port } ] } + @client = Stomp::Client.new(params) + @logger.debug("Connected to stomp server") if @client.open? rescue => e @logger.debug("Failed to connect to stomp server, will retry", :exception => e, :backtrace => e.backtrace) sleep 2 retry end - end - - - public - def register - require "onstomp" - @client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value) - @client.host = @vhost if @vhost - - # Handle disconnects - @client.on_connection_closed { - connect - } - - connect end # def register public def close @logger.warn("Disconnecting from stomp broker") - @client.disconnect if @client.connected? + @client.close end # def close def multi_receive(events) - @logger.debug("stomp sending events in batch", { :host => @host, :events => events.length }) + tx_name = "tx-#{Random.rand(2**32..2**64-1)}" + @logger.debug("sending #{events.length} events in transaction #{tx_name}") - @client.transaction do |t| - events.each { |event| - headers = Hash.new + begin + @client.begin tx_name + events.each do |event| + headers = Hash.new(:transaction => tx_name) if @headers @headers.each do |k,v| headers[k] = event.sprintf(v) end end - t.send(event.sprintf(@destination), event.to_json, headers) - } + @client.publish(event.sprintf(@destination), event.to_json, headers) + end + @client.commit tx_name + rescue Exception => exception + @logger.error("Error while sending #{events.length} events in transaction #{tx_name}", :error => exception) end - end # def receive + + end # def multi_receive end # class LogStash::Outputs::Stomp diff --git a/logstash-output-stomp.gemspec b/logstash-output-stomp.gemspec index d146b92..1ab3313 100644 --- a/logstash-output-stomp.gemspec +++ b/logstash-output-stomp.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-stomp' - s.version = '3.0.4' + s.version = '3.0.5' s.licenses = ['Apache License (2.0)'] s.summary = "Send events to a stomp server" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -21,7 +21,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" - s.add_runtime_dependency 'onstomp' + s.add_runtime_dependency 'stomp', '~> 1.4.3' s.add_development_dependency 'logstash-devutils' end