Skip to content

Commit fd14d7a

Browse files
authored
Merge pull request #9963 from Turbo87/async-expiry
worker/jobs/expiry_notification: Migrate to `diesel-async` queries
2 parents 2f4ed8a + fb7ef89 commit fd14d7a

File tree

2 files changed

+52
-48
lines changed

2 files changed

+52
-48
lines changed

src/models/user.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,17 @@ impl User {
113113
.first(conn)
114114
.optional()
115115
}
116+
117+
/// Queries for the email belonging to a particular user
118+
pub async fn async_email(&self, conn: &mut AsyncPgConnection) -> QueryResult<Option<String>> {
119+
use diesel_async::RunQueryDsl;
120+
121+
Email::belonging_to(self)
122+
.select(emails::email)
123+
.first(conn)
124+
.await
125+
.optional()
126+
}
116127
}
117128

118129
/// Represents a new user record insertable to the `users` table

src/worker/jobs/expiry_notification.rs

Lines changed: 41 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
use crate::models::ApiToken;
22
use crate::schema::api_tokens;
3-
use crate::tasks::spawn_blocking;
4-
use crate::util::diesel::Conn;
53
use crate::{email::Email, models::User, worker::Environment, Emails};
64
use chrono::SecondsFormat;
75
use crates_io_worker::BackgroundJob;
86
use diesel::dsl::now;
97
use diesel::prelude::*;
10-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
8+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
119
use std::sync::Arc;
1210

1311
/// The threshold for the expiry notification.
@@ -27,24 +25,20 @@ impl BackgroundJob for SendTokenExpiryNotifications {
2725

2826
#[instrument(skip(env), err)]
2927
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
30-
let conn = env.deadpool.get().await?;
31-
spawn_blocking(move || {
32-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
28+
let mut conn = env.deadpool.get().await?;
3329

34-
// Check if the token is about to expire
35-
// If the token is about to expire, trigger a notification.
36-
check(&env.emails, conn)
37-
})
38-
.await
30+
// Check if the token is about to expire
31+
// If the token is about to expire, trigger a notification.
32+
check(&env.emails, &mut conn).await
3933
}
4034
}
4135

4236
/// Find tokens that are about to expire and send notifications to their owners.
43-
fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
37+
async fn check(emails: &Emails, conn: &mut AsyncPgConnection) -> anyhow::Result<()> {
4438
let before = chrono::Utc::now() + EXPIRY_THRESHOLD;
4539
info!("Searching for tokens that will expire before {before}…");
4640

47-
let expired_tokens = find_expiring_tokens(conn, before)?;
41+
let expired_tokens = find_expiring_tokens(conn, before).await?;
4842
let num_tokens = expired_tokens.len();
4943
if num_tokens == 0 {
5044
info!("Found no tokens that will expire before {before}. Skipping expiry notifications.");
@@ -59,7 +53,7 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
5953

6054
let mut success = 0;
6155
for token in &expired_tokens {
62-
if let Err(e) = handle_expiring_token(conn, token, emails) {
56+
if let Err(e) = handle_expiring_token(conn, token, emails).await {
6357
error!(?e, "Failed to handle expiring token");
6458
} else {
6559
success += 1;
@@ -72,16 +66,16 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
7266
}
7367

7468
/// Send an email to the user associated with the token.
75-
fn handle_expiring_token(
76-
conn: &mut impl Conn,
69+
async fn handle_expiring_token(
70+
conn: &mut AsyncPgConnection,
7771
token: &ApiToken,
7872
emails: &Emails,
7973
) -> Result<(), anyhow::Error> {
8074
debug!("Looking up user {} for token {}…", token.user_id, token.id);
81-
let user = User::find(conn, token.user_id)?;
75+
let user = User::async_find(conn, token.user_id).await?;
8276

8377
debug!("Looking up email address for user {}…", user.id);
84-
let recipient = user.email(conn)?;
78+
let recipient = user.async_email(conn).await?;
8579
if let Some(recipient) = recipient {
8680
debug!("Sending expiry notification to {}…", recipient);
8781
let email = ExpiryNotificationEmail {
@@ -90,7 +84,7 @@ fn handle_expiring_token(
9084
token_name: &token.name,
9185
expiry_date: token.expired_at.unwrap().and_utc(),
9286
};
93-
emails.send(&recipient, email)?;
87+
emails.async_send(&recipient, email).await?;
9488
} else {
9589
info!(
9690
"User {} has no email address set. Skipping expiry notification.",
@@ -102,7 +96,8 @@ fn handle_expiring_token(
10296
debug!("Marking token {} as notified…", token.id);
10397
diesel::update(token)
10498
.set(api_tokens::expiry_notification_at.eq(now.nullable()))
105-
.execute(conn)?;
99+
.execute(conn)
100+
.await?;
106101

107102
Ok(())
108103
}
@@ -112,8 +107,8 @@ fn handle_expiring_token(
112107
/// also ignored.
113108
///
114109
/// This function returns at most `MAX_ROWS` tokens.
115-
pub fn find_expiring_tokens(
116-
conn: &mut impl Conn,
110+
pub async fn find_expiring_tokens(
111+
conn: &mut AsyncPgConnection,
117112
before: chrono::DateTime<chrono::Utc>,
118113
) -> QueryResult<Vec<ApiToken>> {
119114
api_tokens::table
@@ -131,6 +126,7 @@ pub fn find_expiring_tokens(
131126
.order_by(api_tokens::expired_at.asc()) // The most urgent tokens first
132127
.limit(MAX_ROWS)
133128
.get_results(conn)
129+
.await
134130
}
135131

136132
#[derive(Debug, Clone)]
@@ -171,24 +167,27 @@ The crates.io team"#,
171167
mod tests {
172168
use super::*;
173169
use crate::models::NewUser;
174-
use crate::{
175-
models::token::ApiToken, schema::api_tokens, test_util::test_db_connection,
176-
util::token::PlainToken,
177-
};
170+
use crate::tasks::spawn_blocking;
171+
use crate::{models::token::ApiToken, schema::api_tokens, util::token::PlainToken};
172+
use crates_io_test_db::TestDatabase;
178173
use diesel::dsl::IntervalDsl;
174+
use diesel_async::AsyncConnection;
179175
use lettre::Address;
180176

181177
#[tokio::test]
182178
async fn test_expiry_notification() -> anyhow::Result<()> {
183179
let emails = Emails::new_in_memory();
184-
let (_test_db, mut conn) = test_db_connection();
180+
181+
let test_db = TestDatabase::new();
182+
let mut conn = AsyncPgConnection::establish(test_db.url()).await?;
185183

186184
// Set up a user and a token that is about to expire.
187-
let (user, mut conn) = spawn_blocking(move || {
185+
let mut sync_conn = test_db.connect();
186+
let user = spawn_blocking(move || {
188187
let user = NewUser::new(0, "a", None, None, "token");
189188
let emails = Emails::new_in_memory();
190-
let user = user.create_or_update(Some("[email protected]"), &emails, &mut conn)?;
191-
Ok::<_, anyhow::Error>((user, conn))
189+
let user = user.create_or_update(Some("[email protected]"), &emails, &mut sync_conn)?;
190+
Ok::<_, anyhow::Error>(user)
192191
})
193192
.await?;
194193

@@ -202,7 +201,8 @@ mod tests {
202201
api_tokens::expired_at.eq(now.nullable() + (EXPIRY_THRESHOLD.num_days() - 1).day()),
203202
))
204203
.returning(ApiToken::as_returning())
205-
.get_result(&mut conn)?;
204+
.get_result(&mut conn)
205+
.await?;
206206

207207
// Insert a few tokens that are not set to expire.
208208
let not_expired_offset = EXPIRY_THRESHOLD.num_days() + 1;
@@ -216,18 +216,12 @@ mod tests {
216216
api_tokens::expired_at.eq(now.nullable() + not_expired_offset.day()),
217217
))
218218
.returning(ApiToken::as_returning())
219-
.get_result(&mut conn)?;
219+
.get_result(&mut conn)
220+
.await?;
220221
}
221222

222223
// Check that the token is about to expire.
223-
let mut conn = spawn_blocking({
224-
let emails = emails.clone();
225-
move || {
226-
check(&emails, &mut conn)?;
227-
Ok::<_, anyhow::Error>(conn)
228-
}
229-
})
230-
.await?;
224+
check(&emails, &mut conn).await?;
231225

232226
// Check that an email was sent.
233227
let sent_mail = emails.mails_in_memory().await.unwrap();
@@ -241,15 +235,17 @@ mod tests {
241235
.filter(api_tokens::id.eq(token.id))
242236
.filter(api_tokens::expiry_notification_at.is_not_null())
243237
.select(ApiToken::as_select())
244-
.first::<ApiToken>(&mut conn)?;
238+
.first::<ApiToken>(&mut conn)
239+
.await?;
245240
assert_eq!(updated_token.name, "test_token".to_owned());
246241

247242
// Check that the token is not about to expire.
248243
let tokens = api_tokens::table
249244
.filter(api_tokens::revoked.eq(false))
250245
.filter(api_tokens::expiry_notification_at.is_null())
251246
.select(ApiToken::as_select())
252-
.load::<ApiToken>(&mut conn)?;
247+
.load::<ApiToken>(&mut conn)
248+
.await?;
253249
assert_eq!(tokens.len(), 3);
254250

255251
// Insert a already expired token.
@@ -262,14 +258,11 @@ mod tests {
262258
api_tokens::expired_at.eq(now.nullable() - 1.day()),
263259
))
264260
.returning(ApiToken::as_returning())
265-
.get_result(&mut conn)?;
261+
.get_result(&mut conn)
262+
.await?;
266263

267264
// Check that the token is not about to expire.
268-
spawn_blocking({
269-
let emails = emails.clone();
270-
move || check(&emails, &mut conn)
271-
})
272-
.await?;
265+
check(&emails, &mut conn).await?;
273266

274267
// Check that no email was sent.
275268
let sent_mail = emails.mails_in_memory().await.unwrap();

0 commit comments

Comments
 (0)