diff --git a/cloud-crowd.gemspec b/cloud-crowd.gemspec index f802ebd..46c4a64 100644 --- a/cloud-crowd.gemspec +++ b/cloud-crowd.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'cloud-crowd' - s.version = '0.6.2' # Keep version in sync with cloud-cloud.rb + s.version = '0.6.3' # Keep version in sync with cloud-cloud.rb s.date = '2011-04-14' s.homepage = "http://wiki.github.com/documentcloud/cloud-crowd" @@ -26,8 +26,8 @@ Gem::Specification.new do |s| '--main' << 'README' << '--all' - s.add_dependency 'sinatra', ['~> 0.9'] - s.add_dependency 'activerecord', ['~> 2.3'] + s.add_dependency 'sinatra', ['~> 1.2.6'] + s.add_dependency 'activerecord', ['~> 3.0'] s.add_dependency 'json', ['>= 1.1.7'] s.add_dependency 'rest-client', ['>= 1.4'] s.add_dependency 'thin', ['>= 1.2.4'] diff --git a/lib/cloud-crowd.rb b/lib/cloud-crowd.rb index 498fe87..63ceecc 100644 --- a/lib/cloud-crowd.rb +++ b/lib/cloud-crowd.rb @@ -4,7 +4,7 @@ # Common Gems: require 'rubygems' -gem 'activerecord', '~> 2.0' +gem 'activerecord' gem 'json' gem 'rest-client' gem 'sinatra' @@ -29,6 +29,9 @@ require 'net/http' require 'cloud_crowd/exceptions' +#require 'logger' +#ActiveRecord::Base.logger = Logger.new(STDERR) + module CloudCrowd # Autoload all the CloudCrowd internals. @@ -45,7 +48,7 @@ module CloudCrowd autoload :WorkUnit, 'cloud_crowd/models' # Keep this version in sync with the gemspec. - VERSION = '0.6.2' + VERSION = '0.6.3' # Increment the schema version when there's a backwards incompatible change. SCHEMA_VERSION = 4 @@ -160,13 +163,16 @@ def display_status(status) # If you wish to have certain nodes be specialized to only handle certain # Actions, then install only those into the actions directory. def actions + # Load the bootstrap file + require File.expand_path(File.join(Dir.pwd, 'application')) if File::exists?('application.rb') && self.node? + return @actions if @actions @actions = action_paths.inject({}) do |memo, path| name = File.basename(path, File.extname(path)) require path memo[name] = Module.const_get(Inflector.camelize(name)) memo - end + end rescue NameError => e adjusted_message = "One of your actions failed to load. Please ensure that the name of your action class can be deduced from the name of the file. ex: 'word_count.rb' => 'WordCount'\n#{e.message}" raise NameError.new(adjusted_message, e.name) diff --git a/lib/cloud_crowd/models.rb b/lib/cloud_crowd/models.rb index d1ba6e3..ef4f4e0 100644 --- a/lib/cloud_crowd/models.rb +++ b/lib/cloud_crowd/models.rb @@ -8,13 +8,13 @@ def self.included(klass) klass.class_eval do # Note that COMPLETE and INCOMPLETE are unions of other states. - named_scope 'processing', :conditions => {:status => PROCESSING} - named_scope 'succeeded', :conditions => {:status => SUCCEEDED} - named_scope 'failed', :conditions => {:status => FAILED} - named_scope 'splitting', :conditions => {:status => SPLITTING} - named_scope 'merging', :conditions => {:status => MERGING} - named_scope 'complete', :conditions => {:status => COMPLETE} - named_scope 'incomplete', :conditions => {:status => INCOMPLETE} + scope 'processing', :conditions => {:status => PROCESSING} + scope 'succeeded', :conditions => {:status => SUCCEEDED} + scope 'failed', :conditions => {:status => FAILED} + scope 'splitting', :conditions => {:status => SPLITTING} + scope 'merging', :conditions => {:status => MERGING} + scope 'complete', :conditions => {:status => COMPLETE} + scope 'incomplete', :conditions => {:status => INCOMPLETE} end end diff --git a/lib/cloud_crowd/models/job.rb b/lib/cloud_crowd/models/job.rb index 22cab42..ef86721 100644 --- a/lib/cloud_crowd/models/job.rb +++ b/lib/cloud_crowd/models/job.rb @@ -13,12 +13,12 @@ class Job < ActiveRecord::Base validates_presence_of :status, :inputs, :action, :options - before_validation_on_create :set_initial_status + before_validation :set_initial_status, :on => :create after_create :queue_for_workers before_destroy :cleanup_assets # Jobs that were last updated more than N days ago. - named_scope :older_than, lambda {|num| {:conditions => ['updated_at < ?', num.days.ago]} } + scope :older_than, lambda {|num| {:conditions => ['updated_at < ?', num.days.ago]} } # Create a Job from an incoming JSON request, and add it to the queue. def self.create_from_request(h) diff --git a/lib/cloud_crowd/models/node_record.rb b/lib/cloud_crowd/models/node_record.rb index b0f7db3..9e6a9fc 100644 --- a/lib/cloud_crowd/models/node_record.rb +++ b/lib/cloud_crowd/models/node_record.rb @@ -12,7 +12,7 @@ class NodeRecord < ActiveRecord::Base after_destroy :redistribute_work_units # Available Nodes haven't used up their maxiumum number of workers yet. - named_scope :available, { + scope :available, { :conditions => ['(max_workers is null or (select count(*) from work_units where node_record_id = node_records.id) < max_workers)'], :order => 'updated_at asc' } diff --git a/lib/cloud_crowd/models/work_unit.rb b/lib/cloud_crowd/models/work_unit.rb index 0a786f2..26c22e3 100644 --- a/lib/cloud_crowd/models/work_unit.rb +++ b/lib/cloud_crowd/models/work_unit.rb @@ -21,9 +21,9 @@ class WorkUnit < ActiveRecord::Base validates_presence_of :job_id, :status, :input, :action # Available WorkUnits are waiting to be distributed to Nodes for processing. - named_scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} + scope :available, {:conditions => {:reservation => nil, :worker_pid => nil, :status => INCOMPLETE}} # Reserved WorkUnits have been marked for distribution by a central server process. - named_scope :reserved, lambda {|reservation| + scope :reserved, lambda {|reservation| {:conditions => {:reservation => reservation}, :order => 'updated_at asc'} } @@ -43,14 +43,14 @@ def self.distribute_to_nodes # Find the available nodes, and determine what actions we're capable # of running at the moment. - available_nodes = NodeRecord.available + available_nodes = NodeRecord.available.all available_actions = available_nodes.map {|node| node.actions }.flatten.uniq filter = "action in (#{available_actions.map{|a| "'#{a}'"}.join(',')})" # Reserve a handful of available work units. WorkUnit.cancel_reservations(reservation) if reservation return unless reservation = WorkUnit.reserve_available(:limit => RESERVATION_LIMIT, :conditions => filter) - work_units = WorkUnit.reserved(reservation) + work_units = WorkUnit.reserved(reservation).all # Round robin through the nodes and units, sending the unit if the node # is able to process it. diff --git a/lib/cloud_crowd/node.rb b/lib/cloud_crowd/node.rb index c10ffb8..de63c62 100644 --- a/lib/cloud_crowd/node.rb +++ b/lib/cloud_crowd/node.rb @@ -198,4 +198,4 @@ def shut_down end -end \ No newline at end of file +end diff --git a/lib/cloud_crowd/server.rb b/lib/cloud_crowd/server.rb index 5bd78f6..bc4de87 100644 --- a/lib/cloud_crowd/server.rb +++ b/lib/cloud_crowd/server.rb @@ -111,6 +111,7 @@ class Server < Sinatra::Base # they mark it back on the central server and exit. Triggers distribution # of pending work units. put '/work/:work_unit_id' do + puts "Work unit #{params[:work_unit_id]} reported status #{params[:status]}" case params[:status] when 'succeeded' then current_work_unit.finish(params[:output], params[:time]) when 'failed' then current_work_unit.fail(params[:output], params[:time]) diff --git a/lib/cloud_crowd/worker.rb b/lib/cloud_crowd/worker.rb index 516dd2b..636e763 100644 --- a/lib/cloud_crowd/worker.rb +++ b/lib/cloud_crowd/worker.rb @@ -31,6 +31,7 @@ def initialize(node, unit) def complete_work_unit(result) keep_trying_to "complete work unit" do data = base_params.merge({:status => 'succeeded', :output => result}) + log data.inspect @node.central["/work/#{data[:id]}"].put(data) log "finished #{display_work_unit} in #{data[:time]} seconds" end @@ -40,6 +41,7 @@ def complete_work_unit(result) def fail_work_unit(exception) keep_trying_to "mark work unit as failed" do data = base_params.merge({:status => 'failed', :output => {'output' => exception.message}.to_json}) + log data.inspect @node.central["/work/#{data[:id]}"].put(data) log "failed #{display_work_unit} in #{data[:time]} seconds\n#{exception.message}\n#{exception.backtrace}" end