19 changes: 19 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -15,8 +15,27 @@
- Refactor users model to reuse `find_by_api_key` in `Authenticable` ([#1706](https://github.com/loco-rs/loco/pull/1706))
- Split error detail generic parameters ([#1709](https://github.com/loco-rs/loco/pull/1709))
- Update `loco-new` for new Rhai version ([#1704](https://github.com/loco-rs/loco/pull/1704))
- Add priority support to SQLite and Redis background workers ([#1693](https://github.com/loco-rs/loco/pull/1693))
- SQLite: Priority column automatically migrated on startup
- Redis: **BREAKING CHANGE** - Migrated from Lists to Sorted Sets (ZSET) for priority support
- Redis upgrade action required: drain Redis queues before upgrading, or accept queued job loss
- Priority API: Use `perform_later_with_priority(ctx, args, Some(priority))` or pass priority to `queue.enqueue()`
- Higher numbers = higher priority (consistent across all backends)

### Breaking Changes

**Redis Background Worker Queue Migration**

PR: [#1693](https://github.com/loco-rs/loco/pull/1693)

The Redis queue implementation has been migrated from Lists to Sorted Sets (ZSET) to support job priorities. Jobs stored in the old List format are not compatible with the new ZSET format. See the [upgrade guide](docs-site/content/docs/extras/upgrades.md) for migration instructions.

Upgrade instructions:
- Option 1 (recommended): stop enqueueing new jobs, let workers drain all existing Redis queue jobs, then upgrade and restart workers.
- Option 2: upgrade immediately and accept that Redis jobs already queued in List format will not be processed after upgrade.

**View Engine Initializer**

In file `src/initializers/view_engine.rs`, modify the code lines in `after_routes`:

Before
50 changes: 50 additions & 0 deletions docs-site/content/docs/extras/upgrades.md
@@ -33,6 +33,56 @@ These are the major ones:
- [SeaORM](https://www.sea-ql.org/SeaORM), [CHANGELOG](https://github.com/SeaQL/sea-orm/blob/master/CHANGELOG.md)
- [Axum](https://github.com/tokio-rs/axum), [CHANGELOG](https://github.com/tokio-rs/axum/blob/main/axum/CHANGELOG.md)

## Upgrade to v0.16.5(??) (Unreleased)

### Redis Background Worker Queue Migration

PR: [#1693](https://github.com/loco-rs/loco/pull/1693)

**This is a breaking change for Redis background worker queues.**

The Redis queue implementation has been migrated from Lists to Sorted Sets (ZSET) to support job priorities. Jobs stored in the old List format are not compatible with the new ZSET format.

#### Migration Strategy

**Option 1: Drain queues before upgrade (Recommended)**
1. Stop enqueueing new jobs
2. Let all workers process existing jobs until queues are empty
3. Upgrade to the new version
4. Resume normal operation
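
To confirm the queues are actually drained before upgrading, you can check the pending count per queue key. This is a sketch: the `queue:*` key pattern matches the examples below and may differ in your setup, and the script skips cleanly when `redis-cli` is not installed.

```bash
# List each legacy queue key with its number of pending jobs (List format).
command -v redis-cli >/dev/null 2>&1 || { echo "redis-cli not found; skipping"; exit 0; }
redis-cli --scan --pattern "queue:*" | while read -r key; do
  echo "$key: $(redis-cli LLEN "$key") pending"
done
```

All counts should read `0 pending` before you upgrade.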

**Option 2: Manual migration**
If you have long-running or scheduled jobs that can't be drained:

```bash
# Before upgrading, export queued jobs from Redis, recording which queue each came from
redis-cli --scan --pattern "queue:*" | while read -r key; do
  echo "# $key" >> jobs_backup.txt
  redis-cli LRANGE "$key" 0 -1 >> jobs_backup.txt
done

# After upgrading, you'll need to re-enqueue jobs via your application
# The job data is still stored in "job:*" keys
```

**Option 3: Accept job loss**

If losing queued jobs is acceptable for your use case, simply upgrade. Existing queued jobs will remain in List format but won't be processed. They can be manually cleaned up with:

```bash
redis-cli --scan --pattern "queue:*" | xargs redis-cli DEL
```

#### What Changed

- Queue storage: `LPUSH`/`LPOP` → `ZADD`/`ZPOPMIN`
- Priority support: Jobs now dequeue based on priority (higher = first)
- Performance: Minimal impact (O(1) → O(log N), still sub-millisecond)
- Memory: ~20-30% increase due to score storage
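
After upgrading, you can tell which format each key is in by inspecting its Redis type: legacy pre-upgrade jobs show as `list`, migrated queues as `zset`. A sketch, assuming the same `queue:*` key pattern as the examples above; the script skips cleanly when `redis-cli` is not installed.

```bash
# Report the Redis storage type for each queue key:
# "list" = legacy (pre-upgrade) jobs, "zset" = new sorted-set format.
command -v redis-cli >/dev/null 2>&1 || { echo "redis-cli not found; skipping"; exit 0; }
redis-cli --scan --pattern "queue:*" | while read -r key; do
  echo "$key: $(redis-cli TYPE "$key")"
done
```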

#### SQLite and Postgres Background Workers

SQLite and Postgres workers also gained priority support but with **no breaking changes**. The priority column is automatically added to existing databases on startup.

## Upgrade from 0.15.x to 0.16.x

### Use `AppContext` instead of `Config` in `init_logger` in the `Hooks` trait
7 changes: 6 additions & 1 deletion docs-site/content/docs/processing/mailers.md
@@ -80,7 +80,7 @@ mailer:
password: "your-sendgrid-api-key"
```

### Default Email Address
### Default Email Address and Queue Priority

Rather than specifying an email address for every email-sending task, you can set a default email address per mailer.

@@ -91,12 +91,17 @@ impl Mailer for AuthMailer {
fn opts() -> MailerOpts {
MailerOpts {
from: // set your from email,
priority: 100, // default is 100
..Default::default()
}
}
}
```

The `priority` value controls the queue priority used by `MailerWorker`.
By default, mailers enqueue jobs with priority `100`, and you can override it per mailer via `MailerOpts`.
Higher priority values are processed sooner (for example, priority `100` is processed before priority `10`).

### Using a mail catcher in development

You can use an app like `MailHog` or `mailtutan` (written in Rust):
15 changes: 15 additions & 0 deletions docs-site/content/docs/processing/workers.md
@@ -150,6 +150,21 @@ To use a worker, we mainly think about adding a job to the queue, so you `use` t

Unlike Rails and Ruby, with Rust you can enjoy _strongly typed_ job arguments, which get serialized and pushed onto the queue.

### Priority Semantics

For queue-backed workers (`Redis`, `Postgres`, and `SQLite`), priorities follow the same rules:

1. Higher number means more urgent.
2. The valid priority range is the full `i32` range (`-2_147_483_648..=2_147_483_647`).
3. If priorities are equal, jobs with earlier `run_at` are processed first.
4. If both priority and `run_at` are equal, ordering is deterministic using job id.

You can enqueue with an explicit priority using:

```rust
DownloadWorker::perform_later_with_priority(&ctx, args, Some(42)).await?;
```
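
The ordering rules above can be sketched as a comparator. This is a self-contained illustration of the documented semantics (plain Rust, not loco's actual dequeue code; the `Job` struct and `run_at` as a plain integer are simplifications for the example):

```rust
// Illustrates dequeue ordering: higher priority first,
// ties broken by earlier run_at, then by job id.
#[derive(Debug)]
struct Job {
    id: u32,
    priority: i32,
    run_at: u64, // simplified timestamp for illustration
}

fn dequeue_order(mut jobs: Vec<Job>) -> Vec<u32> {
    jobs.sort_by(|a, b| {
        b.priority
            .cmp(&a.priority)            // higher priority first
            .then(a.run_at.cmp(&b.run_at)) // then earlier run_at
            .then(a.id.cmp(&b.id))         // then stable id ordering
    });
    jobs.into_iter().map(|j| j.id).collect()
}

fn main() {
    let jobs = vec![
        Job { id: 1, priority: 10, run_at: 5 },
        Job { id: 2, priority: 100, run_at: 9 },
        Job { id: 3, priority: 100, run_at: 9 },
        Job { id: 4, priority: 100, run_at: 1 },
    ];
    // Job 4 wins on run_at among the priority-100 jobs; 2 beats 3 on id.
    assert_eq!(dequeue_order(jobs), vec![4, 2, 3, 1]);
    println!("ok");
}
```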

### Assigning Tags to Jobs

When enqueueing a job, you can optionally assign tags to it. The job will then only be processed by workers that match at least one of its tags:
40 changes: 34 additions & 6 deletions src/bgworker/mod.rs
@@ -94,19 +94,25 @@ impl Queue {
/// # Errors
///
/// This function will return an error if enqueueing the job fails
///
/// Priority semantics for all queue backends:
/// - Higher value means higher urgency.
/// - Valid range is full `i32` (`-2_147_483_648..=2_147_483_647`).
/// - Ties are resolved by earlier `run_at`, then by stable job id ordering.
#[allow(unused_variables)]
pub async fn enqueue<A: Serialize + Send + Sync>(
&self,
class: String,
queue: Option<String>,
args: A,
tags: Option<Vec<String>>,
priority: Option<i32>,
) -> Result<()> {
tracing::debug!(worker = class, queue = ?queue, tags = ?tags, "Enqueuing background job");
match self {
#[cfg(feature = "bg_redis")]
Self::Redis(pool, _, _, _) => {
redis::enqueue(pool, class, queue, args, tags).await?;
redis::enqueue(pool, class, queue, args, tags, priority).await?;
}
#[cfg(feature = "bg_pg")]
Self::Postgres(pool, _, _, _) => {
@@ -117,6 +123,7 @@
chrono::Utc::now(),
None,
tags,
priority,
)
.await
.map_err(Box::from)?;
@@ -130,6 +137,7 @@
chrono::Utc::now(),
None,
tags,
priority,
)
.await
.map_err(Box::from)?;
@@ -553,7 +561,8 @@
Self::Postgres(_, _, _, _) => {
let jobs: Vec<pg::Job> = serde_yaml::from_reader(File::open(path)?)?;
for job in jobs {
self.enqueue(job.name.clone(), None, job.data, None).await?;
self.enqueue(job.name.clone(), None, job.data, None, Some(job.priority))
.await?;
}

Ok(())
@@ -562,15 +571,17 @@
Self::Sqlite(_, _, _, _) => {
let jobs: Vec<sqlt::Job> = serde_yaml::from_reader(File::open(path)?)?;
for job in jobs {
self.enqueue(job.name.clone(), None, job.data, None).await?;
self.enqueue(job.name.clone(), None, job.data, None, Some(job.priority))
.await?;
}
Ok(())
}
#[cfg(feature = "bg_redis")]
Self::Redis(_, _, _, _) => {
let jobs: Vec<redis::Job> = serde_yaml::from_reader(File::open(path)?)?;
for job in jobs {
self.enqueue(job.name.clone(), None, job.data, None).await?;
self.enqueue(job.name.clone(), None, job.data, None, Some(job.priority))
.await?;
}
Ok(())
}
@@ -613,6 +624,17 @@ pub trait BackgroundWorker<A: Send + Sync + serde::Serialize + 'static>: Send +
name.to_upper_camel_case()
}
async fn perform_later(ctx: &AppContext, args: A) -> crate::Result<()>
where
Self: Sized,
{
Self::perform_later_with_priority(ctx, args, None).await
}

async fn perform_later_with_priority(
ctx: &AppContext,
args: A,
priority: Option<i32>,
) -> crate::Result<()>
where
Self: Sized,
{
@@ -621,8 +643,14 @@ pub trait BackgroundWorker<A: Send + Sync + serde::Serialize + 'static>: Send +
if let Some(p) = &ctx.queue_provider {
let tags = Self::tags();
let tags_option = if tags.is_empty() { None } else { Some(tags) };
p.enqueue(Self::class_name(), Self::queue(), args, tags_option)
.await?;
p.enqueue(
Self::class_name(),
Self::queue(),
args,
tags_option,
priority,
)
.await?;
} else {
tracing::error!(
"perform_later: background queue is selected, but queue was not populated \