11use crate :: config:: CdnLogQueueConfig ;
22use crate :: sqs:: { MockSqsQueue , SqsQueue , SqsQueueImpl } ;
3- use crate :: tasks:: spawn_blocking;
4- use crate :: util:: diesel:: Conn ;
53use crate :: worker:: jobs:: ProcessCdnLog ;
64use crate :: worker:: Environment ;
75use anyhow:: Context ;
86use aws_credential_types:: Credentials ;
97use aws_sdk_sqs:: config:: Region ;
108use aws_sdk_sqs:: types:: Message ;
119use crates_io_worker:: BackgroundJob ;
12- use diesel_async:: async_connection_wrapper:: AsyncConnectionWrapper ;
1310use diesel_async:: pooled_connection:: deadpool:: Pool ;
1411use diesel_async:: AsyncPgConnection ;
1512use std:: sync:: Arc ;
@@ -156,12 +153,9 @@ async fn process_body(body: &str, connection_pool: &Pool<AsyncPgConnection>) ->
156153 }
157154
158155 let conn = connection_pool. get ( ) . await ;
159- let conn = conn. context ( "Failed to acquire database connection" ) ?;
160- spawn_blocking ( move || {
161- let conn: & mut AsyncConnectionWrapper < _ > = & mut conn. into ( ) ;
162- enqueue_jobs ( jobs, conn)
163- } )
164- . await
156+ let mut conn = conn. context ( "Failed to acquire database connection" ) ?;
157+
158+ enqueue_jobs ( jobs, & mut conn) . await
165159}
166160
167161/// Extracts a list of [`ProcessCdnLog`] jobs from a message.
@@ -207,12 +201,16 @@ fn is_ignored_path(path: &str) -> bool {
207201 path. contains ( "/index.staging.crates.io/" ) || path. contains ( "/index.crates.io/" )
208202}
209203
210- fn enqueue_jobs ( jobs : Vec < ProcessCdnLog > , conn : & mut impl Conn ) -> anyhow:: Result < ( ) > {
204+ async fn enqueue_jobs (
205+ jobs : Vec < ProcessCdnLog > ,
206+ conn : & mut AsyncPgConnection ,
207+ ) -> anyhow:: Result < ( ) > {
211208 for job in jobs {
212209 let path = & job. path ;
213210
214211 info ! ( "Enqueuing processing job… ({path})" ) ;
215- job. enqueue ( conn)
212+ job. async_enqueue ( conn)
213+ . await
216214 . context ( "Failed to enqueue processing job" ) ?;
217215
218216 debug ! ( "Enqueued processing job" ) ;
@@ -230,8 +228,8 @@ mod tests {
230228 use crates_io_test_db:: TestDatabase ;
231229 use crates_io_worker:: schema:: background_jobs;
232230 use diesel:: prelude:: * ;
233- use diesel:: QueryDsl ;
234231 use diesel_async:: pooled_connection:: AsyncDieselConnectionManager ;
232+ use diesel_async:: RunQueryDsl ;
235233 use insta:: assert_snapshot;
236234 use parking_lot:: Mutex ;
237235
@@ -262,7 +260,7 @@ mod tests {
262260 assert_ok ! ( run( & queue, 100 , & connection_pool) . await ) ;
263261
264262 assert_snapshot ! ( deleted_handles. lock( ) . join( "," ) , @"123" ) ;
265- assert_snapshot ! ( open_jobs( & mut test_database . connect ( ) ) , @"us-west-1 | bucket | path" ) ;
263+ assert_snapshot ! ( open_jobs( & mut connection_pool . get ( ) . await . unwrap ( ) ) . await , @"us-west-1 | bucket | path" ) ;
266264 }
267265
268266 #[ tokio:: test]
@@ -310,7 +308,7 @@ mod tests {
310308 assert_ok ! ( run( & queue, 100 , & connection_pool) . await ) ;
311309
312310 assert_snapshot ! ( deleted_handles. lock( ) . join( "," ) , @"1,2,3,4,5,6,7,8,9,10,11" ) ;
313- assert_snapshot ! ( open_jobs( & mut test_database . connect ( ) ) , @r###"
311+ assert_snapshot ! ( open_jobs( & mut connection_pool . get ( ) . await . unwrap ( ) ) . await , @r###"
314312 us-west-1 | bucket | path1
315313 us-west-1 | bucket | path2
316314 us-west-1 | bucket | path3
@@ -358,7 +356,7 @@ mod tests {
358356 assert_ok ! ( run( & queue, 100 , & connection_pool) . await ) ;
359357
360358 assert_snapshot ! ( deleted_handles. lock( ) . join( "," ) , @"1" ) ;
361- assert_snapshot ! ( open_jobs( & mut test_database . connect ( ) ) , @"" ) ;
359+ assert_snapshot ! ( open_jobs( & mut connection_pool . get ( ) . await . unwrap ( ) ) . await , @"" ) ;
362360 }
363361
364362 #[ test]
@@ -419,10 +417,11 @@ mod tests {
419417 . build ( )
420418 }
421419
422- fn open_jobs ( conn : & mut impl Conn ) -> String {
420+ async fn open_jobs ( conn : & mut AsyncPgConnection ) -> String {
423421 let jobs = background_jobs:: table
424422 . select ( ( background_jobs:: job_type, background_jobs:: data) )
425423 . load :: < ( String , serde_json:: Value ) > ( conn)
424+ . await
426425 . unwrap ( ) ;
427426
428427 jobs. into_iter ( )
0 commit comments