Skip to content

Commit 5c6d4b0

Browse files
flavorjonesrosa
authored andcommitted
Add wrap_in_app_executor in a few necessary places
Process#register and #degister for the supervisor to start up properly. RecurringSchedule#reload_tasks resolves all the records immediately to avoid deferred resolution outside the executor block. Supervisor#handle_claimed_jobs_by wraps its code in the executor. A second attempt at #655 without causing the issues from #670
1 parent 6b36dcc commit 5c6d4b0

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

lib/solid_queue/processes/registrable.rb

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@ def process_id
1818
attr_accessor :process
1919

2020
def register
21-
@process = SolidQueue::Process.register \
22-
kind: kind,
23-
name: name,
24-
pid: pid,
25-
hostname: hostname,
26-
supervisor: try(:supervisor),
27-
metadata: metadata.compact
21+
wrap_in_app_executor do
22+
@process = SolidQueue::Process.register \
23+
kind: kind,
24+
name: name,
25+
pid: pid,
26+
hostname: hostname,
27+
supervisor: try(:supervisor),
28+
metadata: metadata.compact
29+
end
2830
end
2931

3032
def deregister
31-
process&.deregister
33+
wrap_in_app_executor { process&.deregister }
3234
end
3335

3436
def registered?

lib/solid_queue/scheduler/recurring_schedule.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def persist_tasks
4646
end
4747

4848
def reload_tasks
49-
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys)
49+
@configured_tasks = SolidQueue::RecurringTask.where(key: task_keys).to_a
5050
end
5151

5252
def schedule(task)

lib/solid_queue/supervisor.rb

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,9 +176,11 @@ def replace_fork(pid, status)
176176
# executions it had claimed as failed so that they can be retried
177177
# by some other worker.
178178
def handle_claimed_jobs_by(terminated_fork, status)
179-
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
180-
error = Processes::ProcessExitError.new(status)
181-
registered_process.fail_all_claimed_executions_with(error)
179+
wrap_in_app_executor do
180+
if registered_process = SolidQueue::Process.find_by(name: terminated_fork.name)
181+
error = Processes::ProcessExitError.new(status)
182+
registered_process.fail_all_claimed_executions_with(error)
183+
end
182184
end
183185
end
184186

0 commit comments

Comments
 (0)