-
Notifications
You must be signed in to change notification settings - Fork 290
Worker options
The queue option associates a queue with a worker, each queue can have only one associated worker. If you define two or more workers for the same queue, the latest loaded worker will be used as the queue worker. Default 'default'.
class MyWorker
include Shoryuken::Worker
shoryuken_options queue: 'my_queue'
def perform(sqs_msg, body); end
endYou can also pass a block to define the queue:
class MyWorker
include Shoryuken::Worker
shoryuken_options queue: ->{ "#{ENV['APPLICATION_ENV']}_my_queue" }
# shoryuken_options queue: ->{ "#{Socket.gethostname}_my_queue" }
def perform(sqs_msg, body); end
endWhen delete is true, Shoryuken deletes the messages after their consumption, but only in case the worker doesn't raise any exception during the consumption. Default false.
As delete is false by default, remember to set it to true or call sqs_msg.delete before ending the perform method, otherwise the messages will get back to the queue, becoming available to be consumed again.
When batch is true Shoryuken sends the messages in batches to the workers instead of individually. Default false.
One of the advantages of batch is when you use APIs that accept batch, for example Keen IO.
The BatchableKeenIOWorker example below is more efficient than the KeenIOWorker. This better efficiency isn't a general rule, it depends on the use case.
class KeenIOWorker
include Shoryuken::Worker
shoryuken_options queue: 'keen_io', delete: true, body_parser: :json
def perform(sqs_msg, event)
Keen.publish 'stats', event
end
end
class BatchableKeenIOWorker
include Shoryuken::Worker
shoryuken_options queue: 'keen_io', delete: true, batch: true, body_parser: :json
def perform(sqs_msgs, events)
Keen.publish_batch 'stats' => events
end
endThe body_parser allows to parse the body before calling the perform method. It accepts symbols: :json or :text or a block returning the body or a class that responds to .parse. Default :text.
class MyWorker
include Shoryuken::Worker
shoryuken_options queue: 'default', body_parser: :json
# shoryuken_options queue: 'default', body_parser: ->(sqs_msg){ REXML::Document.new(sqs_msg.body) }
# shoryuken_options queue: 'default', body_parser: JSON
def perform(sqs_msg, hash)
puts hash['name']
sqs_msg.delete
end
end