Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/easy_stalk/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def initialize
log.progname = Module.nesting.last.name
end
@default_worker_on_fail = Proc.new { |job_class, job_body, ex|
EasyStalk.logger.error "Worker for #{job_class} on tube[#{job_class.tube_name}] failed #{ex.message}"
EasyStalk.logger.error "Worker for #{job_class} on tube[#{job_class.get_tube_name}] failed #{ex.message}"
EasyStalk.logger.error ex.backtrace.join("\n")
}

Expand Down
81 changes: 43 additions & 38 deletions lib/easy_stalk/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,70 +10,75 @@ class Job
DEFAULT_SERIALIZABLE_CONTEXT_KEYS = []

def self.tube_name(tube=nil)
if tube
@tube_name = tube
else
tube_prefix + (@tube_name || name.split('::').last)
end
@tube_name = tube
end
def self.get_tube_name
get_tube_prefix + (@tube_name || name.split('::').last)
end

def self.tube_prefix(prefix=nil)
if prefix
@tube_prefix = prefix
else
@tube_prefix || EasyStalk.configuration.default_tube_prefix
define_singleton_method :get_tube_prefix do
prefix
end
end
def self.get_tube_prefix
EasyStalk.configuration.default_tube_prefix
end

def self.retry_times(attempts=nil)
# max number of times to retry job before burying
define_singleton_method :get_retry_times do
attempts
end
end
def self.get_retry_times
EasyStalk.configuration.default_retry_times
end

def self.priority(pri=nil)
# integer < 2**32. 0 is highest
if pri
@priority = pri
else
@priority || EasyStalk.configuration.default_priority
define_method :priority do
pri
end
end
def priority
EasyStalk.configuration.default_priority
end

def self.time_to_run(seconds=nil)
# integer seconds to run this job
if seconds
@time_to_run = seconds
else
@time_to_run || EasyStalk.configuration.default_time_to_run
define_method :time_to_run do
seconds
end
end
def time_to_run
EasyStalk.configuration.default_time_to_run
end

def self.delay(seconds=nil)
# integer seconds before job is in ready queue
if seconds
@delay = seconds
else
@delay || EasyStalk.configuration.default_delay
define_method :delay do
seconds
end
end

def self.retry_times(attempts=nil)
# max number of times to retry job before burying
if attempts
@retry_times = attempts
else
@retry_times || EasyStalk.configuration.default_retry_times
end
def delay
EasyStalk.configuration.default_delay
end

def self.serializable_context_keys(*keys)
if keys.size > 0
@serializable_context_keys = keys
else
@serializable_context_keys || DEFAULT_SERIALIZABLE_CONTEXT_KEYS
define_method :serializable_context_keys do
keys
end
end
def serializable_context_keys
DEFAULT_SERIALIZABLE_CONTEXT_KEYS
end

def enqueue(beanstalk_connection, priority: nil, time_to_run: nil, delay: nil, delay_until: nil)
tube = beanstalk_connection.tubes[self.class.tube_name]
pri = priority || self.class.priority
ttr = time_to_run || self.class.time_to_run
delay = delay || self.class.delay
tube = beanstalk_connection.tubes[self.class.get_tube_name]
pri = priority || self.priority
ttr = time_to_run || self.time_to_run
delay = delay || self.delay

if delay_until && DateTime === delay_until
days = delay_until - DateTime.now
Expand All @@ -84,7 +89,7 @@ def enqueue(beanstalk_connection, priority: nil, time_to_run: nil, delay: nil, d
end

def job_data
data = context.to_h.select { |key, value| self.class.serializable_context_keys.include? key }
data = context.to_h.select { |key, value| self.serializable_context_keys.include? key }
JSON.dump(data)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/easy_stalk/tasks.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

tubes = if args.extras.size > 0
EasyStalk::Job.descendants.select do |job|
tubs.include?(job.tube_name)
tubs.include?(job.get_tube_name)
end
end

Expand Down
7 changes: 4 additions & 3 deletions lib/easy_stalk/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def work(job_classes = nil, on_fail: nil)
@cancelled = false

tube_class_hash = Hash[
job_classes.map { |cls| [cls.tube_name, cls] }
job_classes.map { |cls| [cls.get_tube_name, cls] }
]

pool = EasyStalk::Client.create_worker_pool(tube_class_hash.keys)
Expand All @@ -40,7 +40,8 @@ def work(job_classes = nil, on_fail: nil)
rescue => ex
# Job issued a failed context or raised an unhandled exception
job_class = tube_class_hash[job.tube]
if job.stats.releases <= job_class.retry_times

if job.stats.releases <= job_class.get_retry_times
# Re-enqueue with stepped delay
release_with_delay(job)
else
Expand All @@ -54,7 +55,7 @@ def work(job_classes = nil, on_fail: nil)
end
end

jobs_list = job_classes.map { |job_class| "#{job_class} on tube #{job_class.tube_name}" }.join(", ")
jobs_list = job_classes.map { |job_class| "#{job_class} on tube #{job_class.get_tube_name}" }.join(", ")
EasyStalk.logger.info "Worker running #{jobs_list} has been stopped"
end

Expand Down
Loading