11use crate :: models:: ApiToken ;
22use crate :: schema:: api_tokens;
3- use crate :: tasks:: spawn_blocking;
4- use crate :: util:: diesel:: Conn ;
53use crate :: { email:: Email , models:: User , worker:: Environment , Emails } ;
64use chrono:: SecondsFormat ;
75use crates_io_worker:: BackgroundJob ;
86use diesel:: dsl:: now;
97use diesel:: prelude:: * ;
10- use diesel_async:: async_connection_wrapper :: AsyncConnectionWrapper ;
8+ use diesel_async:: { AsyncPgConnection , RunQueryDsl } ;
119use std:: sync:: Arc ;
1210
1311/// The threshold for the expiry notification.
@@ -27,24 +25,20 @@ impl BackgroundJob for SendTokenExpiryNotifications {
2725
2826 #[ instrument( skip( env) , err) ]
2927 async fn run ( & self , env : Self :: Context ) -> anyhow:: Result < ( ) > {
30- let conn = env. deadpool . get ( ) . await ?;
31- spawn_blocking ( move || {
32- let conn: & mut AsyncConnectionWrapper < _ > = & mut conn. into ( ) ;
28+ let mut conn = env. deadpool . get ( ) . await ?;
3329
34- // Check if the token is about to expire
35- // If the token is about to expire, trigger a notification.
36- check ( & env. emails , conn)
37- } )
38- . await
30+ // Check if the token is about to expire
31+ // If the token is about to expire, trigger a notification.
32+ check ( & env. emails , & mut conn) . await
3933 }
4034}
4135
4236/// Find tokens that are about to expire and send notifications to their owners.
43- fn check ( emails : & Emails , conn : & mut impl Conn ) -> anyhow:: Result < ( ) > {
37+ async fn check ( emails : & Emails , conn : & mut AsyncPgConnection ) -> anyhow:: Result < ( ) > {
4438 let before = chrono:: Utc :: now ( ) + EXPIRY_THRESHOLD ;
4539 info ! ( "Searching for tokens that will expire before {before}…" ) ;
4640
47- let expired_tokens = find_expiring_tokens ( conn, before) ?;
41+ let expired_tokens = find_expiring_tokens ( conn, before) . await ?;
4842 let num_tokens = expired_tokens. len ( ) ;
4943 if num_tokens == 0 {
5044 info ! ( "Found no tokens that will expire before {before}. Skipping expiry notifications." ) ;
@@ -59,7 +53,7 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
5953
6054 let mut success = 0 ;
6155 for token in & expired_tokens {
62- if let Err ( e) = handle_expiring_token ( conn, token, emails) {
56+ if let Err ( e) = handle_expiring_token ( conn, token, emails) . await {
6357 error ! ( ?e, "Failed to handle expiring token" ) ;
6458 } else {
6559 success += 1 ;
@@ -72,16 +66,16 @@ fn check(emails: &Emails, conn: &mut impl Conn) -> anyhow::Result<()> {
7266}
7367
7468/// Send an email to the user associated with the token.
75- fn handle_expiring_token (
76- conn : & mut impl Conn ,
69+ async fn handle_expiring_token (
70+ conn : & mut AsyncPgConnection ,
7771 token : & ApiToken ,
7872 emails : & Emails ,
7973) -> Result < ( ) , anyhow:: Error > {
8074 debug ! ( "Looking up user {} for token {}…" , token. user_id, token. id) ;
81- let user = User :: find ( conn, token. user_id ) ?;
75+ let user = User :: async_find ( conn, token. user_id ) . await ?;
8276
8377 debug ! ( "Looking up email address for user {}…" , user. id) ;
84- let recipient = user. email ( conn) ?;
78+ let recipient = user. async_email ( conn) . await ?;
8579 if let Some ( recipient) = recipient {
8680 debug ! ( "Sending expiry notification to {}…" , recipient) ;
8781 let email = ExpiryNotificationEmail {
@@ -90,7 +84,7 @@ fn handle_expiring_token(
9084 token_name : & token. name ,
9185 expiry_date : token. expired_at . unwrap ( ) . and_utc ( ) ,
9286 } ;
93- emails. send ( & recipient, email) ?;
87+ emails. async_send ( & recipient, email) . await ?;
9488 } else {
9589 info ! (
9690 "User {} has no email address set. Skipping expiry notification." ,
@@ -102,7 +96,8 @@ fn handle_expiring_token(
10296 debug ! ( "Marking token {} as notified…" , token. id) ;
10397 diesel:: update ( token)
10498 . set ( api_tokens:: expiry_notification_at. eq ( now. nullable ( ) ) )
105- . execute ( conn) ?;
99+ . execute ( conn)
100+ . await ?;
106101
107102 Ok ( ( ) )
108103}
@@ -112,8 +107,8 @@ fn handle_expiring_token(
112107/// also ignored.
113108///
114109/// This function returns at most `MAX_ROWS` tokens.
115- pub fn find_expiring_tokens (
116- conn : & mut impl Conn ,
110+ pub async fn find_expiring_tokens (
111+ conn : & mut AsyncPgConnection ,
117112 before : chrono:: DateTime < chrono:: Utc > ,
118113) -> QueryResult < Vec < ApiToken > > {
119114 api_tokens:: table
@@ -131,6 +126,7 @@ pub fn find_expiring_tokens(
131126 . order_by ( api_tokens:: expired_at. asc ( ) ) // The most urgent tokens first
132127 . limit ( MAX_ROWS )
133128 . get_results ( conn)
129+ . await
134130}
135131
136132#[ derive( Debug , Clone ) ]
@@ -171,24 +167,27 @@ The crates.io team"#,
171167mod tests {
172168 use super :: * ;
173169 use crate :: models:: NewUser ;
174- use crate :: {
175- models:: token:: ApiToken , schema:: api_tokens, test_util:: test_db_connection,
176- util:: token:: PlainToken ,
177- } ;
170+ use crate :: tasks:: spawn_blocking;
171+ use crate :: { models:: token:: ApiToken , schema:: api_tokens, util:: token:: PlainToken } ;
172+ use crates_io_test_db:: TestDatabase ;
178173 use diesel:: dsl:: IntervalDsl ;
174+ use diesel_async:: AsyncConnection ;
179175 use lettre:: Address ;
180176
181177 #[ tokio:: test]
182178 async fn test_expiry_notification ( ) -> anyhow:: Result < ( ) > {
183179 let emails = Emails :: new_in_memory ( ) ;
184- let ( _test_db, mut conn) = test_db_connection ( ) ;
180+
181+ let test_db = TestDatabase :: new ( ) ;
182+ let mut conn = AsyncPgConnection :: establish ( test_db. url ( ) ) . await ?;
185183
186184 // Set up a user and a token that is about to expire.
187- let ( user, mut conn) = spawn_blocking ( move || {
185+ let mut sync_conn = test_db. connect ( ) ;
186+ let user = spawn_blocking ( move || {
188187 let user = NewUser :: new ( 0 , "a" , None , None , "token" ) ;
189188 let emails = Emails :: new_in_memory ( ) ;
190- let user = user
. create_or_update ( Some ( "[email protected] " ) , & emails
, & mut conn ) ?
; 191- Ok :: < _ , anyhow:: Error > ( ( user, conn ) )
189+ let user = user
. create_or_update ( Some ( "[email protected] " ) , & emails
, & mut sync_conn ) ?
; 190+ Ok :: < _ , anyhow:: Error > ( user)
192191 } )
193192 . await ?;
194193
@@ -202,7 +201,8 @@ mod tests {
202201 api_tokens:: expired_at. eq ( now. nullable ( ) + ( EXPIRY_THRESHOLD . num_days ( ) - 1 ) . day ( ) ) ,
203202 ) )
204203 . returning ( ApiToken :: as_returning ( ) )
205- . get_result ( & mut conn) ?;
204+ . get_result ( & mut conn)
205+ . await ?;
206206
207207 // Insert a few tokens that are not set to expire.
208208 let not_expired_offset = EXPIRY_THRESHOLD . num_days ( ) + 1 ;
@@ -216,18 +216,12 @@ mod tests {
216216 api_tokens:: expired_at. eq ( now. nullable ( ) + not_expired_offset. day ( ) ) ,
217217 ) )
218218 . returning ( ApiToken :: as_returning ( ) )
219- . get_result ( & mut conn) ?;
219+ . get_result ( & mut conn)
220+ . await ?;
220221 }
221222
222223 // Check that the token is about to expire.
223- let mut conn = spawn_blocking ( {
224- let emails = emails. clone ( ) ;
225- move || {
226- check ( & emails, & mut conn) ?;
227- Ok :: < _ , anyhow:: Error > ( conn)
228- }
229- } )
230- . await ?;
224+ check ( & emails, & mut conn) . await ?;
231225
232226 // Check that an email was sent.
233227 let sent_mail = emails. mails_in_memory ( ) . await . unwrap ( ) ;
@@ -241,15 +235,17 @@ mod tests {
241235 . filter ( api_tokens:: id. eq ( token. id ) )
242236 . filter ( api_tokens:: expiry_notification_at. is_not_null ( ) )
243237 . select ( ApiToken :: as_select ( ) )
244- . first :: < ApiToken > ( & mut conn) ?;
238+ . first :: < ApiToken > ( & mut conn)
239+ . await ?;
245240 assert_eq ! ( updated_token. name, "test_token" . to_owned( ) ) ;
246241
247242 // Check that the token is not about to expire.
248243 let tokens = api_tokens:: table
249244 . filter ( api_tokens:: revoked. eq ( false ) )
250245 . filter ( api_tokens:: expiry_notification_at. is_null ( ) )
251246 . select ( ApiToken :: as_select ( ) )
252- . load :: < ApiToken > ( & mut conn) ?;
247+ . load :: < ApiToken > ( & mut conn)
248+ . await ?;
253249 assert_eq ! ( tokens. len( ) , 3 ) ;
254250
255251 // Insert a already expired token.
@@ -262,14 +258,11 @@ mod tests {
262258 api_tokens:: expired_at. eq ( now. nullable ( ) - 1 . day ( ) ) ,
263259 ) )
264260 . returning ( ApiToken :: as_returning ( ) )
265- . get_result ( & mut conn) ?;
261+ . get_result ( & mut conn)
262+ . await ?;
266263
267264 // Check that the token is not about to expire.
268- spawn_blocking ( {
269- let emails = emails. clone ( ) ;
270- move || check ( & emails, & mut conn)
271- } )
272- . await ?;
265+ check ( & emails, & mut conn) . await ?;
273266
274267 // Check that no email was sent.
275268 let sent_mail = emails. mails_in_memory ( ) . await . unwrap ( ) ;
0 commit comments