Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions async-job-adapter-active_job.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ Gem::Specification.new do |spec|

spec.add_dependency "async-job", "~> 0.9"
spec.add_dependency "async-service", "~> 0.12"
# Recurring scheduler support (cron parsing + optional Redis cross-host dedup):
spec.add_dependency "fugit", "~> 1.10"
spec.add_dependency "async-redis", "~> 0.13"
end
28 changes: 27 additions & 1 deletion bin/async-job-adapter-active_job-server
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,37 @@ require "async/service/configuration"
require "async/service/controller"

require "async/job/adapter/active_job/environment"
require "async/job/adapter/active_job/recurring/environment"
require "async/job/adapter/active_job/recurring/loader"
require "yaml"

# Flush output from child processes immediately so logs aren't buffered until shutdown.
STDOUT.sync = true
STDERR.sync = true

root = ENV.fetch("RAILS_ROOT", Dir.pwd)
env = ENV["ASYNC_JOB_ENV"] || ENV["RAILS_ENV"] || ENV["RACK_ENV"] || ENV["APP_ENV"] || "development"
schedule_file = Async::Job::Adapter::ActiveJob::Recurring::Loader.schedule_path(root)
has_schedule = begin
data = File.exist?(schedule_file) ? (YAML.load_file(schedule_file) || {}) : {}
section = data.fetch(env, {}) || {}
!section.empty?
rescue => _
false
end

skip_scheduler = ENV["ASYNC_JOB_SKIP_RECURRING"] == "true" || ENV["SOLID_QUEUE_SKIP_RECURRING"] == "true" || ENV["JOBS_SKIP_RECURRING"] == "true"

configuration = Async::Service::Configuration.build do
service "async-job-adapter-active_job-server" do
service "jobs" do
include Async::Job::Adapter::ActiveJob::Environment
end

unless skip_scheduler || !has_schedule
service "scheduler" do
include Async::Job::Adapter::ActiveJob::Recurring::Environment
end
end
end

Async::Service::Controller.run(configuration)
7 changes: 6 additions & 1 deletion context/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ end

### Running A Server

If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all define queues.
If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all defined queues, and optionally a recurring scheduler.

``` bash
$ bundle exec async-job-adapter-active_job-server

The server includes by default:

- `jobs` service — processes Active Job queues
- `scheduler` service — executes tasks from `config/recurring.yml` (disable with `ASYNC_JOB_SKIP_RECURRING=true`)
```

You can specify different queues using the `ASYNC_JOB_ADAPTER_ACTIVE_JOB_QUEUE_NAMES` environment variable.
Expand Down
11 changes: 10 additions & 1 deletion guides/getting-started/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,19 @@ end

### Running A Server

If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all define queues.
If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all defined queues. It can also run a recurring scheduler that reads `config/recurring.yml` and enqueues Active Job tasks using Fugit-based cron.

``` bash
$ bundle exec async-job-adapter-active_job-server

The server runs two services by default:

- `jobs` — Active Job workers for all configured queues.
- `scheduler` — Recurring scheduler (Fugit-based) that enqueues tasks from `config/recurring.yml`.

Disable the scheduler with `ASYNC_JOB_SKIP_RECURRING=true`.

See `guides/recurring` for configuration and environment variables.
```

You can specify different queues using the `ASYNC_JOB_ADAPTER_ACTIVE_JOB_QUEUE_NAMES` environment variable.
Expand Down
61 changes: 61 additions & 0 deletions guides/recurring/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Recurring Tasks (Scheduler)

The `async-job-adapter-active_job` server can run a recurring scheduler alongside workers. It reads `config/recurring.yml`, parses cron expressions using Fugit, and enqueues Active Job tasks on schedule.

## Enable

By default, `bundle exec async-job-adapter-active_job-server` starts workers and the scheduler (if a schedule file is present). You can disable the scheduler with:

```bash
ASYNC_JOB_SKIP_RECURRING=true bundle exec async-job-adapter-active_job-server
# or compatible alias:
SOLID_QUEUE_SKIP_RECURRING=true bundle exec async-job-adapter-active_job-server
```

## Schedule file

Default path: `config/recurring.yml`.

Override via `ASYNC_JOB_RECURRING_SCHEDULE` (or `SOLID_QUEUE_RECURRING_SCHEDULE`).

Structure:

```yaml
production:
my_task:
class: MyJob # or use `command: "SomeModule.some_method"`
args: [ 42, { foo: "bar" } ]
queue: default
priority: 0
schedule: "*/5 * * * *" # Fugit/cron; also accepts "every 5 seconds"
```

Supported schedule strings:
- Cron (Fugit::Cron)
- Convenience phrases: "every N seconds/minutes/hours"

## Cross-host dedup & last run

- Dedup (ensures only one host enqueues per tick when multiple servers run):
- Uses Redis if available (`REDIS_URL`) or `ASYNC_JOB_RECURRING_DEDUP=redis`.
- Key: `<prefix>:recurring:exec:<task_key>:<run_at_epoch>` (NX + EX TTL).
- TTL configurable via `ASYNC_JOB_RECURRING_DEDUP_TTL` (default 600 seconds).
- Last enqueued time:
- Stores `<task_key> -> epoch` in `<prefix>:recurring:last` (Redis) when Redis is enabled.
- Fallback is a no-op; apps may also record to Rails.cache if desired.
- Prefix:
- `ASYNC_JOB_REDIS_PREFIX` (default `async-job`).

## Environment variables

- `ASYNC_JOB_RECURRING_SCHEDULE` — path to schedule file (default `config/recurring.yml`).
- `ASYNC_JOB_SKIP_RECURRING=true` — disable scheduler.
- `ASYNC_JOB_REDIS_PREFIX` — Redis key prefix (default `async-job`).
- `ASYNC_JOB_RECURRING_DEDUP=auto|redis|memory` — dedup backend (default `auto`: redis if `REDIS_URL`, else memory).
- `ASYNC_JOB_RECURRING_DEDUP_TTL` — dedup TTL in seconds (default `600`).
- `ASYNC_JOB_RECURRING_LAST=auto|redis|cache` — last run backend (currently redis or no-op; default `auto`).

## Logs

On start, the scheduler logs how many tasks were loaded and will print an enqueue message per tick. Errors while enqueuing are logged at warn level.

2 changes: 2 additions & 0 deletions lib/async/job/adapter/active_job/dispatcher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def keys
@definitions.keys
end

# Generate a status string for all active queues.
# @returns [String] A comma-separated list of queue statuses.
def status_string
self.keys.map do |name|
queue = @queues[name]
Expand Down
10 changes: 9 additions & 1 deletion lib/async/job/adapter/active_job/environment.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,17 @@ def queue_names
end

# Number of instances to start. By default (when nil), uses `Etc.nprocessors`.
# You can override via `ASYNC_JOB_WORKERS` or `JOBS_COUNT`.
# In development, defaults to 1 if not specified.
# @returns [Integer | nil]
def count
nil
if workers = (ENV["ASYNC_JOB_WORKERS"] || ENV["JOBS_COUNT"])
Integer(workers)
elsif defined?(::Rails) && ::Rails.env.development?
1
else
nil
end
end

# Options to use when creating the container.
Expand Down
32 changes: 32 additions & 0 deletions lib/async/job/adapter/active_job/recurring/environment.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

# Released under the MIT License.

require_relative "service"

module Async
module Job
module Adapter
module ActiveJob
# Provides recurring job scheduling functionality.
module Recurring
# Environment configuration for the recurring scheduler service.
module Environment
# The service class to use for recurring scheduling.
# @returns [Class] The Service class.
def service_class
Service
end

# The number of recurring scheduler instances to run.
# @returns [Integer] The count (always 1).
def count
1
end
end
end
end
end
end
end

81 changes: 81 additions & 0 deletions lib/async/job/adapter/active_job/recurring/loader.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2025

require "yaml"
require "console"
require_relative "task"

module Async
module Job
module Adapter
module ActiveJob
module Recurring
# Loads recurring task definitions from YAML configuration.
module Loader
module_function

DEFAULT_PATH = "config/recurring.yml"

# Determine the path to the recurring schedule file.
# @parameter root [String] The application root directory.
# @returns [String] The path to the schedule file.
def schedule_path(root)
ENV["ASYNC_JOB_RECURRING_SCHEDULE"] || ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || ENV["RECURRING_SCHEDULE_FILE"] || File.join(root, DEFAULT_PATH)
end

# Load tasks from recurring.yml scoped by env.
# @return [Array<Task>]
def load(root:, env:)
path = schedule_path(root)
unless File.exist?(path)
Console.info(self, "Recurring schedule file not found.", path: path)
return []
end

config = YAML.load_file(path) || {}
map = config.fetch(env.to_s, {}) || {}
if map.empty?
Console.info(self, "Recurring schedule has no tasks for env.", path: path, env: env)
end

map.filter_map do |key, spec|
schedule = spec["schedule"]
unless schedule
Console.warn(self, "Skipping recurring task without schedule.", key: key, path: path)
next
end

cron_string = Helper.normalize_schedule(schedule)
cron = Helper.parse_cron(cron_string)
unless cron
Console.warn(self, "Skipping task: unsupported schedule.", key: key, schedule: schedule, normalized: cron_string, path: path)
next
end

if spec["class"]
klass = Helper.constantize(spec["class"])
unless klass
Console.warn(self, "Skipping task: unknown job class.", key: key, class: spec["class"], path: path)
next
end
end

Task.new(
key: key.to_s,
klass: klass,
command: spec["command"],
queue: spec["queue"],
priority: spec["priority"],
args: spec["args"],
cron: cron
)
end
end
end
end
end
end
end
end
Loading