diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index ee317c18..7485e17e 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -29,10 +29,12 @@ jobs: - 3.3 - 3.4 database: [ mysql, postgres, sqlite ] - gemfile: [ rails_7_1, rails_7_2, rails_8_0, rails_main ] + gemfile: [ rails_7_1, rails_7_2, rails_8_0, rails_8_1, rails_main ] exclude: - ruby-version: "3.1" gemfile: rails_8_0 + - ruby-version: "3.1" + gemfile: rails_8_1 - ruby-version: "3.1" gemfile: rails_main services: @@ -52,6 +54,7 @@ jobs: env: TARGET_DB: ${{ matrix.database }} BUNDLE_GEMFILE: ${{ github.workspace }}/gemfiles/${{ matrix.gemfile }}.gemfile + RAILS_ENV: test steps: - name: Checkout code uses: actions/checkout@v4 @@ -68,3 +71,12 @@ jobs: bin/rails db:setup - name: Run tests run: bin/rails test + - name: Upload logs on failure + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: logs-${{ matrix.database }}-${{ matrix.gemfile }}-ruby${{ matrix.ruby-version }}-attempt${{ github.run_attempt }} + path: | + test/dummy/log/test.log + if-no-files-found: ignore + retention-days: 30 diff --git a/.rubocop.yml b/.rubocop.yml index 75df1173..7299253b 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -7,3 +7,4 @@ AllCops: TargetRubyVersion: 3.3 Exclude: - "**/*_schema.rb" + - "lib/generators/solid_queue/update/templates/db/*" diff --git a/Appraisals b/Appraisals index 24860528..9f3c8df4 100644 --- a/Appraisals +++ b/Appraisals @@ -15,6 +15,10 @@ appraise "rails-8-0" do gem "railties", "~> 8.0.0" end +appraise "rails-8-1" do + gem "railties", "~> 8.1.0" +end + appraise "rails-main" do gem "railties", github: "rails/rails", branch: "main" end diff --git a/Gemfile.lock b/Gemfile.lock index 32f66b90..a930856e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - solid_queue (1.2.2) + solid_queue (1.2.4) activejob (>= 7.1) activerecord (>= 7.1) concurrent-ruby (>= 1.3.1) diff --git a/README.md b/README.md index 92a018d4..0f3e0b74 100644 --- a/README.md +++ b/README.md @@ -342,7 +342,7 @@ When receiving a `QUIT` signal, if workers still have jobs in-flight, these will If processes have no chance of cleaning up before exiting (e.g. if someone pulls a cable somewhere), in-flight jobs might remain claimed by the processes executing them. Processes send heartbeats, and the supervisor checks and prunes processes with expired heartbeats. Jobs that were claimed by processes with an expired heartbeat will be marked as failed with a `SolidQueue::Processes::ProcessPrunedError`. You can configure both the frequency of heartbeats and the threshold to consider a process dead. See the section below for this. -In a similar way, if a worker is terminated in any other way not initiated by the above signals (e.g. a worker is sent a `KILL` signal), jobs in progress will be marked as failed so that they can be inspected, with a `SolidQueue::Processes::Process::ProcessExitError`. Sometimes a job in particular is responsible for this, for example, if it has a memory leak and you have a mechanism to kill processes over a certain memory threshold, so this will help identifying this kind of situation. +In a similar way, if a worker is terminated in any other way not initiated by the above signals (e.g. a worker is sent a `KILL` signal), jobs in progress will be marked as failed so that they can be inspected, with a `SolidQueue::Processes::ProcessExitError`. Sometimes a job in particular is responsible for this, for example, if it has a memory leak and you have a mechanism to kill processes over a certain memory threshold, so this will help identifying this kind of situation. ### Database configuration diff --git a/Rakefile b/Rakefile index bdb0ea23..41254701 100644 --- a/Rakefile +++ b/Rakefile @@ -5,7 +5,9 @@ require "bundler/setup" APP_RAKEFILE = File.expand_path("test/dummy/Rakefile", __dir__) load "rails/tasks/engine.rake" -load "rails/tasks/statistics.rake" +if Rails::VERSION::MAJOR < 8 + load "rails/tasks/statistics.rake" +end require "bundler/gem_tasks" require "rake/tasklib" diff --git a/UPGRADING.md b/UPGRADING.md index 51ab06a8..2fc0d8e8 100644 --- a/UPGRADING.md +++ b/UPGRADING.md @@ -1,4 +1,21 @@ -# Upgrading to version 1.x +# Upgrading to version 1.3 +There's a new migration in this version that can be installed via: +```bash +bin/rails solid_queue:update +``` +which is a new generator to facilitate updates. This will use the `queue` database by default, but if you're using a different database name for Solid Queue, you can install the new migrations in the right place with: +```bash +DATABASE=your-solid-queue-db-name bin/rails solid_queue:update +``` + +Finally, the migration needs to be run with: +```bash +bin/rails db:migrate +``` + +The migration affects the tables `solid_queue_claimed_executions` and `solid_queue_processes` tables. It's not mandatory: everything will continue working as before without it, only a deprecation warning will be emitted. The migration will be mandatory in the next major version (2.0). + +# Upgrading to version >=1.0, < 1.3 The value returned for `enqueue_after_transaction_commit?` has changed to `true`, and it's no longer configurable. If you want to change this, you need to use Active Job's configuration options. # Upgrading to version 0.9.x diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 5d0a4057..a5e211cb 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -1,7 +1,20 @@ # frozen_string_literal: true class SolidQueue::ClaimedExecution < SolidQueue::Execution - belongs_to :process + def self.process_name_column_exists? + table_exists? && column_names.include?("process_name") + rescue ActiveRecord::Tenanted::TenantDoesNotExistError + true + end + + if process_name_column_exists? + belongs_to :process, primary_key: :name, foreign_key: :process_name + else + warn_about_pending_migrations if table_exists? + + belongs_to :process + attr_accessor :process_name + end scope :orphaned, -> { where.missing(:process) } @@ -12,12 +25,16 @@ def success? end class << self - def claiming(job_ids, process_id, &block) - job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } + def claiming(job_ids, process, &block) + process_data = { process_id: process.id }.tap do |hsh| + hsh[:process_name] = process.name if process_name_column_exists? + end + + job_data = Array(job_ids).collect { |job_id| { job_id: job_id }.merge(process_data) } - SolidQueue.instrument(:claim, process_id: process_id, job_ids: job_ids) do |payload| + SolidQueue.instrument(:claim, job_ids: job_ids, **process_data) do |payload| insert_all!(job_data) - where(job_id: job_ids, process_id: process_id).load.tap do |claimed| + where(job_id: job_ids, process_id: process.id).load.tap do |claimed| block.call(claimed) payload[:size] = claimed.size @@ -46,7 +63,8 @@ def fail_all_with(error) execution.unblock_next_job end - payload[:process_ids] = executions.map(&:process_id).uniq + payload[:process_ids] = executions.map(&:process_id).uniq.presence + payload[:process_names] = executions.map(&:process_name).uniq.presence payload[:job_ids] = executions.map(&:job_id).uniq payload[:size] = executions.size end @@ -76,7 +94,7 @@ def perform end def release - SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id) do + SolidQueue.instrument(:release_claimed, job_id: job.id, process_id: process_id, process_name: process_name) do transaction do job.dispatch_bypassing_concurrency_limits destroy! diff --git a/app/models/solid_queue/process/executor.rb b/app/models/solid_queue/process/executor.rb index 8dcd12aa..c2ca9d5b 100644 --- a/app/models/solid_queue/process/executor.rb +++ b/app/models/solid_queue/process/executor.rb @@ -6,7 +6,13 @@ module Executor extend ActiveSupport::Concern included do - has_many :claimed_executions + if ClaimedExecution.process_name_column_exists? + has_many :claimed_executions, primary_key: :name, foreign_key: :process_name + else + warn_about_pending_migrations if ClaimedExecution.table_exists? + + has_many :claimed_executions + end after_destroy :release_all_claimed_executions end diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 35a11292..4c207487 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -7,9 +7,9 @@ class ReadyExecution < Execution assumes_attributes_from_job class << self - def claim(queue_list, limit, process_id) + def claim(queue_list, limit, process) QueueSelector.new(queue_list, self).scoped_relations.flat_map do |queue_relation| - select_and_lock(queue_relation, process_id, limit).tap do |locked| + select_and_lock(queue_relation, process, limit).tap do |locked| limit -= locked.size end end @@ -20,12 +20,12 @@ def aggregated_count_across(queue_list) end private - def select_and_lock(queue_relation, process_id, limit) + def select_and_lock(queue_relation, process, limit) return [] if limit <= 0 transaction do candidates = select_candidates(queue_relation, limit) - lock_candidates(candidates, process_id) + lock_candidates(candidates, process) end end @@ -33,10 +33,10 @@ def select_candidates(queue_relation, limit) queue_relation.ordered.limit(limit).non_blocking_lock.select(:id, :job_id) end - def lock_candidates(executions, process_id) + def lock_candidates(executions, process) return [] if executions.none? - SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process_id) do |claimed| + SolidQueue::ClaimedExecution.claiming(executions.map(&:job_id), process) do |claimed| ids_to_delete = executions.index_by(&:job_id).values_at(*claimed.map(&:job_id)).map(&:id) where(id: ids_to_delete).delete_all end diff --git a/app/models/solid_queue/record.rb b/app/models/solid_queue/record.rb index 0a704d2c..918fe216 100644 --- a/app/models/solid_queue/record.rb +++ b/app/models/solid_queue/record.rb @@ -20,6 +20,18 @@ def supports_insert_conflict_target? connection.supports_insert_conflict_target? end end + + def warn_about_pending_migrations + SolidQueue.deprecator.warn(<<~DEPRECATION) + Solid Queue has pending database migrations. To get the new migration files, run: + rails solid_queue:update + which will install the migration under `db/queue_migrate`. To change the database, run + DATABASE=your-solid-queue-db rails solid_queue:update + Then, apply the migrations with: + rails db:migrate + These migrations will be required after version 2.0 + DEPRECATION + end end end end diff --git a/gemfiles/rails_8_1.gemfile b/gemfiles/rails_8_1.gemfile new file mode 100644 index 00000000..5e111e3d --- /dev/null +++ b/gemfiles/rails_8_1.gemfile @@ -0,0 +1,7 @@ +# This file was generated by Appraisal + +source "https://rubygems.org" + +gem "railties", "~> 8.1.0" + +gemspec path: "../" diff --git a/lib/generators/solid_queue/install/templates/db/queue_schema.rb b/lib/generators/solid_queue/install/templates/db/queue_schema.rb index 85194b6a..760ee615 100644 --- a/lib/generators/solid_queue/install/templates/db/queue_schema.rb +++ b/lib/generators/solid_queue/install/templates/db/queue_schema.rb @@ -15,8 +15,10 @@ t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false + t.string "process_name" t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + t.index [ "process_name" ], name: "index_solid_queue_claimed_executions_on_process_name" end create_table "solid_queue_failed_executions", force: :cascade do |t| @@ -60,7 +62,7 @@ t.datetime "created_at", null: false t.string "name", null: false t.index [ "last_heartbeat_at" ], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index [ "name" ], name: "index_solid_queue_processes_on_name", unique: true t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id" end diff --git a/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb b/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb new file mode 100644 index 00000000..439b9504 --- /dev/null +++ b/lib/generators/solid_queue/update/templates/db/link_claimed_executions_with_processes_through_name.rb @@ -0,0 +1,30 @@ +class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[<%= ActiveRecord::VERSION::STRING.to_f %>] + def up + unless connection.column_exists?(:solid_queue_claimed_executions, :process_name) + add_column :solid_queue_claimed_executions, :process_name, :string + add_index :solid_queue_claimed_executions, :process_name + end + + unless connection.index_exists?(:solid_queue_processes, :name) + add_index :solid_queue_processes, :name, unique: true + end + + if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + remove_index :solid_queue_processes, [ :name, :supervisor_id ] + end + end + + def down + if connection.column_exists?(:solid_queue_claimed_executions, :process_name) + remove_column :solid_queue_claimed_executions, :process_name + end + + if connection.index_exists?(:solid_queue_processes, :name) + remove_index :solid_queue_processes, :name + end + + unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true + end + end +end diff --git a/lib/generators/solid_queue/update/update_generator.rb b/lib/generators/solid_queue/update/update_generator.rb new file mode 100644 index 00000000..272b0e3c --- /dev/null +++ b/lib/generators/solid_queue/update/update_generator.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require "rails/generators/active_record" + +class SolidQueue::UpdateGenerator < Rails::Generators::Base + include ActiveRecord::Generators::Migration + + source_root File.expand_path("templates", __dir__) + + class_option :database, type: :string, aliases: %i[ --db ], default: "queue", + desc: "The database that Solid Queue uses. Defaults to `queue`" + + def copy_new_migrations + template_dir = Dir.new(File.join(self.class.source_root, "db")) + + template_dir.each_child do |migration_file| + migration_template File.join("db", migration_file), File.join(db_migrate_path, migration_file), skip: true + end + end +end diff --git a/lib/solid_queue.rb b/lib/solid_queue.rb index e0d51c8c..2c49a689 100644 --- a/lib/solid_queue.rb +++ b/lib/solid_queue.rb @@ -69,6 +69,10 @@ def preserve_finished_jobs? preserve_finished_jobs end + def deprecator + @deprecator ||= ActiveSupport::Deprecation.new(next_major_version, "SolidQueue") + end + def instrument(channel, **options, &block) ActiveSupport::Notifications.instrument("#{channel}.solid_queue", **options, &block) end diff --git a/lib/solid_queue/configuration.rb b/lib/solid_queue/configuration.rb index a002b41d..b0083a17 100644 --- a/lib/solid_queue/configuration.rb +++ b/lib/solid_queue/configuration.rb @@ -188,6 +188,7 @@ def load_config_from_file(file) if file.exist? ActiveSupport::ConfigurationFile.parse(file).deep_symbolize_keys else + puts "[solid_queue] WARNING: Provided configuration file '#{file}' does not exist. Falling back to default configuration." {} end end diff --git a/lib/solid_queue/engine.rb b/lib/solid_queue/engine.rb index d10997c7..7debf229 100644 --- a/lib/solid_queue/engine.rb +++ b/lib/solid_queue/engine.rb @@ -37,5 +37,9 @@ class Engine < ::Rails::Engine include ActiveJob::ConcurrencyControls end end + + initializer "solid_queue.deprecator" do |app| + app.deprecators[:solid_queue] = SolidQueue.deprecator + end end end diff --git a/lib/solid_queue/log_subscriber.rb b/lib/solid_queue/log_subscriber.rb index 96fb19bf..aeb9d038 100644 --- a/lib/solid_queue/log_subscriber.rb +++ b/lib/solid_queue/log_subscriber.rb @@ -8,7 +8,7 @@ def dispatch_scheduled(event) end def claim(event) - debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :job_ids, :claimed_job_ids, :size)) + debug formatted_event(event, action: "Claim jobs", **event.payload.slice(:process_id, :process_name, :job_ids, :claimed_job_ids, :size)) end def release_many_claimed(event) @@ -16,11 +16,11 @@ def release_many_claimed(event) end def fail_many_claimed(event) - warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids)) + warn formatted_event(event, action: "Fail claimed jobs", **event.payload.slice(:job_ids, :process_ids, :process_names)) end def release_claimed(event) - info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id)) + info formatted_event(event, action: "Release claimed job", **event.payload.slice(:job_id, :process_id, :process_name)) end def retry_all(event) diff --git a/lib/solid_queue/processes/registrable.rb b/lib/solid_queue/processes/registrable.rb index 2cc9036d..c7428010 100644 --- a/lib/solid_queue/processes/registrable.rb +++ b/lib/solid_queue/processes/registrable.rb @@ -18,17 +18,19 @@ def process_id attr_accessor :process def register - @process = SolidQueue::Process.register \ - kind: kind, - name: name, - pid: pid, - hostname: hostname, - supervisor: try(:supervisor), - metadata: metadata.compact + wrap_in_app_executor do + @process = SolidQueue::Process.register \ + kind: kind, + name: name, + pid: pid, + hostname: hostname, + supervisor: try(:supervisor), + metadata: metadata.compact + end end def deregister - process&.deregister + wrap_in_app_executor { process&.deregister } end def registered? diff --git a/lib/solid_queue/scheduler/recurring_schedule.rb b/lib/solid_queue/scheduler/recurring_schedule.rb index 4070a0ec..b765edf1 100644 --- a/lib/solid_queue/scheduler/recurring_schedule.rb +++ b/lib/solid_queue/scheduler/recurring_schedule.rb @@ -46,7 +46,7 @@ def persist_tasks end def reload_tasks - @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys) + @configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a end def schedule(task) diff --git a/lib/solid_queue/supervisor.rb b/lib/solid_queue/supervisor.rb index 05deaa52..ef9c79d6 100644 --- a/lib/solid_queue/supervisor.rb +++ b/lib/solid_queue/supervisor.rb @@ -29,22 +29,18 @@ def initialize(configuration) end def start - wrap_in_app_executor do - boot - run_start_hooks + boot + run_start_hooks - start_processes - launch_maintenance_task + start_processes + launch_maintenance_task - supervise - end + supervise end def stop - wrap_in_app_executor do - super - run_stop_hooks - end + super + run_stop_hooks end private @@ -180,9 +176,11 @@ def replace_fork(pid, status) # executions it had claimed as failed so that they can be retried # by some other worker. def handle_claimed_jobs_by(terminated_fork, status) - if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) - error = Processes::ProcessExitError.new(status) - registered_process.fail_all_claimed_executions_with(error) + wrap_in_app_executor do + if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name) + error = Processes::ProcessExitError.new(status) + registered_process.fail_all_claimed_executions_with(error) + end end end diff --git a/lib/solid_queue/tasks.rb b/lib/solid_queue/tasks.rb index 91cd778b..f8d22fc2 100644 --- a/lib/solid_queue/tasks.rb +++ b/lib/solid_queue/tasks.rb @@ -4,7 +4,12 @@ Rails::Command.invoke :generate, [ "solid_queue:install" ] end - desc "start solid_queue supervisor to dispatch and process jobs" + desc "Update Solid Queue" + task :update do + Rails::Command.invoke :generate, [ "solid_queue:update" ] + end + + desc "Start Solid Queue supervisor to dispatch and process jobs" task start: :environment do SolidQueue::Supervisor.start end diff --git a/lib/solid_queue/version.rb b/lib/solid_queue/version.rb index f1718f9a..c542092d 100644 --- a/lib/solid_queue/version.rb +++ b/lib/solid_queue/version.rb @@ -1,3 +1,7 @@ module SolidQueue - VERSION = "1.2.2" + VERSION = "1.2.4" + + def self.next_major_version + Gem::Version.new(VERSION).segments.first + 1 + end end diff --git a/lib/solid_queue/worker.rb b/lib/solid_queue/worker.rb index e036a5fd..d426688c 100644 --- a/lib/solid_queue/worker.rb +++ b/lib/solid_queue/worker.rb @@ -38,7 +38,7 @@ def poll def claim_executions with_polling_volume do - SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process_id) + SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process) end end diff --git a/solid_queue.gemspec b/solid_queue.gemspec index 64818a2e..439cf519 100644 --- a/solid_queue.gemspec +++ b/solid_queue.gemspec @@ -10,11 +10,6 @@ Gem::Specification.new do |spec| spec.description = "Database-backed Active Job backend." spec.license = "MIT" - spec.post_install_message = <<~MESSAGE - Upgrading from Solid Queue < 1.0? Check details on breaking changes and upgrade instructions - --> https://github.com/rails/solid_queue/blob/main/UPGRADING.md - MESSAGE - spec.metadata["homepage_uri"] = spec.homepage spec.metadata["source_code_uri"] = "https://github.com/rails/solid_queue" diff --git a/test/dummy/config/environments/test.rb b/test/dummy/config/environments/test.rb index a5a99232..4a07d140 100644 --- a/test/dummy/config/environments/test.rb +++ b/test/dummy/config/environments/test.rb @@ -59,4 +59,9 @@ config.solid_queue.logger = ActiveSupport::Logger.new(nil) config.solid_queue.shutdown_timeout = 2.seconds + + config.log_formatter = proc do |severity, timestamp, progname, msg| + ts = timestamp.getlocal.strftime("%H:%M:%S.%3N") + "#{ts} #{msg}\n" + end end diff --git a/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb b/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb new file mode 100644 index 00000000..7d6f93e7 --- /dev/null +++ b/test/dummy/db/queue_migrate/20250720172253_link_claimed_executions_with_processes_through_name.rb @@ -0,0 +1,30 @@ +class LinkClaimedExecutionsWithProcessesThroughName < ActiveRecord::Migration[7.1] + def up + unless connection.column_exists?(:solid_queue_claimed_executions, :process_name) + add_column :solid_queue_claimed_executions, :process_name, :string + add_index :solid_queue_claimed_executions, :process_name + end + + unless connection.index_exists?(:solid_queue_processes, :name) + add_index :solid_queue_processes, :name, unique: true + end + + if connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + remove_index :solid_queue_processes, [ :name, :supervisor_id ] + end + end + + def down + if connection.column_exists?(:solid_queue_claimed_executions, :process_name) + remove_column :solid_queue_claimed_executions, :process_name + end + + if connection.index_exists?(:solid_queue_processes, :name) + remove_index :solid_queue_processes, :name + end + + unless connection.index_exists?(:solid_queue_processes, [ :name, :supervisor_id ]) + add_index :solid_queue_processes, [ :name, :supervisor_id ], unique: true + end + end +end diff --git a/test/dummy/db/queue_schema.rb b/test/dummy/db/queue_schema.rb index 697c2e92..20b83084 100644 --- a/test/dummy/db/queue_schema.rb +++ b/test/dummy/db/queue_schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 1) do +ActiveRecord::Schema[7.1].define(version: 2025_07_20_172253) do create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.bigint "job_id", null: false t.string "queue_name", null: false @@ -27,8 +27,10 @@ t.bigint "job_id", null: false t.bigint "process_id" t.datetime "created_at", null: false + t.string "process_name" t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" + t.index ["process_name"], name: "index_solid_queue_claimed_executions_on_process_name" end create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| @@ -72,7 +74,7 @@ t.datetime "created_at", null: false t.string "name", null: false t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at" - t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true + t.index ["name"], name: "index_solid_queue_processes_on_name", unique: true t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id" end diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 178c796d..b3be95c5 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -36,14 +36,16 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "schedule several conflicting jobs over the same record sequentially" do - UpdateResultJob.set(wait: 0.23.seconds).perform_later(@result, name: "000", pause: 0.1.seconds) + # Writes to @result at 0.4s + UpdateResultJob.set(wait: 0.2.seconds).perform_later(@result, name: "000", pause: 0.2.seconds) ("A".."F").each_with_index do |name, i| - NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds) + # "A" is enqueued at 0.2s and writes to @result at 0.6s, the write at 0.4s gets overwritten + NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.1).seconds).perform_later(@result, name: name, pause: 0.4.seconds) end ("G".."K").each_with_index do |name, i| - NonOverlappingUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name) + NonOverlappingUpdateResultJob.set(wait: (1 + i * 0.1).seconds).perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 046700d0..bb89497a 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -34,7 +34,7 @@ class InstrumentationTest < ActiveSupport::TestCase end assert_equal 1, events.size - assert_event events.first, "claim", process_id: process.id, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3 + assert_event events.first, "claim", process_id: process.id, process_name: process.name, job_ids: jobs.map(&:id), claimed_job_ids: jobs.map(&:id), size: 3 end test "polling emits events" do @@ -68,7 +68,7 @@ class InstrumentationTest < ActiveSupport::TestCase assert_equal 2, events.size release_one_event, release_many_event = events - assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id + assert_event release_one_event, "release_claimed", job_id: SolidQueue::Job.last.id, process_id: process.id, process_name: process.name assert_event release_many_event, "release_many_claimed", size: 1 end @@ -150,14 +150,14 @@ class InstrumentationTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) events = subscribed(/fail.*_claimed\.solid_queue/) do SolidQueue::Process.prune end assert_equal 1, events.count - assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], job_ids: jobs.map(&:id), size: 3 + assert_event events.first, "fail_many_claimed", process_ids: [ process.id ], process_names: [ process.name ], job_ids: jobs.map(&:id), size: 3 end test "errors when deregistering processes are included in deregister_process events" do diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index a86b2f15..b66718d1 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -182,12 +182,6 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase assert_completed_job_results("no exit", :background, 4) assert_completed_job_results("paused no exit", :default, 1) - # The background worker exits because of the exit job, - # leaving the pause job claimed - [ exit_job, pause_job ].each do |job| - assert_job_status(job, :claimed) - end - assert process_exists?(@pid) terminate_process(@pid) @@ -268,7 +262,7 @@ class ProcessesLifecycleTest < ActiveSupport::TestCase private def assert_clean_termination - wait_for_registered_processes 0, timeout: 0.2.second + wait_for_registered_processes 0, timeout: 0.5.second assert_no_registered_processes assert_no_claimed_jobs assert_not process_exists?(@pid) diff --git a/test/models/solid_queue/claimed_execution_test.rb b/test/models/solid_queue/claimed_execution_test.rb index 98513c94..5dfe69b3 100644 --- a/test/models/solid_queue/claimed_execution_test.rb +++ b/test/models/solid_queue/claimed_execution_test.rb @@ -87,7 +87,7 @@ def prepare_and_claim_job(active_job, process: @process) job.prepare_for_execution assert_difference -> { SolidQueue::ClaimedExecution.count } => +1 do - SolidQueue::ReadyExecution.claim(job.queue_name, 1, process.id) + SolidQueue::ReadyExecution.claim(job.queue_name, 1, process) end SolidQueue::ClaimedExecution.last diff --git a/test/models/solid_queue/process_test.rb b/test/models/solid_queue/process_test.rb index 489b2aca..e81a848e 100644 --- a/test/models/solid_queue/process_test.rb +++ b/test/models/solid_queue/process_test.rb @@ -20,7 +20,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) travel_to 10.minutes.from_now @@ -40,7 +40,7 @@ class SolidQueue::ProcessTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } jobs = SolidQueue::Job.last(3) - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) travel_to 10.minutes.from_now diff --git a/test/models/solid_queue/ready_execution_test.rb b/test/models/solid_queue/ready_execution_test.rb index dd9269ca..6c712598 100644 --- a/test/models/solid_queue/ready_execution_test.rb +++ b/test/models/solid_queue/ready_execution_test.rb @@ -6,12 +6,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.set(queue: "backend", priority: 5 - i).perform_later(i) end + @process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123", metadata: { queue: "background" }) @jobs = SolidQueue::Job.where(queue_name: "backend").order(:priority) end test "claim all jobs for existing queue" do assert_claimed_jobs(@jobs.count) do - SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, 42) + SolidQueue::ReadyExecution.claim("backend", @jobs.count + 1, @process) end @jobs.each(&:reload) @@ -21,13 +22,13 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase test "claim jobs for queue without jobs at the moment" do assert_no_difference [ -> { SolidQueue::ReadyExecution.count }, -> { SolidQueue::ClaimedExecution.count } ] do - SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, 42) + SolidQueue::ReadyExecution.claim("some_non_existing_queue", 10, @process) end end test "claim some jobs for existing queue" do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim("backend", 2, 42) + SolidQueue::ReadyExecution.claim("backend", 2, @process) end @jobs.first(2).each do |job| @@ -45,7 +46,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ backend background ], SolidQueue::Job.count + 1, @process) end end @@ -53,7 +54,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process) end end @@ -61,7 +62,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("backgr*", SolidQueue::Job.count + 1, @process) end assert @jobs.none?(&:claimed?) @@ -73,7 +74,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase SolidQueue::Queue.find_by_name("backend").pause assert_claimed_jobs(1) do - SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim("*", SolidQueue::Job.count + 1, @process) end @jobs.each(&:reload) @@ -84,7 +85,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(6) do - SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ backe* background ], SolidQueue::Job.count + 1, @process) end end @@ -92,7 +93,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase AddToBufferJob.perform_later("hey") assert_claimed_jobs(0) do - SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, 42) + SolidQueue::ReadyExecution.claim(%w[ none* ], SolidQueue::Job.count + 1, @process) end end @@ -101,7 +102,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase job = SolidQueue::Job.last assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim("*", 3, 42) + SolidQueue::ReadyExecution.claim("*", 3, @process) end assert job.reload.claimed? @@ -117,7 +118,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase assert_equal "background", job.queue_name assert_claimed_jobs(3) do - SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, 42) + SolidQueue::ReadyExecution.claim(%w[ background backend ], 3, @process) end assert job.reload.claimed? @@ -136,7 +137,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase claimed_jobs = [] 4.times do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, 42) + SolidQueue::ReadyExecution.claim(%w[ queue_b* queue_a* ], 2, @process) end claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) @@ -157,7 +158,7 @@ class SolidQueue::ReadyExecutionTest < ActiveSupport::TestCase claimed_jobs = [] 5.times do assert_claimed_jobs(2) do - SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, 42) + SolidQueue::ReadyExecution.claim(%w[ queue_a2 queue_c1 queue_b* queue_c2 queue_a* ], 2, @process) end claimed_jobs += SolidQueue::ClaimedExecution.order(:id).last(2).map(&:job) diff --git a/test/test_helper.rb b/test/test_helper.rb index 7c1c8792..9e743f96 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -53,6 +53,17 @@ class ActiveSupport::TestCase end end + def run(...) + # Rails 8.1.dev changed default logging levels + if defined?(with_debug_event_reporting) + with_debug_event_reporting do + super + end + else + super + end + end + private def wait_while_with_timeout(timeout, &block) wait_while_with_timeout!(timeout, &block) diff --git a/test/unit/configuration_test.rb b/test/unit/configuration_test.rb index 2ccaa728..11c2a5ff 100644 --- a/test/unit/configuration_test.rb +++ b/test/unit/configuration_test.rb @@ -26,6 +26,13 @@ class ConfigurationTest < ActiveSupport::TestCase assert_processes configuration, :dispatcher, 1, batch_size: SolidQueue::Configuration::DISPATCHER_DEFAULTS[:batch_size] end + test "warns if provided configuration file does not exist" do + assert_output "[solid_queue] WARNING: Provided configuration file '/path/to/nowhere.yml' does not exist. Falling back to default configuration.\n" do + configuration = SolidQueue::Configuration.new(config_file: Pathname.new("/path/to/nowhere.yml")) + assert configuration.valid? + end + end + test "read configuration from default file" do configuration = SolidQueue::Configuration.new assert 3, configuration.configured_processes.count @@ -120,16 +127,20 @@ class ConfigurationTest < ActiveSupport::TestCase assert error.include?("periodic_invalid_class: Class name doesn't correspond to an existing class") assert error.include?("periodic_incorrect_schedule: Schedule is not a supported recurring schedule") - assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid? + assert_output(/Provided configuration file '[^']+' does not exist\./) do + assert SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:empty_recurring)).valid? + end assert SolidQueue::Configuration.new(skip_recurring: true).valid? configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_production_only)) assert configuration.valid? assert_processes configuration, :scheduler, 0 - configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty)) - assert configuration.valid? - assert_processes configuration, :scheduler, 0 + assert_output(/Provided configuration file '[^']+' does not exist\./) do + configuration = SolidQueue::Configuration.new(recurring_schedule_file: config_file_path(:recurring_with_empty)) + assert configuration.valid? + assert_processes configuration, :scheduler, 0 + end # No processes configuration = SolidQueue::Configuration.new(skip_recurring: true, dispatchers: [], workers: []) diff --git a/test/unit/dispatcher_test.rb b/test/unit/dispatcher_test.rb index 9aa2196e..89d87c1a 100644 --- a/test/unit/dispatcher_test.rb +++ b/test/unit/dispatcher_test.rb @@ -107,6 +107,8 @@ class DispatcherTest < ActiveSupport::TestCase assert_equal 0, SolidQueue::ScheduledExecution.count assert_equal 3, SolidQueue::ReadyExecution.count + ensure + dispatcher.stop end test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do @@ -117,6 +119,8 @@ class DispatcherTest < ActiveSupport::TestCase dispatcher.start wait_while_with_timeout(1.second) { !SolidQueue::ScheduledExecution.exists? } + ensure + dispatcher.stop end private diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index 7a531ad2..7ef568b4 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -111,7 +111,7 @@ class SupervisorTest < ActiveSupport::TestCase 3.times { |i| StoreResultJob.set(queue: :new_queue).perform_later(i) } process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) assert_equal 3, SolidQueue::ClaimedExecution.count assert_equal 0, SolidQueue::ReadyExecution.count @@ -138,7 +138,7 @@ class SupervisorTest < ActiveSupport::TestCase 4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) } process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") - SolidQueue::ReadyExecution.claim("*", 5, process.id) + SolidQueue::ReadyExecution.claim("*", 5, process) assert_equal 3, SolidQueue::ClaimedExecution.count assert_equal 0, SolidQueue::ReadyExecution.count @@ -193,7 +193,7 @@ class SupervisorTest < ActiveSupport::TestCase worker_process = SolidQueue::Process.register(kind: "Worker", pid: 999_999, name: worker_name) job = StoreResultJob.perform_later(42) - claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process.id).first + claimed_execution = SolidQueue::ReadyExecution.claim("*", 1, worker_process).first terminated_fork = Struct.new(:name).new(worker_name)