Skip to content

Commit 3e44ea4

Browse files
committed
Add kind, pid and hostname as first-class attributes to Process
1 parent c4940a1 commit 3e44ea4

File tree

12 files changed

+40
-20
lines changed

12 files changed

+40
-20
lines changed

app/models/solid_queue/blocked_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
module SolidQueue
2-
class BlockedExecution < SolidQueue::Execution
2+
class BlockedExecution < Execution
33
assume_attributes_from_job :concurrency_key
44
before_create :set_expires_at
55

app/models/solid_queue/execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
module SolidQueue
2-
class Execution < SolidQueue::Record
2+
class Execution < Record
33
include JobAttributes
44

55
self.abstract_class = true

app/models/solid_queue/process.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ class SolidQueue::Process < SolidQueue::Record
55
has_many :forks, class_name: "SolidQueue::Process", inverse_of: :supervisor, foreign_key: :supervisor_id, dependent: :destroy
66
has_many :claimed_executions
77

8-
store :metadata, accessors: [ :kind, :pid ], coder: JSON
8+
store :metadata, coder: JSON
99

1010
after_destroy -> { claimed_executions.release_all }
1111

app/models/solid_queue/scheduled_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
module SolidQueue
2-
class ScheduledExecution < SolidQueue::Execution
2+
class ScheduledExecution < Execution
33
scope :due, -> { where(scheduled_at: ..Time.current) }
44
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
55
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
class AddNewAttributesToProcesses < ActiveRecord::Migration[7.1]
2+
def change
3+
change_table :solid_queue_processes do |t|
4+
t.string :kind, null: false
5+
t.string :hostname
6+
t.integer :pid, null: false
7+
end
8+
end
9+
end

lib/solid_queue/process_registration.rb

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ def supervised_by(process)
3232
attr_accessor :process
3333

3434
def register
35-
@process = SolidQueue::Process.register(supervisor: supervisor, metadata: metadata)
35+
@process = SolidQueue::Process.register \
36+
kind: self.class.name.demodulize,
37+
pid: process_pid,
38+
supervisor: supervisor,
39+
hostname: hostname,
40+
metadata: metadata
3641
end
3742

3843
def deregister
@@ -65,7 +70,7 @@ def process_pid
6570
end
6671

6772
def metadata
68-
{ kind: self.class.name.demodulize, hostname: hostname, pid: process_pid, supervisor_pid: supervisor&.pid }
73+
{}
6974
end
7075
end
7176
end

test/dummy/db/schema.rb

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
#
1111
# It's strongly recommended that you check this file into your version control system.
1212

13-
ActiveRecord::Schema[7.1].define(version: 2023_12_08_215544) do
13+
ActiveRecord::Schema[7.1].define(version: 2023_12_11_200639) do
1414
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
1515
t.string "queue_name"
1616
t.string "status"
@@ -73,6 +73,9 @@
7373
t.datetime "created_at", null: false
7474
t.datetime "last_heartbeat_at", null: false
7575
t.bigint "supervisor_id"
76+
t.string "kind", null: false
77+
t.string "hostname"
78+
t.integer "pid", null: false
7679
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
7780
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
7881
end

test/integration/processes_lifecycle_test.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,13 +194,13 @@ def assert_registered_workers_for(*queues)
194194
workers = find_processes_registered_as("Worker")
195195
registered_queues = workers.map { |process| process.metadata["queues"] }.compact
196196
assert_equal queues.map(&:to_s).sort, registered_queues.sort
197-
assert_equal [ @pid ], workers.map { |process| process.metadata["supervisor_pid"] }.uniq
197+
assert_equal [ @pid ], workers.map { |process| process.supervisor.pid }.uniq
198198
end
199199

200200
def assert_registered_supervisor
201201
processes = find_processes_registered_as("Supervisor")
202202
assert_equal 1, processes.count
203-
assert_equal @pid, processes.first.metadata["pid"]
203+
assert_equal @pid, processes.first.pid
204204
end
205205

206206
def assert_no_registered_workers

test/models/solid_queue/claimed_execution_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
44
setup do
5-
@process = SolidQueue::Process.register(metadata: { queue: "background" })
5+
@process = SolidQueue::Process.register(kind: "Worker", pid: 42, metadata: { queue: "background" })
66
end
77

88
test "perform job successfully" do

test/models/solid_queue/process_test.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22

33
class SolidQueue::ProcessTest < ActiveSupport::TestCase
44
test "prune processes with expired heartbeats" do
5-
SolidQueue::Process.register(metadata: { pid: 42 })
6-
SolidQueue::Process.register(metadata: { pid: 24 })
5+
SolidQueue::Process.register(kind: "Worker", pid: 42)
6+
SolidQueue::Process.register(kind: "Worker", pid: 43)
77

88
travel_to 10.minutes.from_now
99

10-
SolidQueue::Process.register(metadata: { pid: 16 })
10+
SolidQueue::Process.register(kind: "Worker", pid: 44)
1111

1212
assert_difference -> { SolidQueue::Process.count }, -2 do
1313
SolidQueue::Process.prune

0 commit comments

Comments
 (0)