diff --git a/async-job-adapter-active_job.gemspec b/async-job-adapter-active_job.gemspec index d4e97d7..ddbed92 100644 --- a/async-job-adapter-active_job.gemspec +++ b/async-job-adapter-active_job.gemspec @@ -28,4 +28,7 @@ Gem::Specification.new do |spec| spec.add_dependency "async-job", "~> 0.9" spec.add_dependency "async-service", "~> 0.12" + # Recurring scheduler support (cron parsing + optional Redis cross-host dedup): + spec.add_dependency "fugit", "~> 1.10" + spec.add_dependency "async-redis", "~> 0.13" end diff --git a/bin/async-job-adapter-active_job-server b/bin/async-job-adapter-active_job-server index a060854..3bff592 100755 --- a/bin/async-job-adapter-active_job-server +++ b/bin/async-job-adapter-active_job-server @@ -5,11 +5,37 @@ require "async/service/configuration" require "async/service/controller" require "async/job/adapter/active_job/environment" +require "async/job/adapter/active_job/recurring/environment" +require "async/job/adapter/active_job/recurring/loader" +require "yaml" + +# Flush output from child processes immediately so logs aren't buffered until shutdown. +STDOUT.sync = true +STDERR.sync = true + +root = ENV.fetch("RAILS_ROOT", Dir.pwd) +env = ENV["ASYNC_JOB_ENV"] || ENV["RAILS_ENV"] || ENV["RACK_ENV"] || ENV["APP_ENV"] || "development" +schedule_file = Async::Job::Adapter::ActiveJob::Recurring::Loader.schedule_path(root) +has_schedule = begin + data = File.exist?(schedule_file) ? (YAML.load_file(schedule_file) || {}) : {} + section = data.fetch(env, {}) || {} + !section.empty? 
+rescue => _ + false +end + +skip_scheduler = ENV["ASYNC_JOB_SKIP_RECURRING"] == "true" || ENV["SOLID_QUEUE_SKIP_RECURRING"] == "true" || ENV["JOBS_SKIP_RECURRING"] == "true" configuration = Async::Service::Configuration.build do - service "async-job-adapter-active_job-server" do + service "jobs" do include Async::Job::Adapter::ActiveJob::Environment end + + unless skip_scheduler || !has_schedule + service "scheduler" do + include Async::Job::Adapter::ActiveJob::Recurring::Environment + end + end end Async::Service::Controller.run(configuration) diff --git a/context/getting-started.md b/context/getting-started.md index 752de99..36ddcd7 100644 --- a/context/getting-started.md +++ b/context/getting-started.md @@ -71,10 +71,15 @@ end ### Running A Server -If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all define queues. +If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all defined queues, and optionally a recurring scheduler. ``` bash $ bundle exec async-job-adapter-active_job-server + +The server includes by default: + +- `jobs` service — processes Active Job queues +- `scheduler` service — executes tasks from `config/recurring.yml` (disable with `ASYNC_JOB_SKIP_RECURRING=true`) ``` You can specify different queues using the `ASYNC_JOB_ADAPTER_ACTIVE_JOB_QUEUE_NAMES` environment variable. diff --git a/guides/getting-started/readme.md b/guides/getting-started/readme.md index 752de99..496db6e 100644 --- a/guides/getting-started/readme.md +++ b/guides/getting-started/readme.md @@ -71,10 +71,19 @@ end ### Running A Server -If you are using a queue that requires a server (e.g. Redis), you will need to run a server. 
A simple server is provided `async-job-adapter-active_job-server`, which by default will run all define queues.
+If you are using a queue that requires a server (e.g. Redis), you will need to run a server. A simple server is provided `async-job-adapter-active_job-server`, which by default will run all defined queues. It can also run a recurring scheduler that reads `config/recurring.yml` and enqueues Active Job tasks using Fugit-based cron.
 
 ``` bash
 $ bundle exec async-job-adapter-active_job-server
 ```
+
+The server runs two services by default:
+
+- `jobs` — Active Job workers for all configured queues.
+- `scheduler` — Recurring scheduler (Fugit-based) that enqueues tasks from `config/recurring.yml`.
+
+Disable the scheduler with `ASYNC_JOB_SKIP_RECURRING=true`.
+
+See `guides/recurring` for configuration and environment variables.
 
 You can specify different queues using the `ASYNC_JOB_ADAPTER_ACTIVE_JOB_QUEUE_NAMES` environment variable.
diff --git a/guides/recurring/readme.md b/guides/recurring/readme.md
new file mode 100644
index 0000000..84f35cc
--- /dev/null
+++ b/guides/recurring/readme.md
@@ -0,0 +1,61 @@
+# Recurring Tasks (Scheduler)
+
+The `async-job-adapter-active_job` server can run a recurring scheduler alongside workers. It reads `config/recurring.yml`, parses cron expressions using Fugit, and enqueues Active Job tasks on schedule.
+
+## Enable
+
+By default, `bundle exec async-job-adapter-active_job-server` starts workers and the scheduler (if a schedule file is present). You can disable the scheduler with:
+
+```bash
+ASYNC_JOB_SKIP_RECURRING=true bundle exec async-job-adapter-active_job-server
+# or compatible alias:
+SOLID_QUEUE_SKIP_RECURRING=true bundle exec async-job-adapter-active_job-server
+```
+
+## Schedule file
+
+Default path: `config/recurring.yml`.
+
+Override via `ASYNC_JOB_RECURRING_SCHEDULE` (or `SOLID_QUEUE_RECURRING_SCHEDULE`).
+
+Structure:
+
+```yaml
+production:
+  my_task:
+    class: MyJob # or use `command: "SomeModule.some_method"`
+    args: [ 42, { foo: "bar" } ]
+    queue: default
+    priority: 0
+    schedule: "*/5 * * * *" # Fugit/cron; also accepts "every 5 seconds"
+```
+
+Supported schedule strings:
+- Cron (Fugit::Cron)
+- Convenience phrases: "every N seconds/minutes/hours"
+
+## Cross-host dedup & last run
+
+- Dedup (ensures only one host enqueues per tick when multiple servers run):
+  - Uses Redis if available (`REDIS_URL`) or `ASYNC_JOB_RECURRING_DEDUP=redis`.
+  - Key: `<prefix>:recurring:exec:<task_key>:<run_at_epoch>` (NX + EX TTL).
+  - TTL configurable via `ASYNC_JOB_RECURRING_DEDUP_TTL` (default 600 seconds).
+- Last enqueued time:
+  - Stores `<task_key> -> epoch` in `<prefix>:recurring:last` (Redis) when Redis is enabled.
+  - Fallback is a no-op; apps may also record to Rails.cache if desired.
+- Prefix:
+  - `ASYNC_JOB_REDIS_PREFIX` (default `async-job`).
+
+## Environment variables
+
+- `ASYNC_JOB_RECURRING_SCHEDULE` — path to schedule file (default `config/recurring.yml`).
+- `ASYNC_JOB_SKIP_RECURRING=true` — disable scheduler.
+- `ASYNC_JOB_REDIS_PREFIX` — Redis key prefix (default `async-job`).
+- `ASYNC_JOB_RECURRING_DEDUP=auto|redis|memory` — dedup backend (default `auto`: redis if `REDIS_URL`, else memory).
+- `ASYNC_JOB_RECURRING_DEDUP_TTL` — dedup TTL in seconds (default `600`).
+- `ASYNC_JOB_RECURRING_LAST=auto|redis|cache` — last run backend (currently redis or no-op; default `auto`).
+
+## Logs
+
+On start, the scheduler logs how many tasks were loaded and will print an enqueue message per tick. Errors while enqueuing are logged at warn level.
+
diff --git a/lib/async/job/adapter/active_job/dispatcher.rb b/lib/async/job/adapter/active_job/dispatcher.rb
index ea627da..0203879 100644
--- a/lib/async/job/adapter/active_job/dispatcher.rb
+++ b/lib/async/job/adapter/active_job/dispatcher.rb
@@ -61,6 +61,8 @@ def keys
 		@definitions.keys
 	end
 	
+	# Generate a status string for all active queues.
+ # @returns [String] A comma-separated list of queue statuses. def status_string self.keys.map do |name| queue = @queues[name] diff --git a/lib/async/job/adapter/active_job/environment.rb b/lib/async/job/adapter/active_job/environment.rb index 5f52b47..364c0b1 100644 --- a/lib/async/job/adapter/active_job/environment.rb +++ b/lib/async/job/adapter/active_job/environment.rb @@ -38,9 +38,17 @@ def queue_names end # Number of instances to start. By default (when nil), uses `Etc.nprocessors`. + # You can override via `ASYNC_JOB_WORKERS` or `JOBS_COUNT`. + # In development, defaults to 1 if not specified. # @returns [Integer | nil] def count - nil + if workers = (ENV["ASYNC_JOB_WORKERS"] || ENV["JOBS_COUNT"]) + Integer(workers) + elsif defined?(::Rails) && ::Rails.env.development? + 1 + else + nil + end end # Options to use when creating the container. diff --git a/lib/async/job/adapter/active_job/recurring/environment.rb b/lib/async/job/adapter/active_job/recurring/environment.rb new file mode 100644 index 0000000..9bfc4b7 --- /dev/null +++ b/lib/async/job/adapter/active_job/recurring/environment.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +# Released under the MIT License. + +require_relative "service" + +module Async + module Job + module Adapter + module ActiveJob + # Provides recurring job scheduling functionality. + module Recurring + # Environment configuration for the recurring scheduler service. + module Environment + # The service class to use for recurring scheduling. + # @returns [Class] The Service class. + def service_class + Service + end + + # The number of recurring scheduler instances to run. + # @returns [Integer] The count (always 1). 
+ def count + 1 + end + end + end + end + end + end +end + diff --git a/lib/async/job/adapter/active_job/recurring/loader.rb b/lib/async/job/adapter/active_job/recurring/loader.rb new file mode 100644 index 0000000..c5c7d66 --- /dev/null +++ b/lib/async/job/adapter/active_job/recurring/loader.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025 + +require "yaml" +require "console" +require_relative "task" + +module Async + module Job + module Adapter + module ActiveJob + module Recurring + # Loads recurring task definitions from YAML configuration. + module Loader + module_function + + DEFAULT_PATH = "config/recurring.yml" + + # Determine the path to the recurring schedule file. + # @parameter root [String] The application root directory. + # @returns [String] The path to the schedule file. + def schedule_path(root) + ENV["ASYNC_JOB_RECURRING_SCHEDULE"] || ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || ENV["RECURRING_SCHEDULE_FILE"] || File.join(root, DEFAULT_PATH) + end + + # Load tasks from recurring.yml scoped by env. + # @return [Array] + def load(root:, env:) + path = schedule_path(root) + unless File.exist?(path) + Console.info(self, "Recurring schedule file not found.", path: path) + return [] + end + + config = YAML.load_file(path) || {} + map = config.fetch(env.to_s, {}) || {} + if map.empty? 
+ Console.info(self, "Recurring schedule has no tasks for env.", path: path, env: env) + end + + map.filter_map do |key, spec| + schedule = spec["schedule"] + unless schedule + Console.warn(self, "Skipping recurring task without schedule.", key: key, path: path) + next + end + + cron_string = Helper.normalize_schedule(schedule) + cron = Helper.parse_cron(cron_string) + unless cron + Console.warn(self, "Skipping task: unsupported schedule.", key: key, schedule: schedule, normalized: cron_string, path: path) + next + end + + if spec["class"] + klass = Helper.constantize(spec["class"]) + unless klass + Console.warn(self, "Skipping task: unknown job class.", key: key, class: spec["class"], path: path) + next + end + end + + Task.new( + key: key.to_s, + klass: klass, + command: spec["command"], + queue: spec["queue"], + priority: spec["priority"], + args: spec["args"], + cron: cron + ) + end + end + end + end + end + end + end +end diff --git a/lib/async/job/adapter/active_job/recurring/scheduler.rb b/lib/async/job/adapter/active_job/recurring/scheduler.rb new file mode 100644 index 0000000..048ab1f --- /dev/null +++ b/lib/async/job/adapter/active_job/recurring/scheduler.rb @@ -0,0 +1,212 @@ +# frozen_string_literal: true + +# Released under the MIT License. + +require "console" +require "socket" + +begin + require "async/redis/client" +rescue LoadError + # Optional; only used if REDIS_URL is present or backend=redis. +end + +module Async + module Job + module Adapter + module ActiveJob + module Recurring + # Reconciles configured recurring tasks with any previously + # persisted set, removing keys for tasks that were removed + # from the schedule (sidekiq-cron style). This does NOT + # purge already-enqueued jobs from queues. + module Reconciler + module_function + + # Reconcile configured tasks with persisted Redis state, removing keys for deleted tasks. 
+ # Cleans up deduplication locks, last-run records, and task set entries for tasks + # that were removed from the schedule configuration. + # @parameter tasks [Array] The current set of recurring tasks to reconcile. + # @parameter prefix [String] The Redis key prefix for recurring task data. + def reconcile(tasks, prefix: Scheduler::DEFAULT_PREFIX) + return unless Backend.redis_enabled? + redis = Backend.redis_client + set_key = "#{prefix}:recurring:tasks" + last_key = "#{prefix}:recurring:last" + + current = Array(tasks).map(&:key) + existing = redis.call("SMEMBERS", set_key) || [] + removed = existing - current + added = current - existing + + removed.each do |key| + # Delete dedup locks for historical executions of this task + pattern = "#{prefix}:recurring:exec:#{key}:*" + scan_and_delete(redis, pattern) + # Drop last-run record for this task + redis.call("HDEL", last_key, key) + # Remove from tasks set + redis.call("SREM", set_key, key) + Console.info(self, "[recurring] removed task", key: key) + end + + redis.call("SADD", set_key, *added) unless added.empty? + Console.info(self, "[recurring] reconcile", kept: (current - added), removed: removed, added: added) unless (removed.empty? && added.empty?) + rescue => e + Console.warn(self, "[recurring] reconcile failed", exception: e) + end + + # Scan Redis for keys matching a pattern and delete them in batches. + # Uses Redis SCAN to safely iterate through keys without blocking. + # @parameter redis [Async::Redis::Client] The Redis client connection. + # @parameter pattern [String] The Redis key pattern to match (e.g., "prefix:*"). + def scan_and_delete(redis, pattern) + cursor = "0" + begin + cursor, batch = redis.call("SCAN", cursor, "MATCH", pattern, "COUNT", 200) + slice = Array(batch) + redis.call("DEL", *slice) unless slice.empty? + end while cursor != "0" + end + end + + # Backend configuration for deduplication and last-run tracking. 
+ module Backend + module_function + + # Check if Redis backend is enabled based on environment configuration. + # @returns [Boolean] True if Redis is enabled and available. + def redis_enabled? + backend = ENV["ASYNC_JOB_RECURRING_DEDUP"] || ENV["JOBS_DEDUP_BACKEND"] || "auto" + last = ENV["ASYNC_JOB_RECURRING_LAST"] || ENV["JOBS_LAST_BACKEND"] || "auto" + wants_redis = [backend, last].any? {|v| v == "redis"} + auto_redis = ([backend, last].include?("auto") && ENV.key?("REDIS_URL")) + (wants_redis || auto_redis) && defined?(Async::Redis::Client) + end + + # Get or create a Redis client connection. + # @returns [Async::Redis::Client, nil] The Redis client or nil if not enabled. + def redis_client + return @redis if defined?(@redis) && @redis + return nil unless redis_enabled? + + endpoint = ENV["REDIS_URL"] ? Async::Redis::Endpoint.parse(ENV["REDIS_URL"]) : Async::Redis::Endpoint.local + @redis = Async::Redis::Client.new(endpoint) + end + + # Determine which backend to use for deduplication. + # @returns [String] Either "redis" or "memory". + def dedup_backend + v = ENV["ASYNC_JOB_RECURRING_DEDUP"] || ENV["JOBS_DEDUP_BACKEND"] || "auto" + return "redis" if v == "redis" + return "memory" if v == "memory" + redis_enabled? ? "redis" : "memory" + end + + # Determine which backend to use for last-run tracking. + # @returns [String] Either "redis" or "cache". + def last_backend + v = ENV["ASYNC_JOB_RECURRING_LAST"] || ENV["JOBS_LAST_BACKEND"] || "auto" + return "redis" if v == "redis" + return "cache" if v == "cache" + redis_enabled? ? "redis" : "cache" + end + end + + # Schedules and enqueues recurring tasks based on cron expressions. + class Scheduler + DEFAULT_DEDUP_TTL = Integer(ENV["ASYNC_JOB_RECURRING_DEDUP_TTL"] || ENV["JOBS_SCHEDULER_DEDUP_TTL"] || "600") + DEFAULT_PREFIX = ENV["ASYNC_JOB_REDIS_PREFIX"] || ENV["JOBS_REDIS_PREFIX"] || "async-job" + + # Initialize the scheduler with a list of tasks. + # @parameter tasks [Array] The recurring tasks to schedule. 
+ # @parameter prefix [String] The Redis key prefix. + def initialize(tasks, prefix: DEFAULT_PREFIX) + @tasks = tasks + @prefix = prefix + end + + # Start the scheduler and run all recurring tasks in parallel. + def run + barrier = Async::Barrier.new + @tasks.each do |task| + barrier.async {run_task(task)} + end + barrier.wait + end + + private + def run_task(task) + loop do + begin + now = Time.now + next_eo = task.cron.next_time(now) + delay = next_eo.to_f - now.to_f + Async::Task.current.sleep(delay) if delay > 0 + + run_at = Time.at(next_eo.to_i) + next unless claim(task.key, run_at) + + if task.klass + set_opts = {} + set_opts[:queue] = task.queue if task.queue + set_opts[:priority] = task.priority if task.priority + job = set_opts.empty? ? task.klass : task.klass.set(**set_opts) + args = task.args + if args.nil? + job.perform_later + elsif args.is_a?(Array) + job.perform_later(*args) + else + job.perform_later(args) + end + elsif task.command + eval(task.command, TOPLEVEL_BINDING) + end + + write_last(task.key, Time.now) + Console.info(self, "Enqueued recurring task.", key: task.key) + rescue => e + Console.warn(self, "Recurring task failed!", key: task.key, exception: e) + # Sleep briefly before retrying to avoid tight error loop + Async::Task.current.sleep(5) + end + end + end + + def claim(key, run_at) + if Backend.redis_enabled? + redis = Backend.redis_client + dedup_key = "#{@prefix}:recurring:exec:#{key}:#{run_at.to_i}" + value = "#{Socket.gethostname}-#{Process.pid}" + result = redis.call("SET", dedup_key, value, "NX", "EX", DEFAULT_DEDUP_TTL) + return result == "OK" + else + @mem ||= {} + mk = [key, run_at.to_i].join(":") + return false if @mem[mk] + @mem[mk] = true + return true + end + rescue => e + Console.warn(self, "Dedup claim failed; proceeding.", key: key, exception: e) + true + end + + def write_last(key, t) + if Backend.redis_enabled? 
+ Backend.redis_client.call("HSET", "#{@prefix}:recurring:last", key, t.to_i) + else + if defined?(::Rails) && ::Rails.respond_to?(:cache) && ::Rails.cache + ::Rails.cache.write("#{@prefix}:recurring:last:#{key}", t.to_i, expires_in: 1.hour) + end + end + rescue => e + Console.warn(self, "Failed to write last run.", key: key, exception: e) + end + end + end + end + end + end +end diff --git a/lib/async/job/adapter/active_job/recurring/service.rb b/lib/async/job/adapter/active_job/recurring/service.rb new file mode 100644 index 0000000..7be1323 --- /dev/null +++ b/lib/async/job/adapter/active_job/recurring/service.rb @@ -0,0 +1,88 @@ +# frozen_string_literal: true + +# Released under the MIT License. + +require "async/service/generic" +require "async" +require "console" + +require_relative "loader" +require_relative "scheduler" + +module Async + module Job + module Adapter + module ActiveJob + module Recurring + # Service for running the recurring task scheduler. + class Service < Async::Service::Generic + # Setup the recurring scheduler service container. + # This implementation is framework-agnostic. It optionally boots the + # application if a boot file is specified or detected, but does not + # require Rails. + # @parameter container [Async::Container::Generic] The service container. + def setup(container) + container.run(name: self.name, count: 1) do |instance| + root = ENV.fetch("RAILS_ROOT", Dir.pwd) + # Determine environment without relying on Rails: + env = ENV["ASYNC_JOB_ENV"] || ENV["RAILS_ENV"] || ENV["RACK_ENV"] || ENV["APP_ENV"] || "development" + + # Optionally boot the application. 
Prefer explicit boot file via env, + # otherwise auto-detect common Rails boot file if present: + boot = ENV["ASYNC_JOB_BOOT"] + boot ||= "config/environment" if File.exist?(File.expand_path("config/environment.rb", root)) + begin + require File.expand_path(boot, root) if boot + rescue LoadError => e + Console.warn(self, "Failed to boot application; continuing without.", boot: boot, exception: e) + end + + if skip? + Console.info(self, "Recurring scheduler disabled via env.") + instance.ready! + Async {sleep}.wait + else + schedule_file = Loader.schedule_path(root) + tasks = Loader.load(root: root, env: env) + # Reconcile tasks so removed ones are cleaned up (like sidekiq-cron): + begin + Reconciler.reconcile(tasks, prefix: Scheduler::DEFAULT_PREFIX) + rescue => e + Console.warn(self, "Failed to reconcile recurring tasks.", exception: e) + end + if tasks.empty? + if File.exist?(schedule_file) + Console.info(self, "No valid recurring tasks after parsing schedule.", env: env, path: schedule_file) + else + Console.info(self, "No recurring tasks loaded (missing file).", env: env, path: schedule_file) + end + instance.ready! + Async {sleep}.wait + else + Console.info(self, "Starting recurring scheduler.", tasks: tasks.size, env: env, path: schedule_file, + dedup: Recurring::Backend.dedup_backend, last: Recurring::Backend.last_backend, prefix: Recurring::Scheduler::DEFAULT_PREFIX) + # Per-task banner with schedule label and optional queue/priority: + tasks.each do |t| + label = t.cron.respond_to?(:original) ? t.cron.original : t.cron.to_s + Console.info(self, "[scheduler] task", key: t.key, schedule: label, queue: t.queue, priority: t.priority) + end + instance.ready! + # Ensure we have an async task context for the scheduler loops: + Async do + Scheduler.new(tasks).run + end.wait + end + end + end + end + + private + def skip? 
+ ENV["ASYNC_JOB_SKIP_RECURRING"] == "true" || ENV["SOLID_QUEUE_SKIP_RECURRING"] == "true" || ENV["JOBS_SKIP_RECURRING"] == "true" + end + end + end + end + end + end +end diff --git a/lib/async/job/adapter/active_job/recurring/task.rb b/lib/async/job/adapter/active_job/recurring/task.rb new file mode 100644 index 0000000..90ffc2d --- /dev/null +++ b/lib/async/job/adapter/active_job/recurring/task.rb @@ -0,0 +1,58 @@ +# frozen_string_literal: true + +# Released under the MIT License. +# Copyright, 2025, by Samuel Williams. + +require "fugit" + +module Async + module Job + module Adapter + module ActiveJob + module Recurring + # Represents a single recurring task loaded from config/recurring.yml. + Task = Struct.new(:key, :klass, :command, :queue, :priority, :args, :cron, keyword_init: true) + + # Helper methods for parsing and processing recurring task configuration. + module Helper + module_function + + # Normalize simple natural schedules to a cron string that Fugit::Cron understands. + def normalize_schedule(spec) + s = spec.to_s.strip + return s if s.empty? + + if (m = s.match(/\Aevery\s+(\d+)\s*seconds?\z/i)) + "*/#{Integer(m[1])} * * * * *" + elsif (m = s.match(/\Aevery\s+(\d+)\s*minutes?\z/i)) + "0 */#{Integer(m[1])} * * * *" + elsif (m = s.match(/\Aevery\s+(\d+)\s*hours?\z/i)) + "0 0 */#{Integer(m[1])} * * *" + else + s + end + end + + # Parse a cron expression string into a Fugit::Cron object. + # @parameter s [String] The cron expression to parse. + # @returns [Fugit::Cron, nil] The parsed cron object or nil if invalid. + def parse_cron(s) + cron = Fugit.parse(s) + cron.is_a?(Fugit::Cron) ? cron : nil + end + + # Resolve a constant name string to its class/module object. + # @parameter name [String] The constant name (e.g., "MyJob" or "Foo::Bar"). + # @returns [Class, Module, nil] The constant or nil if not found. 
+ def constantize(name) + name.to_s.split("::").inject(Object) {|m, part| m.const_get(part)} + rescue NameError + nil + end + end + end + end + end + end +end + diff --git a/test/active_job/queue_adapters/async_job_adapter.rb b/test/active_job/queue_adapters/async_job_adapter.rb index dbf4730..d34dc1d 100644 --- a/test/active_job/queue_adapters/async_job_adapter.rb +++ b/test/active_job/queue_adapters/async_job_adapter.rb @@ -29,12 +29,6 @@ adapter = subject.new(dispatcher) expect(adapter.instance_variable_get(:@dispatcher)).to be == dispatcher end - - it "can initialize with default dispatcher" do - # Skip test that requires Rails/Railtie - tested in integration - # This would test: adapter = subject.new - # expect(adapter.instance_variable_get(:@dispatcher)).to be_a(Async::Job::Adapter::ActiveJob::ThreadLocalDispatcher) - end end with "#enqueue" do diff --git a/test/async/job/adapter/active_job/dispatcher.rb b/test/async/job/adapter/active_job/dispatcher.rb index 1dfac64..49f0bc6 100644 --- a/test/async/job/adapter/active_job/dispatcher.rb +++ b/test/async/job/adapter/active_job/dispatcher.rb @@ -89,5 +89,18 @@ expect(dispatcher.status_string).to be == "test_queue" end + + it "includes server status when server responds to status_string" do + mock_server = Object.new + mock_server.define_singleton_method(:status_string) {"running"} + + mock_queue = Object.new + mock_queue.define_singleton_method(:server) {mock_server} + + dispatcher.definitions["test_queue"] = Object.new + dispatcher.queues["test_queue"] = mock_queue + + expect(dispatcher.status_string).to be == "test_queue running" + end end end diff --git a/test/async/job/adapter/active_job/recurring/environment.rb b/test/async/job/adapter/active_job/recurring/environment.rb new file mode 100644 index 0000000..97f6003 --- /dev/null +++ b/test/async/job/adapter/active_job/recurring/environment.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +# Released under the MIT License. 
+ +require "async/job/adapter/active_job/recurring/environment" + +describe Async::Job::Adapter::ActiveJob::Recurring::Environment do + # Create a test class that includes the module + let(:test_class) do + Class.new do + include Async::Job::Adapter::ActiveJob::Recurring::Environment + end + end + + let(:test_instance) {test_class.new} + + it "returns Service class from service_class method" do + expect(test_instance.service_class).to be == Async::Job::Adapter::ActiveJob::Recurring::Service + end + + it "returns 1 from count method" do + expect(test_instance.count).to be == 1 + end +end diff --git a/test/async/job/adapter/active_job/recurring/loader.rb b/test/async/job/adapter/active_job/recurring/loader.rb new file mode 100644 index 0000000..39be88e --- /dev/null +++ b/test/async/job/adapter/active_job/recurring/loader.rb @@ -0,0 +1,221 @@ +# frozen_string_literal: true + +# Released under the MIT License. + +require "yaml" +require "tmpdir" + +require "sus/fixtures/async/reactor_context" +require "sus/fixtures/console" + +require "async/job/adapter/active_job/recurring/loader" +require "async/job/adapter/active_job/recurring/task" + +describe Async::Job::Adapter::ActiveJob::Recurring::Loader do + include Sus::Fixtures::Async::ReactorContext + include Sus::Fixtures::Console::CapturedLogger + + let(:root) {Dir.mktmpdir("recurring-spec")} + + after do + FileUtils.remove_entry(root) if File.exist?(root) + end + + it 'loads tasks from recurring.yml and normalizes "every N seconds"' do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' # any constant to survive constantize + queue: default + schedule: every 5 seconds + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + task = tasks.first + expect(task.key).to be == "example" + expect(task.queue).to be == "default" + # Cron with seconds field every 5s + 
expect(task.cron.original).to be == "*/5 * * * * *" + end + + it 'normalizes "every N minutes"' do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: every 10 minutes + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + expect(tasks.first.cron.original).to be == "0 */10 * * * *" + end + + it 'normalizes "every N hours"' do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: every 2 hours + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + expect(tasks.first.cron.original).to be == "0 0 */2 * * *" + end + + it "returns empty array when file does not exist" do + tasks = subject.load(root: root, env: "test") + expect(tasks).to be == [] + end + + it "returns empty array when env has no tasks" do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + production: + example: + class: 'ActiveJob::Base' + schedule: every 5 seconds + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks).to be == [] + end + + it "skips task without schedule" do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks).to be == [] + end + + it "skips task with invalid schedule" do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: 'not a valid cron' + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks).to be == [] + end + + it "skips task with unknown job class" do + path = 
File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'NonExistentJobClass' + schedule: every 5 seconds + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks).to be == [] + end + + it "loads task with command instead of class" do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + command: 'puts "Hello"' + schedule: every 5 seconds + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + task = tasks.first + expect(task.command).to be == 'puts "Hello"' + expect(task.klass).to be == nil + end + + it "loads task with priority and queue" do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: every 5 seconds + queue: high_priority + priority: 10 + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + task = tasks.first + expect(task.queue).to be == "high_priority" + expect(task.priority).to be == 10 + end + + it "loads task with array args" do + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + File.write(path, <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: every 5 seconds + args: [1, 2, 3] + YAML + + tasks = subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + expect(tasks.first.args).to be == [1, 2, 3] + end + + it "uses ASYNC_JOB_RECURRING_SCHEDULE env var for schedule path" do + ENV["ASYNC_JOB_RECURRING_SCHEDULE"] = File.join(root, "custom.yml") + FileUtils.mkdir_p(root) + File.write(ENV["ASYNC_JOB_RECURRING_SCHEDULE"], <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: every 5 seconds + YAML + + path = subject.schedule_path(root) + expect(path).to be == File.join(root, "custom.yml") + + tasks = 
subject.load(root: root, env: "test") + expect(tasks.size).to be == 1 + ensure + ENV.delete("ASYNC_JOB_RECURRING_SCHEDULE") + end + + it "uses SOLID_QUEUE_RECURRING_SCHEDULE env var for schedule path" do + ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] = File.join(root, "solid.yml") + FileUtils.mkdir_p(root) + File.write(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"], <<~YAML) + test: + example: + class: 'ActiveJob::Base' + schedule: every 5 seconds + YAML + + path = subject.schedule_path(root) + expect(path).to be == File.join(root, "solid.yml") + ensure + ENV.delete("SOLID_QUEUE_RECURRING_SCHEDULE") + end +end + diff --git a/test/async/job/adapter/active_job/recurring/reconciler.rb b/test/async/job/adapter/active_job/recurring/reconciler.rb new file mode 100644 index 0000000..c90c7dd --- /dev/null +++ b/test/async/job/adapter/active_job/recurring/reconciler.rb @@ -0,0 +1,206 @@ +# frozen_string_literal: true + +# Released under the MIT License. + +require "sus/fixtures/async/reactor_context" +require "sus/fixtures/console" + +require "fugit" + +require "async/job/adapter/active_job/recurring/task" +require "async/job/adapter/active_job/recurring/scheduler" + +describe Async::Job::Adapter::ActiveJob::Recurring::Reconciler do + include Sus::Fixtures::Async::ReactorContext + include Sus::Fixtures::Console::CapturedLogger + + let(:task_struct) {Async::Job::Adapter::ActiveJob::Recurring::Task} + let(:reconciler) {Async::Job::Adapter::ActiveJob::Recurring::Reconciler} + let(:backend) {Async::Job::Adapter::ActiveJob::Recurring::Backend} + + def with_redis_enabled + orig = ENV.to_h + begin + ENV["REDIS_URL"] = "redis://localhost:6379" + ENV["JOBS_DEDUP_BACKEND"] = "redis" + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + yield + ensure + ENV.replace(orig) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + end + end + + with "Redis integration" do + it "removes orphaned tasks from Redis when they are 
removed from schedule" do + with_redis_enabled do + prefix = "test-reconcile-#{rand(100000)}" + redis = backend.redis_client + + # Create two tasks + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task_keep", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + task2 = task_struct.new(key: "task_remove", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + + # Simulate that both tasks existed previously in Redis + set_key = "#{prefix}:recurring:tasks" + redis.call("SADD", set_key, "task_keep", "task_remove") + + # Add some dedup keys for task_remove + redis.call("SET", "#{prefix}:recurring:exec:task_remove:12345", "value1") + redis.call("SET", "#{prefix}:recurring:exec:task_remove:67890", "value2") + + # Add last-run record for task_remove + last_key = "#{prefix}:recurring:last" + redis.call("HSET", last_key, "task_remove", "12345") + + # Now reconcile with only task1 (task2 was removed from schedule) + reconciler.reconcile([task1], prefix: prefix) + + # Verify task_remove was cleaned up + members = redis.call("SMEMBERS", set_key) + expect(members).to be == ["task_keep"] + + # Verify dedup keys were deleted + keys = redis.call("KEYS", "#{prefix}:recurring:exec:task_remove:*") + expect(keys).to be == [] + + # Verify last-run record was deleted + last_val = redis.call("HGET", last_key, "task_remove") + expect(last_val).to be == nil + + # Cleanup + redis.call("DEL", set_key, last_key) + end + end + + it "adds new tasks to Redis when they are added to schedule" do + with_redis_enabled do + prefix = "test-reconcile-add-#{rand(100000)}" + redis = backend.redis_client + + # Create two tasks + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task_existing", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + task2 = task_struct.new(key: "task_new", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + + # Simulate that 
only task1 existed previously in Redis + set_key = "#{prefix}:recurring:tasks" + redis.call("SADD", set_key, "task_existing") + + # Now reconcile with both tasks (task2 is new) + reconciler.reconcile([task1, task2], prefix: prefix) + + # Verify both tasks are now in the set + members = redis.call("SMEMBERS", set_key).sort + expect(members).to be == ["task_existing", "task_new"] + + # Cleanup + redis.call("DEL", set_key) + end + end + + it "handles empty removed and added lists gracefully" do + with_redis_enabled do + prefix = "test-reconcile-noop-#{rand(100000)}" + redis = backend.redis_client + + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task1", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + + set_key = "#{prefix}:recurring:tasks" + redis.call("SADD", set_key, "task1") + + # Reconcile with same task (no changes) + reconciler.reconcile([task1], prefix: prefix) + + # Should still have the task + members = redis.call("SMEMBERS", set_key) + expect(members).to be == ["task1"] + + # Cleanup + redis.call("DEL", set_key) + end + end + + it "scans and deletes keys matching a pattern" do + with_redis_enabled do + prefix = "test-scan-#{rand(100000)}" + redis = backend.redis_client + + # Create multiple keys matching the pattern + redis.call("SET", "#{prefix}:recurring:exec:task1:111", "v1") + redis.call("SET", "#{prefix}:recurring:exec:task1:222", "v2") + redis.call("SET", "#{prefix}:recurring:exec:task1:333", "v3") + # Create a key that doesn't match + redis.call("SET", "#{prefix}:recurring:exec:task2:444", "v4") + + # Scan and delete task1 keys + pattern = "#{prefix}:recurring:exec:task1:*" + reconciler.scan_and_delete(redis, pattern) + + # Verify task1 keys are gone + keys = redis.call("KEYS", "#{prefix}:recurring:exec:task1:*") + expect(keys).to be == [] + + # Verify task2 key still exists + keys = redis.call("KEYS", "#{prefix}:recurring:exec:task2:*") + expect(keys.length).to be == 1 + + # Cleanup + 
redis.call("DEL", "#{prefix}:recurring:exec:task2:444") + end + end + end + + it "returns early when Redis is not enabled" do + orig = ENV.to_h + begin + ENV.delete("REDIS_URL") + ENV["JOBS_DEDUP_BACKEND"] = "memory" + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task1", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + + # Should return early without error + reconciler.reconcile([task1], prefix: "test") + + # Test passes if we got here + expect(true).to be == true + ensure + ENV.replace(orig) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + end + end + + with "Redis integration" do + it "logs warning when reconcile encounters Redis error" do + with_redis_enabled do + prefix = "test-reconcile-error-#{rand(100000)}" + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task1", klass: nil, command: nil, queue: "default", priority: 0, args: nil, cron: cron) + + # Stub redis_client to return a mock that raises on SMEMBERS + redis_orig = backend.method(:redis_client) + failing_redis = Object.new + def failing_redis.call(*args) + raise "Redis connection failed!" 
+ end + + backend.define_singleton_method(:redis_client) {failing_redis} + + # Should rescue and log warning + reconciler.reconcile([task1], prefix: prefix) + + # Check that warning was logged + expect_console.to have_logged(message: be(:include?, "[recurring] reconcile failed")) + + # Test passes if we got here without exception + expect(true).to be == true + ensure + backend.define_singleton_method(:redis_client, redis_orig) if redis_orig + end + end + end +end diff --git a/test/async/job/adapter/active_job/recurring/scheduler.rb b/test/async/job/adapter/active_job/recurring/scheduler.rb new file mode 100644 index 0000000..e044f58 --- /dev/null +++ b/test/async/job/adapter/active_job/recurring/scheduler.rb @@ -0,0 +1,471 @@ +# frozen_string_literal: true + +# Released under the MIT License. + +require "sus/fixtures/async/reactor_context" +require "sus/fixtures/console" + +require "fugit" +require "securerandom" + +require "async/job/adapter/active_job/recurring/task" +require "async/job/adapter/active_job/recurring/scheduler" + +describe Async::Job::Adapter::ActiveJob::Recurring::Scheduler do + include Sus::Fixtures::Async::ReactorContext + include Sus::Fixtures::Console::CapturedLogger + + let(:task_struct) {Async::Job::Adapter::ActiveJob::Recurring::Task} + let(:scheduler_class) {Async::Job::Adapter::ActiveJob::Recurring::Scheduler} + + # Minimal stub Active Job class which records perform_later calls. + class StubJob + @calls = [] + class << self + attr_reader :calls + def reset!; @calls = []; end + def set(queue: nil, priority: nil); self; end + def perform_later(*args); @calls << args; end + end + end + + before {StubJob.reset!} + + it "enqueues a job on schedule (memory dedup, no Redis)" do + # Every 1 second. 
+ cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "test", klass: StubJob, command: nil, queue: "default", priority: 0, args: ["a"], cron: cron) + + scheduler = scheduler_class.new([task], prefix: "test-job") + + # Run a single task loop in the reactor and stop after we observe at least one call. + child = Async do + scheduler.send(:run_task, task) + end + + # Wait up to ~1.5s for the first enqueue. + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while StubJob.calls.empty? && (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) < 1.6 + sleep 0.05 + end + + child.stop + + expect(StubJob.calls.length).to be >= 1 + expect(StubJob.calls.first).to be == ["a"] + end + + it "writes last-run to Rails.cache when Redis is not enabled" do + # Install a minimal Rails.cache stub. + cache = Class.new do + attr_reader :writes + def initialize; @writes = {}; end + def write(key, val, **_) @writes[key] = val; end + end.new + + Object.const_set(:Rails, Module.new) unless defined?(::Rails) + ::Rails.singleton_class.define_method(:cache) {cache} + + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "cache_test", klass: StubJob, queue: "default", args: nil, cron: cron) + scheduler = scheduler_class.new([task], prefix: "test-job") + + # Call the private writer directly to avoid timing concerns: + t = Time.at(123) + scheduler.send(:write_last, task.key, t) + + expect(cache.writes.keys).to be(:include?, "test-job:recurring:last:cache_test") + expect(cache.writes["test-job:recurring:last:cache_test"]).to be == 123 + ensure + # Clean up Rails constant if we created it: + if defined?(::Rails) && ::Rails.singleton_class.method_defined?(:cache) && ::Rails.cache == cache + # leave Rails defined for subsequent tests, only remove the writer if needed + end + end + + it "selects dedup and last backends from env aliases" do + orig = ENV.to_h + ENV["JOBS_DEDUP_BACKEND"] = "memory" + ENV["JOBS_LAST_BACKEND"] = "cache" + + # These are module 
methods on Backend: + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + expect(backend.dedup_backend).to be == "memory" + expect(backend.last_backend).to be == "cache" + ensure + ENV.replace(orig) + end + + it "selects dedup and last backends with ASYNC_JOB env vars" do + orig = ENV.to_h + ENV["ASYNC_JOB_RECURRING_DEDUP"] = "memory" + ENV["ASYNC_JOB_RECURRING_LAST"] = "cache" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + expect(backend.dedup_backend).to be == "memory" + expect(backend.last_backend).to be == "cache" + ensure + ENV.replace(orig) + end + + it "returns 'redis' when explicitly set for dedup" do + orig = ENV.to_h + ENV["JOBS_DEDUP_BACKEND"] = "redis" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + # Will return "redis" if env says redis (even if Redis not available) + expect(backend.dedup_backend).to be == "redis" + ensure + ENV.replace(orig) + end + + it "returns 'redis' when explicitly set for last" do + orig = ENV.to_h + ENV["JOBS_LAST_BACKEND"] = "redis" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + expect(backend.last_backend).to be == "redis" + ensure + ENV.replace(orig) + end + + it "enqueues job with nil args" do + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "nil_args", klass: StubJob, queue: "default", args: nil, cron: cron) + + scheduler = scheduler_class.new([task], prefix: "test-job") + + child = Async do + scheduler.send(:run_task, task) + end + + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while StubJob.calls.empty? 
&& (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) < 1.6 + sleep 0.05 + end + + child.stop + + expect(StubJob.calls.length).to be >= 1 + expect(StubJob.calls.first).to be == [] + end + + it "enqueues job with single non-array arg" do + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "single_arg", klass: StubJob, queue: "default", args: "hello", cron: cron) + + scheduler = scheduler_class.new([task], prefix: "test-job") + + child = Async do + scheduler.send(:run_task, task) + end + + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while StubJob.calls.empty? && (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) < 1.6 + sleep 0.05 + end + + child.stop + + expect(StubJob.calls.length).to be >= 1 + expect(StubJob.calls.first).to be == ["hello"] + end + + it "enqueues job with queue and priority set" do + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "with_opts", klass: StubJob, queue: "high", priority: 10, args: ["a"], cron: cron) + + scheduler = scheduler_class.new([task], prefix: "test-job") + + # We're just checking that set() is called - StubJob.set returns self + child = Async do + scheduler.send(:run_task, task) + end + + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while StubJob.calls.empty? 
&& (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) < 1.6 + sleep 0.05 + end + + child.stop + + expect(StubJob.calls.length).to be >= 1 + end + + it "runs task with command instead of class" do + $test_command_ran = false + + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "command", klass: nil, command: "$test_command_ran = true", queue: nil, args: nil, cron: cron) + + scheduler = scheduler_class.new([task], prefix: "test-job") + + child = Async do + scheduler.send(:run_task, task) + end + + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while !$test_command_ran && (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) < 1.6 + sleep 0.05 + end + + child.stop + + expect($test_command_ran).to be == true + end + + it "handles errors in run_task gracefully" do + # Create a job class that raises an error + failing_job = Class.new do + def self.perform_later(*args) + raise "Job failed!" + end + end + + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "failing", klass: failing_job, queue: nil, args: nil, cron: cron) + + scheduler = scheduler_class.new([task], prefix: "test-job") + + # Run one iteration and verify it doesn't crash + child = Async do + scheduler.send(:run_task, task) + end + + sleep 1.2 + child.stop + + # Test passes if we got here without exception + expect(true).to be == true + end + + it "handles errors in claim gracefully" do + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "claim_err", klass: StubJob, args: nil, cron: cron) + scheduler = scheduler_class.new([task], prefix: "test-job") + + # Force claim to raise by stubbing Backend.redis_enabled? + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + original_method = backend.method(:redis_enabled?) + backend.define_singleton_method(:redis_enabled?) 
{raise "Redis error"} + + # claim should rescue and return true + result = scheduler.send(:claim, task.key, Time.now) + expect(result).to be == true + ensure + backend.define_singleton_method(:redis_enabled?, original_method) + end + + it "handles errors in write_last gracefully" do + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "write_err", klass: StubJob, args: nil, cron: cron) + scheduler = scheduler_class.new([task], prefix: "test-job") + + # Force write_last to fail by stubbing Backend.redis_enabled? + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + original_method = backend.method(:redis_enabled?) + backend.define_singleton_method(:redis_enabled?) {raise "Write error"} + + # Should not raise + scheduler.send(:write_last, task.key, Time.now) + # Test passes if we got here + expect(true).to be == true + ensure + backend.define_singleton_method(:redis_enabled?, original_method) + end + + it "runs multiple tasks in parallel via public run method" do + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task1", klass: StubJob, queue: nil, args: ["a"], cron: cron) + task2 = task_struct.new(key: "task2", klass: StubJob, queue: nil, args: ["b"], cron: cron) + + scheduler = scheduler_class.new([task1, task2], prefix: "test-job") + + # Run scheduler in background and stop after tasks execute + child = Async do + scheduler.run + end + + started = Process.clock_gettime(Process::CLOCK_MONOTONIC) + while StubJob.calls.length < 2 && (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) < 1.6 + sleep 0.05 + end + + child.stop + + # Should have enqueued both tasks + expect(StubJob.calls.length).to be >= 2 + end + + it "uses auto backend detection when Redis not available" do + orig = ENV.to_h + ENV.delete("ASYNC_JOB_RECURRING_DEDUP") + ENV.delete("JOBS_DEDUP_BACKEND") + ENV.delete("ASYNC_JOB_RECURRING_LAST") + ENV.delete("JOBS_LAST_BACKEND") + ENV.delete("REDIS_URL") + + backend = 
Async::Job::Adapter::ActiveJob::Recurring::Backend + # With auto and no Redis, should fall back to memory/cache + expect(backend.dedup_backend).to be == "memory" + expect(backend.last_backend).to be == "cache" + ensure + ENV.replace(orig) + end + + it "returns nil from redis_client when redis not enabled" do + orig = ENV.to_h + ENV["JOBS_DEDUP_BACKEND"] = "memory" + ENV["JOBS_LAST_BACKEND"] = "cache" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + # Remove any cached redis client + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + expect(backend.redis_client).to be == nil + ensure + ENV.replace(orig) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + end + + it "returns cached redis client on subsequent calls" do + skip "Requires async-redis gem and Redis server" unless defined?(Async::Redis::Client) + + orig_env = ENV.to_h + orig_endpoint_local = Async::Redis::Endpoint.method(:local) rescue nil + orig_client_new = Async::Redis::Client.method(:new) rescue nil + + begin + ENV["JOBS_DEDUP_BACKEND"] = "redis" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + # Remove any cached redis + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + # Mock the Redis client + mock_redis = Object.new + mock_endpoint = Object.new + + # Stub Async::Redis::Endpoint.local + Async::Redis::Endpoint.define_singleton_method(:local) {mock_endpoint} + Async::Redis::Client.define_singleton_method(:new) {|*args| mock_redis} + + client1 = backend.redis_client + client2 = backend.redis_client + + expect(client1).to be == client2 + ensure + ENV.replace(orig_env) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + # Restore original methods + if orig_endpoint_local + Async::Redis::Endpoint.define_singleton_method(:local, orig_endpoint_local) + end + if orig_client_new + 
Async::Redis::Client.define_singleton_method(:new, orig_client_new) + end + end + end + + with "Redis integration" do + it "uses Redis for claim when redis_enabled" do + orig = ENV.to_h + begin + ENV["REDIS_URL"] = "redis://localhost:6379" + ENV["JOBS_DEDUP_BACKEND"] = "redis" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + prefix = "test-redis-#{SecureRandom.hex(8)}" + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "redis_claim", klass: StubJob, args: nil, cron: cron) + scheduler = scheduler_class.new([task], prefix: prefix) + + # Use fixed timestamp so both claims use same dedup key + run_at = Time.now + + # First claim should succeed + result1 = scheduler.send(:claim, task.key, run_at) + expect(result1).to be == true + + # Second claim with same key+time should fail (deduplicated) + result2 = scheduler.send(:claim, task.key, run_at) + expect(result2).to be == false + ensure + ENV.replace(orig) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + end + end + + it "uses Redis for write_last when redis_enabled" do + orig = ENV.to_h + begin + ENV["REDIS_URL"] = "redis://localhost:6379" + ENV["JOBS_LAST_BACKEND"] = "redis" + ENV["JOBS_DEDUP_BACKEND"] = "redis" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + prefix = "test-redis-#{SecureRandom.hex(8)}" + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "redis_last", klass: StubJob, args: nil, cron: cron) + scheduler = scheduler_class.new([task], prefix: prefix) + + t = Time.at(12345) + scheduler.send(:write_last, task.key, t) + + # Verify the value was written to Redis + client = Async::Redis::Client.new + stored_value = client.call("HGET", "#{prefix}:recurring:last", task.key) + expect(stored_value).to be == "12345" + + # 
Clean up + client.call("DEL", "#{prefix}:recurring:last") + ensure + ENV.replace(orig) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + end + end + + it "writes and reads last-run timestamp via Redis" do + orig = ENV.to_h + begin + ENV["REDIS_URL"] = "redis://localhost:6379" + ENV["JOBS_LAST_BACKEND"] = "redis" + ENV["JOBS_DEDUP_BACKEND"] = "redis" + + backend = Async::Job::Adapter::ActiveJob::Recurring::Backend + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + + prefix = "test-redis-#{SecureRandom.hex(8)}" + cron = Fugit.parse("*/1 * * * * *") + task1 = task_struct.new(key: "task_a", klass: StubJob, args: nil, cron: cron) + task2 = task_struct.new(key: "task_b", klass: StubJob, args: nil, cron: cron) + scheduler = scheduler_class.new([task1, task2], prefix: prefix) + + # Write last-run times for both tasks + t1 = Time.at(100) + t2 = Time.at(200) + scheduler.send(:write_last, task1.key, t1) + scheduler.send(:write_last, task2.key, t2) + + # Read back and verify + client = Async::Redis::Client.new + val1 = client.call("HGET", "#{prefix}:recurring:last", task1.key) + val2 = client.call("HGET", "#{prefix}:recurring:last", task2.key) + + expect(val1).to be == "100" + expect(val2).to be == "200" + + # Clean up + client.call("DEL", "#{prefix}:recurring:last") + ensure + ENV.replace(orig) + backend.remove_instance_variable(:@redis) if backend.instance_variable_defined?(:@redis) + end + end + end +end diff --git a/test/async/job/adapter/active_job/recurring/service.rb b/test/async/job/adapter/active_job/recurring/service.rb new file mode 100644 index 0000000..45f14ab --- /dev/null +++ b/test/async/job/adapter/active_job/recurring/service.rb @@ -0,0 +1,263 @@ +# frozen_string_literal: true + +# Released under the MIT License. 
+ +require "tmpdir" +require "yaml" +require "fugit" + +require "sus/fixtures/async/reactor_context" +require "sus/fixtures/console" + +require "async/service/generic" +require "async" + +require "async/job/adapter/active_job/recurring/service" +require "async/job/adapter/active_job/recurring/loader" +require "async/job/adapter/active_job/recurring/task" + +describe Async::Job::Adapter::ActiveJob::Recurring::Service do + include Sus::Fixtures::Async::ReactorContext + include Sus::Fixtures::Console::CapturedLogger + + let(:service_class) {Async::Job::Adapter::ActiveJob::Recurring::Service} + FakeEvaluator = Struct.new(:name, :health_check_timeout, keyword_init: true) + FakeEnvironment = Struct.new(:evaluator, keyword_init: true) + let(:loader_mod) {Async::Job::Adapter::ActiveJob::Recurring::Loader} + let(:task_struct) {Async::Job::Adapter::ActiveJob::Recurring::Task} + + # A minimal container that records runs and provides a controllable task. + class FakeInstance + attr_reader :ready_calls + def initialize + @ready_calls = 0 + end + def ready! + @ready_calls += 1 + end + end + + class FakeContainer + attr_reader :tasks, :instances, :runs + def initialize + @tasks = [] + @instances = [] + @runs = [] + end + + # Mimic Async::Container#run(name:, count:) { |instance| ... } + def run(name:, count: 1) + @runs << {name:, count:} + inst = FakeInstance.new + @instances << inst + # Execute the service logic within the reactor, but keep control to stop it. + t = Async do + yield inst + end + @tasks << t + t + end + end + + # Ensure ENV is restored after each example. 
+ def with_env(updated) + orig = ENV.to_h + ENV.replace(orig.merge(updated)) + yield + ensure + ENV.replace(orig) + end + + it "logs boot failure via ASYNC_JOB_BOOT and continues (rescue)" do + Dir.mktmpdir("svc-spec-boot") do |root| + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test", "ASYNC_JOB_BOOT" => "no/such/file", "ASYNC_JOB_SKIP_RECURRING" => "true") do + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + # Confirm warning about boot failure was logged. + expect_console.to have_logged(message: be(:include?, "Failed to boot application")) + # Skip branch should mark instance ready. + expect(container.instances.first.ready_calls).to be == 1 + # Clean up any running tasks. + container.tasks.each(&:stop) + end + end + end + + it "skips scheduler when ASYNC_JOB_SKIP_RECURRING=true" do + Dir.mktmpdir("svc-spec-skip") do |root| + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test", "ASYNC_JOB_SKIP_RECURRING" => "true") do + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + expect_console.to have_logged(message: be(:include?, "Recurring scheduler disabled via env.")) + expect(container.instances.first.ready_calls).to be == 1 + container.tasks.each(&:stop) + end + end + end + + it "skip branch completes immediately when sleep is stubbed" do + Dir.mktmpdir("svc-spec-skip-fast") do |root| + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test", "ASYNC_JOB_SKIP_RECURRING" => "true") do + # Temporarily stub Kernel.sleep to return immediately in this example: + begin + Kernel.singleton_class.alias_method(:__orig_sleep, :sleep) + Kernel.define_singleton_method(:sleep) {|*| 0} + + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", 
health_check_timeout: nil)) + service_class.new(env).setup(container) + + expect(container.instances.first.ready_calls).to be == 1 + ensure + # Restore original sleep: + Kernel.singleton_class.alias_method(:sleep, :__orig_sleep) + Kernel.singleton_class.remove_method(:__orig_sleep) rescue nil + container&.tasks&.each(&:stop) + end + end + end + end + + it "auto-detects and requires config/environment.rb when present" do + Dir.mktmpdir("svc-spec-autoboot") do |root| + # Create a minimal boot file that does nothing when required: + FileUtils.mkdir_p(File.join(root, "config")) + File.write(File.join(root, "config/environment.rb"), "# noop boot\n") + + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test", "ASYNC_JOB_SKIP_RECURRING" => "true") do + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + # Should have skipped, but after attempting to require the boot file (no warning expected): + expect(container.instances.first.ready_calls).to be == 1 + container.tasks.each(&:stop) + end + end + end + + it "logs 'No recurring tasks loaded' when schedule file is missing" do + Dir.mktmpdir("svc-spec-missing") do |root| + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test") do + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + expect_console.to have_logged(message: be(:include?, "No recurring tasks loaded (missing file).")) + expect(container.instances.first.ready_calls).to be == 1 + container.tasks.each(&:stop) + end + end + end + + it "logs 'No valid recurring tasks' when schedule present but invalid" do + Dir.mktmpdir("svc-spec-invalid") do |root| + # Write a schedule file with an invalid cron entry for env=test + path = File.join(root, "config/recurring.yml") + FileUtils.mkdir_p(File.dirname(path)) + 
File.write(path, <<~YAML) + test: + bad_task: + class: 'ActiveJob::Base' + schedule: 'not a valid cron' + YAML + + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test") do + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + expect_console.to have_logged(message: be(:include?, "No valid recurring tasks after parsing schedule.")) + expect(container.instances.first.ready_calls).to be == 1 + container.tasks.each(&:stop) + end + end + end + + it "starts scheduler for valid tasks and logs task banner" do + Dir.mktmpdir("svc-spec-ok") do |root| + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test") do + # Stub loader to return one valid task without hitting filesystem. + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "example", klass: nil, command: nil, queue: "high", priority: 5, args: nil, cron: cron) + + backend = Async::Job::Adapter::ActiveJob::Recurring + loader_orig = loader_mod.method(:load) + scheduler_orig = backend.const_get(:Scheduler) + + begin + loader_mod.define_singleton_method(:load) {|root:, env:| [task]} + # Replace Scheduler with a stub that returns immediately. 
+ stub_scheduler = Class.new do + DEFAULT_PREFIX = "test-prefix" + def initialize(*); end + def run; end + end + backend.send(:remove_const, :Scheduler) + backend.const_set(:Scheduler, stub_scheduler) + # Ensure DEFAULT_PREFIX exists on the installed constant path: + backend::Scheduler.const_set(:DEFAULT_PREFIX, "test-prefix") unless backend::Scheduler.const_defined?(:DEFAULT_PREFIX) + + container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + expect_console.to have_logged(message: be(:include?, "Starting recurring scheduler.")) + # Per-task banner log: + expect_console.to have_logged(message: be == "[scheduler] task") + expect(container.instances.first.ready_calls).to be == 1 + ensure + # Restore stubs + backend.send(:remove_const, :Scheduler) + backend.const_set(:Scheduler, scheduler_orig) + backend.define_singleton_method(:load, loader_orig) rescue nil + # Stop any tasks created by container + container&.tasks&.each(&:stop) + end + end + end + end + + it "logs warning when reconciliation fails" do + Dir.mktmpdir("svc-spec-reconcile-fail") do |root| + with_env("RAILS_ROOT" => root, "ASYNC_JOB_ENV" => "test") do + # Stub loader to return one valid task + cron = Fugit.parse("*/1 * * * * *") + task = task_struct.new(key: "example", klass: nil, command: nil, queue: "high", priority: 5, args: nil, cron: cron) + + backend = Async::Job::Adapter::ActiveJob::Recurring + loader_orig = loader_mod.method(:load) + reconciler_orig = backend.const_get(:Reconciler).method(:reconcile) + scheduler_class = backend.const_get(:Scheduler) + + begin + loader_mod.define_singleton_method(:load) {|root:, env:| [task]} + # Stub Reconciler.reconcile to raise an error + backend::Reconciler.define_singleton_method(:reconcile) {|*args| raise "Reconcile failed!"} + # Stub Scheduler to return immediately without removing the class + scheduler_class.define_method(:run) {} + + 
container = FakeContainer.new + env = FakeEnvironment.new(evaluator: FakeEvaluator.new(name: "scheduler", health_check_timeout: nil)) + service_class.new(env).setup(container) + + # Should log warning about reconciliation failure + expect_console.to have_logged(message: be(:include?, "Failed to reconcile recurring tasks.")) + # Should continue and start scheduler anyway + expect_console.to have_logged(message: be(:include?, "Starting recurring scheduler.")) + expect(container.instances.first.ready_calls).to be == 1 + ensure + # Restore stubs + backend::Reconciler.define_singleton_method(:reconcile, reconciler_orig) + loader_mod.define_singleton_method(:load, loader_orig) + # Stop any tasks created by container + container&.tasks&.each(&:stop) + end + end + end + end +end diff --git a/test/async/job/adapter/active_job/recurring/task.rb b/test/async/job/adapter/active_job/recurring/task.rb new file mode 100644 index 0000000..5f4b946 --- /dev/null +++ b/test/async/job/adapter/active_job/recurring/task.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +# Released under the MIT License. 

require "async/job/adapter/active_job/recurring/task"

# The assertions below reference Fugit::Cron directly; require fugit
# explicitly rather than relying on it being loaded transitively by the
# task file, matching the sibling recurring specs (scheduler/reconciler).
require "fugit"

# Unit tests for the Recurring::Helper module: schedule normalization
# ("every N seconds/minutes/hours" -> 6-field cron), cron parsing, and
# safe constant lookup.
describe Async::Job::Adapter::ActiveJob::Recurring::Helper do
	let(:helper) {Async::Job::Adapter::ActiveJob::Recurring::Helper}
	
	it "normalizes 'every N seconds'" do
		result = helper.normalize_schedule("every 5 seconds")
		expect(result).to be == "*/5 * * * * *"
	end
	
	it "normalizes 'every N minutes'" do
		result = helper.normalize_schedule("every 10 minutes")
		expect(result).to be == "0 */10 * * * *"
	end
	
	it "normalizes 'every N hours'" do
		result = helper.normalize_schedule("every 3 hours")
		expect(result).to be == "0 0 */3 * * *"
	end
	
	it "returns empty string for empty input" do
		result = helper.normalize_schedule("")
		expect(result).to be == ""
	end
	
	it "passes through non-matching schedule as-is" do
		# Already-valid cron expressions must not be rewritten:
		cron_str = "0 0 * * *"
		result = helper.normalize_schedule(cron_str)
		expect(result).to be == cron_str
	end
	
	it "parses valid cron string" do
		cron = helper.parse_cron("0 0 * * *")
		expect(cron).not.to be == nil
		expect(cron).to be_a(Fugit::Cron)
	end
	
	it "returns nil for invalid cron string" do
		# parse_cron is expected to swallow parse failures and return nil:
		cron = helper.parse_cron("not a valid cron")
		expect(cron).to be == nil
	end
	
	it "constantizes valid constant name" do
		klass = helper.constantize("ActiveJob::Base")
		expect(klass).to be == ActiveJob::Base
	end
	
	it "returns nil for non-existent constant" do
		# Missing constants resolve to nil instead of raising NameError:
		klass = helper.constantize("NonExistentClass")
		expect(klass).to be == nil
	end
end