+++
title = "Workers"
description = ""
date = 2021-05-01T18:10:00+00:00
updated = 2021-05-01T18:10:00+00:00
draft = false
weight = 1
sort_by = "weight"
template = "docs/page.html"

[extra]
lead = ""
toc = true
top = false
flair = []
+++
Loco provides the following options for background jobs:
- Redis backed
- Postgres backed
- SQLite backed
- Tokio-async based (same-process, evented thread based background jobs)
You enqueue and perform jobs without knowledge of the actual background queue implementation, similar to Rails' ActiveJob, so you can switch with a simple change of configuration and no code change.
When you generated a new app, you might have selected the default async configuration for workers. This means workers spin off jobs in Tokio's async pool, which gives you proper background processes in the same running server.
You might want to configure jobs to run in a separate process backed by a queue, in order to distribute the load across servers.
First, switch to `BackgroundQueue`:
```yaml
# Worker Configuration
workers:
  # specifies the worker mode. Options:
  #   - BackgroundQueue - Workers operate asynchronously in the background, processing queued tasks.
  #   - ForegroundBlocking - Workers operate in the foreground and block until tasks are completed.
  #   - BackgroundAsync - Workers operate asynchronously in the background, processing tasks with async capabilities.
  mode: BackgroundQueue
```

Then, configure a Redis-based queue backend:
```yaml
queue:
  kind: Redis
  # Redis connection URI.
  uri: "{{ get_env(name="REDIS_URL", default="redis://127.0.0.1") }}"
  # Dangerously flush all data.
  dangerously_flush: false
  # represents the number of tasks a worker can handle simultaneously.
  num_workers: 2
```

Or a Postgres-based queue backend:
```yaml
queue:
  kind: Postgres
  # Postgres Queue connection URI.
  uri: "{{ get_env(name="PGQ_URL", default="postgres://localhost:5432/mydb") }}"
  # Dangerously flush all data.
  dangerously_flush: false
  # represents the number of tasks a worker can handle simultaneously.
  num_workers: 2
```

Or a SQLite-based queue backend:
```yaml
queue:
  kind: Sqlite
  # SQLite Queue connection URI.
  uri: "{{ get_env(name="SQLTQ_URL", default="sqlite://loco_development.sqlite?mode=rwc") }}"
  # Dangerously flush all data.
  dangerously_flush: false
  # represents the number of tasks a worker can handle simultaneously.
  num_workers: 2
```

You can run in two ways, depending on which setting you chose for background workers:
```sh
Usage: demo_app start [OPTIONS]

Options:
  -w, --worker [<WORKER>...]  Start worker. Optionally provide tags to run specific jobs (e.g. --worker=tag1,tag2)
  -s, --server-and-worker     start same-process server and worker
```
Choose `--worker` when you configured a real queue backend (such as Redis) and you want a dedicated process for running just background jobs. You can use a single process per server. In this case, you can run your main Web or API server using just `cargo loco start`.
```sh
$ cargo loco start --worker # starts a standalone worker process executing jobs
$ cargo loco start          # starts a standalone API service or Web server, no workers
```

Choose `-s` when you configured async background workers; jobs will execute as part of the currently running server process.
For example, running `--server-and-worker`:

```sh
$ cargo loco start --server-and-worker # both API service and workers will execute
```

Loco supports tag-based job filtering, allowing you to create specialized workers that only process specific types of jobs. This is particularly useful for distributing workloads or creating dedicated workers for resource-intensive tasks.
When starting a worker, you can specify which tags it should process:
```sh
# Start a worker that only processes jobs with no tags
$ cargo loco start --worker

# Start a worker that only processes jobs with the "email" tag
$ cargo loco start --worker email

# Start a worker that processes jobs with either "report" or "analytics" tags
$ cargo loco start --worker report,analytics
```

Important notes about tag-based processing:
- Workers with no tags (`cargo loco start --worker`) will only process jobs that have no tags
- Workers with tags will only process jobs that have at least one matching tag
- The `--all` and `--server-and-worker` modes don't support filtering by tags and will only process untagged jobs
- Tags are case-sensitive
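The dispatch rules above can be sketched as a small predicate (a simplification for illustration, not Loco's actual implementation):

```rust
// An untagged worker takes only untagged jobs; a tagged worker takes jobs
// sharing at least one tag. Tag comparison is case-sensitive.
fn worker_accepts(worker_tags: &[&str], job_tags: &[&str]) -> bool {
    if worker_tags.is_empty() {
        job_tags.is_empty()
    } else {
        job_tags.iter().any(|t| worker_tags.contains(t))
    }
}

fn main() {
    assert!(worker_accepts(&[], &[]));                // untagged worker, untagged job
    assert!(!worker_accepts(&[], &["email"]));        // untagged worker skips tagged jobs
    assert!(worker_accepts(&["report", "analytics"], &["report"])); // one match suffices
    assert!(!worker_accepts(&["email"], &["Email"])); // case-sensitive
    println!("all dispatch rules hold");
}
```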
To use a worker, you mainly think about adding a job to the queue: take the worker and call `perform_later`:
```rust
// .. in your controller ..
let job_id = DownloadWorker::perform_later(
    &ctx,
    DownloadWorkerArgs {
        user_guid: "foo".to_string(),
    },
)
.await?;

// The job ID can be used for tracking job status
if let Some(id) = job_id {
    println!("Job queued with ID: {}", id);
    // You can store this ID to check job status later
}
```

Unlike Rails and Ruby, with Rust you can enjoy strongly typed job arguments, which get serialized and pushed into the queue.
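The args type is a plain struct; for queue-backed jobs it must be serializable so it can be pushed onto the queue and read back by the worker. A minimal sketch of the `DownloadWorkerArgs` used above, assuming `serde` derives on the struct:

```rust
use serde::{Deserialize, Serialize};

// Strongly typed arguments: serialized when the job is enqueued,
// deserialized when a worker picks it up.
#[derive(Debug, Serialize, Deserialize)]
pub struct DownloadWorkerArgs {
    pub user_guid: String,
}
```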
When enqueueing a job, you can optionally assign tags to it. The job will then only be processed by workers that match at least one of its tags:
```rust
// To create a job with a tag, define the tags in your worker:
struct DownloadWorker;

#[async_trait]
impl BackgroundWorker<DownloadWorkerArgs> for DownloadWorker {
    // Define tags for this worker
    fn tags() -> Vec<String> {
        vec!["download".to_string(), "network".to_string()]
    }

    // ... other implementation details
}

// When you call perform_later, the job will automatically be tagged
DownloadWorker::perform_later(&ctx, args).await?;
```

See _How to have global state_, but generally you use a single shared state by using something like `lazy_static` and then simply refer to it from the worker.
If this state can be serialized, strongly prefer passing it through the `WorkerArgs`.
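For state that cannot go through the args, a minimal sketch of the shared-global pattern using the standard library's `OnceLock` (a std stand-in for `lazy_static`; the `SharedConfig` type and its contents are hypothetical):

```rust
use std::sync::OnceLock;

// Hypothetical read-only state, initialized once and shared by all workers.
struct SharedConfig {
    api_base: String,
}

static SHARED: OnceLock<SharedConfig> = OnceLock::new();

fn shared() -> &'static SharedConfig {
    SHARED.get_or_init(|| SharedConfig {
        api_base: "https://example.com".to_string(),
    })
}

fn main() {
    // A worker's perform() body would read the global the same way.
    println!("api_base = {}", shared().api_base);
}
```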
Adding a worker means coding the background job logic that takes the arguments and performs the job. We also need to let Loco know about it and register it into the global job processor.
Add a worker to `workers/`:

```rust
#[async_trait]
impl BackgroundWorker<DownloadWorkerArgs> for DownloadWorker {
    fn build(ctx: &AppContext) -> Self {
        Self { ctx: ctx.clone() }
    }

    // Optional: Define tags for this worker
    fn tags() -> Vec<String> {
        vec!["download".to_string()]
    }

    async fn perform(&self, args: DownloadWorkerArgs) -> Result<()> {
        println!("================================================");
        println!("Sending payment report to user {}", args.user_guid);
        // TODO: Some actual work goes here...
        println!("================================================");
        Ok(())
    }
}
```

And register it in `app.rs`:
```rust
#[async_trait]
impl Hooks for App {
    //..
    async fn connect_workers(ctx: &AppContext, queue: &Queue) -> Result<()> {
        queue.register(DownloadWorker::build(ctx)).await?;
        Ok(())
    }
    // ..
}
```

The `BackgroundWorker` trait is the core interface for defining background workers in Loco. It provides several methods:
- `build(ctx: &AppContext) -> Self`: Creates a new instance of the worker with the provided application context.
- `perform(&self, args: A) -> Result<()>`: The main method that executes the job's logic with the provided arguments.
- `queue() -> Option<String>`: Optional method to specify a custom queue for the worker (returns `None` by default).
- `tags() -> Vec<String>`: Optional method to specify tags for this worker (returns an empty vector by default).
- `class_name() -> String`: Returns the worker's class name (automatically derived from the struct name).
- `perform_later(ctx: &AppContext, args: A) -> Result<Option<String>>`: Static method to enqueue a job to be performed later. Returns `Some(job_id)` when using background queue mode with a provider, `None` otherwise.
To automatically add a worker using loco generate, execute the following command:
```sh
cargo loco generate worker report_worker
```

The worker generator creates a worker file associated with your app and generates a test template file, enabling you to verify your worker.
In your `config/<environment>.yaml` you can specify the worker mode. `BackgroundAsync` and `BackgroundQueue` will process jobs in a non-blocking manner, while `ForegroundBlocking` will process jobs in a blocking manner.

The main difference between `BackgroundAsync` and `BackgroundQueue` is that the latter uses Redis/Postgres/SQLite to store the jobs, while the former requires no external store and runs jobs async, in memory, within the same process.
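For example, keeping jobs in-process needs only the mode switch and no `queue` section (a sketch using the same fields shown elsewhere on this page):

```yaml
workers:
  # Jobs run async, in memory, in the same process; no Redis/Postgres/SQLite needed.
  mode: BackgroundAsync
```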
```yaml
# Worker Configuration
workers:
  # specifies the worker mode. Options:
  #   - BackgroundQueue - Workers operate asynchronously in the background, processing queued tasks.
  #   - ForegroundBlocking - Workers operate in the foreground and block until tasks are completed.
  #   - BackgroundAsync - Workers operate asynchronously in the background, processing tasks with async capabilities.
  mode: BackgroundQueue
```

You can manage the jobs queue with the Loco admin job project.

The job queue management feature provides a powerful and flexible way to handle the lifecycle of jobs in your application. It allows you to cancel, clean up, remove outdated jobs, export job details, and import jobs, ensuring efficient and organized job processing.
- **Cancel Jobs**: Provides the ability to cancel specific jobs by name, updating their status to `cancelled`. This is useful for stopping jobs that are no longer needed or relevant, or to prevent them from being processed when a bug is detected.
- **Clean Up Jobs**: Enables the removal of jobs that have already been completed or cancelled. This helps maintain a clean and efficient job queue by eliminating unnecessary entries.
- **Purge Outdated Jobs**: Allows you to delete jobs based on their age, measured in days. This is particularly useful for maintaining a lean job queue by removing older, irrelevant jobs. Note: you can use the `--dump` option to export job details to a file, manually modify the job parameters in the exported file, and then use the `import` feature to reintroduce the updated jobs into the system.
- **Export Job Details**: Supports exporting the details of all jobs to a specified location in file format. This feature is valuable for backups, audits, or further analysis.
- **Import Jobs**: Facilitates importing jobs from external files, making it easy to restore or add new jobs to the system. This ensures seamless integration of external job data into your application's workflow.
To access the job management commands, use the following CLI structure:
```sh
Managing jobs queue

Usage: demo_app-cli jobs [OPTIONS] <COMMAND>

Commands:
  cancel  Cancels jobs with the specified names, setting their status to `cancelled`
  tidy    Deletes jobs that are either completed or cancelled
  purge   Deletes jobs based on their age in days
  dump    Saves the details of all jobs to files in the specified folder
  import  Imports jobs from a file
  help    Print this message or the help of the given subcommand(s)

Options:
  -e, --environment <ENVIRONMENT>  Specify the environment [default: development]
  -h, --help                       Print help
  -V, --version                    Print version
```

You can easily test your worker background jobs using Loco. Ensure that your worker is set to the `ForegroundBlocking` mode, which blocks the job, ensuring it runs synchronously. When testing the worker, the test will wait until your worker completes, allowing you to verify whether the worker accomplished its intended tasks.
It's recommended to implement tests in the `tests/workers` directory to consolidate all your worker tests in one place.
Additionally, you can leverage the worker generator, which automatically creates tests, saving you time on configuring tests in the library.
Here's an example of how the test should be structured:
```rust
use loco_rs::testing::prelude::*;

#[tokio::test]
#[serial]
async fn test_run_report_worker_worker() {
    // Set up the test environment
    let boot = boot_test::<App, Migrator>().await.unwrap();

    // Execute the worker in 'ForegroundBlocking' mode, preventing it from running asynchronously
    assert!(
        ReportWorkerWorker::perform_later(&boot.app_context, ReportWorkerWorkerArgs {})
            .await
            .is_ok()
    );

    // Include additional assert validations after the execution of the worker
}
```

The `class_name()` function in the `BackgroundWorker` trait is used to determine the unique identifier for your worker in the job queue. By default, it:
- Takes the struct name (e.g., `DownloadWorker`)
- Strips any module paths (e.g., `my_app::workers::DownloadWorker` becomes just `DownloadWorker`)
- Converts it to UpperCamelCase format
This is important because when a job is enqueued, it needs a string identifier to match with the appropriate worker when it's time for processing. This function automatically generates that identifier for you, but you can override it if you need a custom naming scheme.
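The default derivation can be sketched as follows (an illustration of the documented behavior, not Loco's exact code):

```rust
// Strip the module path, then convert the remaining name to UpperCamelCase.
fn default_class_name(type_path: &str) -> String {
    let name = type_path.rsplit("::").next().unwrap_or(type_path);
    name.split('_')
        .filter(|s| !s.is_empty())
        .map(|s| {
            let mut chars = s.chars();
            match chars.next() {
                Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
                None => String::new(),
            }
        })
        .collect()
}

fn main() {
    // Module paths are stripped; snake_case names become UpperCamelCase.
    assert_eq!(default_class_name("my_app::workers::DownloadWorker"), "DownloadWorker");
    assert_eq!(default_class_name("download_worker"), "DownloadWorker");
    println!("class_name derivation matches the documented rules");
}
```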
```rust
// Example of how class_name works:
struct download_worker;

impl BackgroundWorker<Args> for download_worker {
    // class_name() would return "DownloadWorker"
    // No need to override this unless you need custom naming
}
```