Skip to content

Commit e123dbb

Browse files
author
Ryan Sundberg
committed
Optional callback mode requires acknowledging rabbitmq emssages
1 parent de5d2a3 commit e123dbb

File tree

1 file changed

+18
-5
lines changed

1 file changed

+18
-5
lines changed

lib/fluent/plugin/in_amqp.rb

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ class AMQPInput < Input
4949
config_param :auth_mechanism, :string, default: nil
5050
# milliseconds to delay between messages
5151
config_param :delay, :integer, default: 0
52+
config_param :callback, :bool, default: false
5253

5354
def initialize
5455
super
@@ -101,13 +102,17 @@ def start
101102
@ack_thread = Thread.new {
102103
@stop = false
103104
loop do
104-
tag = @ack_queue.pop
105+
[tag, success] = @ack_queue.pop
105106
case tag
106107
when :stop
107108
@stop = true
108109
else
109110
begin
110-
@channel.acknowledge(tag, false)
111+
if success
112+
@channel.acknowledge(tag, false)
113+
else
114+
@channel.reject(tag, false)
115+
end
111116
rescue => e
112117
log.error "Failed to acknowledge event", tag: tag, error: e
113118
end
@@ -121,10 +126,18 @@ def start
121126

122127
q.subscribe(:manual_ack => @manual_ack) do |delivery, meta, msg|
123128
log.debug "Recieved message #{@msg}"
124-
payload = parse_payload(msg)
129+
body = parse_payload(msg)
130+
if @manual_ack && @callback
131+
cb = Proc.new do | success |
132+
@ack_queue.push([delivery.delivery_tag, success])
133+
end
134+
payload = { :payload => payload, :callback => cb }
135+
else
136+
payload = body
137+
end
125138
router.emit(parse_tag(delivery, meta), parse_time(meta), payload)
126-
if @manual_ack
127-
@ack_queue.push(delivery.delivery_tag)
139+
if @manual_ack && !@callback
140+
@ack_queue.push([delivery.delivery_tag, true])
128141
end
129142
if @delay > 0
130143
sleep(delay / 1000)

0 commit comments

Comments
 (0)