Skip to content

Attempt at fixing message loss #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 296 additions & 0 deletions lib/logstash/outputs/logger.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
# encoding: utf-8

require "stomp"

# == Example STOMP call back logger class.
#
# Optional callback methods:
#
# * on_connecting: connection starting
# * on_connected: successful connect
# * on_connectfail: unsuccessful connect (will usually be retried)
# * on_disconnect: successful disconnect
#
# * on_miscerr: on miscellaneous xmit/recv errors
#
# * on_publish: publish called
# * on_subscribe: subscribe called
# * on_unsubscribe: unsubscribe called
#
# * on_begin: begin called
# * on_ack: ack called
# * on_nack: nack called
# * on_commit: commit called
# * on_abort: abort called
#
# * on_receive: receive called and successful
#
# * on_ssl_connecting: SSL connection starting
# * on_ssl_connected: successful SSL connect
# * on_ssl_connectfail: unsuccessful SSL connect (will usually be retried)
#
# * on_hbread_fail: unsuccessful Heartbeat read
# * on_hbwrite_fail: unsuccessful Heartbeat write
# * on_hbfire: on any send or receive heartbeat
#
# All methods are optional, at the user's requirements.
#
# If a method is not provided, it is not called (of course.)
#
# IMPORTANT NOTE: in general, call back logging methods *SHOULD* not raise exceptions,
# otherwise the underlying STOMP connection may fail in mysterious ways.
#
# There are two useful exceptions to this rule for:
#
# * on_connectfail
# * on_ssl_connectfail
#
# These two methods can raise a Stomp::Errors::LoggerConnectionError. If this
# exception is raised, it is passed up the chain to the caller.
#
# Callback parameters: are a copy of the @parameters instance variable for
# the Stomp::Connection.
#
class StompLogger < Stomp::NullLogger

# Initialize a new callback logger instance.
def initialize(logger = nil)
@log = logger || super()
@log.info("Logger initialization complete.")
end

def marshal_dump
[]
end

def marshal_load(array)
_init
end

# Log connecting events
def on_connecting(parms)
begin
@log.debug "Connecting: #{info(parms)}"
rescue
@log.debug "Connecting oops"
end
end

# Log connected events
def on_connected(parms)
begin
@log.debug "Connected: #{info(parms)}"
rescue
@log.debug "Connected oops"
end
end

# Log connectfail events
def on_connectfail(parms)
begin
@log.debug "Connect Fail #{info(parms)}"
rescue
@log.debug "Connect Fail oops"
end
=begin
# An example LoggerConnectionError raise
@log.debug "Connect Fail, will raise"
raise Stomp::Error::LoggerConnectionError.new("quit from connect fail")
=end
end

# Log disconnect events
def on_disconnect(parms)
begin
@log.debug "Disconnected #{info(parms)}"
rescue
@log.debug "Disconnected oops"
end
end

# Log miscellaneous errors
def on_miscerr(parms, errstr)
begin
@log.debug "Miscellaneous Error #{info(parms)}"
@log.debug "Miscellaneous Error String #{errstr}"
rescue
@log.debug "Miscellaneous Error oops"
end
end

# Log Subscribe
def on_subscribe(parms, headers)
begin
@log.debug "Subscribe Parms #{info(parms)}"
@log.debug "Subscribe Headers #{headers}"
rescue
@log.debug "Subscribe oops"
end
end

# Log UnSubscribe
def on_unsubscribe(parms, headers)
begin
@log.debug "UnSubscribe Parms #{info(parms)}"
@log.debug "UnSubscribe Headers #{headers}"
rescue
@log.debug "UnSubscribe oops"
end
end

# Log Publish
def on_publish(parms, message, headers)
begin
@log.debug "Publish Parms #{info(parms)}"
@log.debug "Publish Message #{message}"
@log.debug "Publish Headers #{headers}"
rescue
@log.debug "Publish oops"
end
end

# Log Receive
def on_receive(parms, result)
begin
@log.debug "Receive Parms #{info(parms)}"
@log.debug "Receive Result #{result}"
rescue
@log.debug "Receive oops"
end
end

# Log Begin
def on_begin(parms, headers)
begin
@log.debug "Begin Parms #{info(parms)}"
@log.debug "Begin Result #{headers}"
rescue
@log.debug "Begin oops"
end
end

# Log Ack
def on_ack(parms, headers)
begin
@log.debug "Ack Parms #{info(parms)}"
@log.debug "Ack Result #{headers}"
rescue
@log.debug "Ack oops"
end
end

# Log NAck
def on_nack(parms, headers)
begin
@log.debug "NAck Parms #{info(parms)}"
@log.debug "NAck Result #{headers}"
rescue
@log.debug "NAck oops"
end
end

# Log Commit
def on_commit(parms, headers)
begin
@log.debug "Commit Parms #{info(parms)}"
@log.debug "Commit Result #{headers}"
rescue
@log.debug "Commit oops"
end
end

# Log Abort
def on_abort(parms, headers)
begin
@log.debug "Abort Parms #{info(parms)}"
@log.debug "Abort Result #{headers}"
rescue
@log.debug "Abort oops"
end
end

# Stomp 1.1+ - heart beat read (receive) failed.
def on_hbread_fail(parms, ticker_data = {})
begin
@log.debug "Hbreadf Parms #{info(parms)}"
@log.debug "Hbreadf Result #{ticker_data.inspect}"
rescue
@log.debug "Hbreadf oops"
end
end

# Stomp 1.1+ - heart beat send (transmit) failed.
def on_hbwrite_fail(parms, ticker_data = {})
begin
@log.debug "Hbwritef Parms #{info(parms)}"
@log.debug "Hbwritef Result #{ticker_data.inspect}"
rescue
@log.debug "Hbwritef oops"
end
end

# Log SSL connection start.
def on_ssl_connecting(parms)
begin
@log.debug "SSL Connecting Parms #{info(parms)}"
rescue
@log.debug "SSL Connecting oops"
end
end

# Log a successful SSL connect.
def on_ssl_connected(parms)
begin
@log.debug "SSL Connected Parms #{info(parms)}"
rescue
@log.debug "SSL Connected oops"
end
end

# Log an unsuccessful SSL connect.
def on_ssl_connectfail(parms)
begin
@log.debug "SSL Connect Fail Parms #{info(parms)}"
@log.debug "SSL Connect Fail Exception #{parms[:ssl_exception]}, #{parms[:ssl_exception].message}"
rescue
@log.debug "SSL Connect Fail oops"
end
=begin
# An example LoggerConnectionError raise
@log.debug "SSL Connect Fail, will raise"
raise Stomp::Error::LoggerConnectionError.new("quit from SSL connect")
=end
end

# Log heart beat fires
def on_hbfire(parms, srind, firedata = {})
begin
@log.debug "HeartBeat Fire Parms #{info(parms)}"
@log.debug "HeartBeat Fire Send/Receive #{srind}"
rescue
@log.debug "HeartBeat Fire oops"
end
end

private

# Example information extract.
def info(parms)
#
# Available in the parms Hash:
# parms[:cur_host]
# parms[:cur_port]
# parms[:cur_login]
# parms[:cur_passcode]
# parms[:cur_ssl]
# parms[:cur_recondelay]
# parms[:cur_parseto]
# parms[:cur_conattempts]
# parms[:openstat]
#
# For the on_ssl_connectfail callback these are also available:
# parms[:ssl_exception]
#
"Host: #{parms[:cur_host]}, Port: #{parms[:cur_port]}, Login: #{parms[:cur_login]}, Passcode: #{parms[:cur_passcode]}, ssl: #{parms[:cur_ssl]}"
end
end # of class
57 changes: 30 additions & 27 deletions lib/logstash/outputs/stomp.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/outputs/logger"
require "logstash/namespace"


Expand Down Expand Up @@ -36,55 +37,57 @@ class LogStash::Outputs::Stomp < LogStash::Outputs::Base
# Enable debugging output?
config :debug, :validate => :boolean, :default => false

private
def connect
# this output is thread-safe
concurrency :shared

public
def register
require "stomp"
begin
@client.connect
@logger.debug("Connected to stomp server") if @client.connected?
params = { :reliable => true,
:max_reconnect_attempts => 3,
:logger => StompLogger.new(@logger),
:hosts => [ { :login => @user,
:passcode => @password.value,
:host => @host,
:port => @port } ] }
@client = Stomp::Client.new(params)
@logger.debug("Connected to stomp server") if @client.open?
rescue => e
@logger.debug("Failed to connect to stomp server, will retry",
:exception => e, :backtrace => e.backtrace)
sleep 2
retry
end
end


public
def register
require "onstomp"
@client = OnStomp::Client.new("stomp://#{@host}:#{@port}", :login => @user, :passcode => @password.value)
@client.host = @vhost if @vhost

# Handle disconnects
@client.on_connection_closed {
connect
}

connect
end # def register

public
def close
@logger.warn("Disconnecting from stomp broker")
@client.disconnect if @client.connected?
@client.close
end # def close

def multi_receive(events)

@logger.debug("stomp sending events in batch", { :host => @host, :events => events.length })
tx_name = "tx-#{Random.rand(2**32..2**64-1)}"
@logger.debug("sending #{events.length} events in transaction #{tx_name}")

@client.transaction do |t|
events.each { |event|
headers = Hash.new
begin
@client.begin tx_name
events.each do |event|
headers = Hash.new(:transaction => tx_name)
if @headers
@headers.each do |k,v|
headers[k] = event.sprintf(v)
end
end

t.send(event.sprintf(@destination), event.to_json, headers)
}
@client.publish(event.sprintf(@destination), event.to_json, headers)
end
@client.commit tx_name
rescue Exception => exception
@logger.error("Error while sending #{events.length} events in transaction #{tx_name}", :error => exception)
end
end # def receive

end # def multi_receive
end # class LogStash::Outputs::Stomp
Loading