Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cloud-crowd.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'cloud-crowd'
s.version = '0.6.2' # Keep version in sync with cloud-cloud.rb
s.version = '0.6.3' # Keep version in sync with cloud-cloud.rb
s.date = '2011-04-14'

s.homepage = "http://wiki.github.com/documentcloud/cloud-crowd"
Expand All @@ -26,8 +26,8 @@ Gem::Specification.new do |s|
'--main' << 'README' <<
'--all'

s.add_dependency 'sinatra', ['~> 0.9']
s.add_dependency 'activerecord', ['~> 2.3']
s.add_dependency 'sinatra', ['~> 1.2.6']
s.add_dependency 'activerecord', ['~> 3.0']
s.add_dependency 'json', ['>= 1.1.7']
s.add_dependency 'rest-client', ['>= 1.4']
s.add_dependency 'thin', ['>= 1.2.4']
Expand Down
12 changes: 9 additions & 3 deletions lib/cloud-crowd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

# Common Gems:
require 'rubygems'
gem 'activerecord', '~> 2.0'
gem 'activerecord'
gem 'json'
gem 'rest-client'
gem 'sinatra'
Expand All @@ -29,6 +29,9 @@
require 'net/http'
require 'cloud_crowd/exceptions'

#require 'logger'
#ActiveRecord::Base.logger = Logger.new(STDERR)

module CloudCrowd

# Autoload all the CloudCrowd internals.
Expand All @@ -45,7 +48,7 @@ module CloudCrowd
autoload :WorkUnit, 'cloud_crowd/models'

# Keep this version in sync with the gemspec.
VERSION = '0.6.2'
VERSION = '0.6.3'

# Increment the schema version when there's a backwards incompatible change.
SCHEMA_VERSION = 4
Expand Down Expand Up @@ -160,13 +163,16 @@ def display_status(status)
# If you wish to have certain nodes be specialized to only handle certain
# Actions, then install only those into the actions directory.
def actions
# Load the bootstrap file
require File.expand_path(File.join(Dir.pwd, 'application')) if File::exists?('application.rb') && self.node?

return @actions if @actions
@actions = action_paths.inject({}) do |memo, path|
name = File.basename(path, File.extname(path))
require path
memo[name] = Module.const_get(Inflector.camelize(name))
memo
end
end
rescue NameError => e
adjusted_message = "One of your actions failed to load. Please ensure that the name of your action class can be deduced from the name of the file. ex: 'word_count.rb' => 'WordCount'\n#{e.message}"
raise NameError.new(adjusted_message, e.name)
Expand Down
14 changes: 7 additions & 7 deletions lib/cloud_crowd/models.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ def self.included(klass)

klass.class_eval do
# Note that COMPLETE and INCOMPLETE are unions of other states.
named_scope 'processing', :conditions => {:status => PROCESSING}
named_scope 'succeeded', :conditions => {:status => SUCCEEDED}
named_scope 'failed', :conditions => {:status => FAILED}
named_scope 'splitting', :conditions => {:status => SPLITTING}
named_scope 'merging', :conditions => {:status => MERGING}
named_scope 'complete', :conditions => {:status => COMPLETE}
named_scope 'incomplete', :conditions => {:status => INCOMPLETE}
scope 'processing', :conditions => {:status => PROCESSING}
scope 'succeeded', :conditions => {:status => SUCCEEDED}
scope 'failed', :conditions => {:status => FAILED}
scope 'splitting', :conditions => {:status => SPLITTING}
scope 'merging', :conditions => {:status => MERGING}
scope 'complete', :conditions => {:status => COMPLETE}
scope 'incomplete', :conditions => {:status => INCOMPLETE}
end

end
Expand Down
4 changes: 2 additions & 2 deletions lib/cloud_crowd/models/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ class Job < ActiveRecord::Base

validates_presence_of :status, :inputs, :action, :options

before_validation_on_create :set_initial_status
before_validation :set_initial_status, :on => :create
after_create :queue_for_workers
before_destroy :cleanup_assets

# Jobs that were last updated more than N days ago.
named_scope :older_than, lambda {|num| {:conditions => ['updated_at < ?', num.days.ago]} }
scope :older_than, lambda {|num| {:conditions => ['updated_at < ?', num.days.ago]} }

# Create a Job from an incoming JSON request, and add it to the queue.
def self.create_from_request(h)
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_crowd/models/node_record.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class NodeRecord < ActiveRecord::Base
after_destroy :redistribute_work_units

# Available Nodes haven't used up their maxiumum number of workers yet.
named_scope :available, {
scope :available, {
:conditions => ['(max_workers is null or (select count(*) from work_units where node_record_id = node_records.id) < max_workers)'],
:order => 'updated_at asc'
}
Expand Down
8 changes: 4 additions & 4 deletions lib/cloud_crowd/models/work_unit.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ class WorkUnit < ActiveRecord::Base
validates_presence_of :job_id, :status, :input, :action

# Available WorkUnits are waiting to be distributed to Nodes for processing.
named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}}
# Reserved WorkUnits have been marked for distribution by a central server process.
named_scope :reserved, lambda {|reservation|
scope :reserved, lambda {|reservation|
{:conditions => {:reservation => reservation}, :order => 'updated_at asc'}
}

Expand All @@ -43,14 +43,14 @@ def self.distribute_to_nodes

# Find the available nodes, and determine what actions we're capable
# of running at the moment.
available_nodes = NodeRecord.available
available_nodes = NodeRecord.available.all
available_actions = available_nodes.map {|node| node.actions }.flatten.uniq
filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})"

# Reserve a handful of available work units.
WorkUnit.cancel_reservations(reservation) if reservation
return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter)
work_units = WorkUnit.reserved(reservation)
work_units = WorkUnit.reserved(reservation).all

# Round robin through the nodes and units, sending the unit if the node
# is able to process it.
Expand Down
2 changes: 1 addition & 1 deletion lib/cloud_crowd/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,4 @@ def shut_down

end

end
end
1 change: 1 addition & 0 deletions lib/cloud_crowd/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class Server < Sinatra::Base
# they mark it back on the central server and exit. Triggers distribution
# of pending work units.
put '/work/:work_unit_id' do
puts "Work unit #{params[:work_unit_id]} reported status #{params[:status]}"
case params[:status]
when 'succeeded' then current_work_unit.finish(params[:output], params[:time])
when 'failed' then current_work_unit.fail(params[:output], params[:time])
Expand Down
2 changes: 2 additions & 0 deletions lib/cloud_crowd/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def initialize(node, unit)
def complete_work_unit(result)
keep_trying_to "complete work unit" do
data = base_params.merge({:status => 'succeeded', :output => result})
log data.inspect
@node.central["/work/#{data[:id]}"].put(data)
log "finished #{display_work_unit} in #{data[:time]} seconds"
end
Expand All @@ -40,6 +41,7 @@ def complete_work_unit(result)
def fail_work_unit(exception)
keep_trying_to "mark work unit as failed" do
data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json})
log data.inspect
@node.central["/work/#{data[:id]}"].put(data)
log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}"
end
Expand Down