@@ -5,38 +5,41 @@ use crates_io_worker::{BackgroundJob, Runner};
55use diesel:: prelude:: * ;
66use diesel_async:: pooled_connection:: deadpool:: Pool ;
77use diesel_async:: pooled_connection:: AsyncDieselConnectionManager ;
8- use diesel_async:: AsyncPgConnection ;
8+ use diesel_async:: { AsyncConnection , AsyncPgConnection , RunQueryDsl } ;
99use insta:: assert_compact_json_snapshot;
1010use serde:: { Deserialize , Serialize } ;
1111use serde_json:: Value ;
1212use std:: sync:: atomic:: { AtomicU8 , Ordering } ;
1313use std:: sync:: Arc ;
1414use tokio:: sync:: Barrier ;
1515
16- fn all_jobs ( conn : & mut PgConnection ) -> Vec < ( String , Value ) > {
16+ async fn all_jobs ( conn : & mut AsyncPgConnection ) -> Vec < ( String , Value ) > {
1717 background_jobs:: table
1818 . select ( ( background_jobs:: job_type, background_jobs:: data) )
1919 . get_results ( conn)
20+ . await
2021 . unwrap ( )
2122}
2223
23- fn job_exists ( id : i64 , conn : & mut PgConnection ) -> bool {
24+ async fn job_exists ( id : i64 , conn : & mut AsyncPgConnection ) -> bool {
2425 background_jobs:: table
2526 . find ( id)
2627 . select ( background_jobs:: id)
2728 . get_result :: < i64 > ( conn)
29+ . await
2830 . optional ( )
2931 . unwrap ( )
3032 . is_some ( )
3133}
3234
33- fn job_is_locked ( id : i64 , conn : & mut PgConnection ) -> bool {
35+ async fn job_is_locked ( id : i64 , conn : & mut AsyncPgConnection ) -> bool {
3436 background_jobs:: table
3537 . find ( id)
3638 . select ( background_jobs:: id)
3739 . for_update ( )
3840 . skip_locked ( )
3941 . get_result :: < i64 > ( conn)
42+ . await
4043 . optional ( )
4144 . unwrap ( )
4245 . is_none ( )
@@ -73,22 +76,25 @@ async fn jobs_are_locked_when_fetched() {
7376
7477 let runner = runner ( test_database. url ( ) , test_context. clone ( ) ) . register_job_type :: < TestJob > ( ) ;
7578
76- let mut conn = test_database. connect ( ) ;
77- let job_id = TestJob . enqueue ( & mut conn) . unwrap ( ) . unwrap ( ) ;
79+ let mut conn = AsyncPgConnection :: establish ( test_database. url ( ) )
80+ . await
81+ . unwrap ( ) ;
82+
83+ let job_id = TestJob . async_enqueue ( & mut conn) . await . unwrap ( ) . unwrap ( ) ;
7884
79- assert ! ( job_exists( job_id, & mut conn) ) ;
80- assert ! ( !job_is_locked( job_id, & mut conn) ) ;
85+ assert ! ( job_exists( job_id, & mut conn) . await ) ;
86+ assert ! ( !job_is_locked( job_id, & mut conn) . await ) ;
8187
8288 let runner = runner. start ( ) ;
8389 test_context. job_started_barrier . wait ( ) . await ;
8490
85- assert ! ( job_exists( job_id, & mut conn) ) ;
86- assert ! ( job_is_locked( job_id, & mut conn) ) ;
91+ assert ! ( job_exists( job_id, & mut conn) . await ) ;
92+ assert ! ( job_is_locked( job_id, & mut conn) . await ) ;
8793
8894 test_context. assertions_finished_barrier . wait ( ) . await ;
8995 runner. wait_for_shutdown ( ) . await ;
9096
91- assert ! ( !job_exists( job_id, & mut conn) ) ;
97+ assert ! ( !job_exists( job_id, & mut conn) . await ) ;
9298}
9399
94100#[ tokio:: test( flavor = "multi_thread" ) ]
@@ -105,26 +111,30 @@ async fn jobs_are_deleted_when_successfully_run() {
105111 }
106112 }
107113
108- fn remaining_jobs ( conn : & mut PgConnection ) -> i64 {
114+ async fn remaining_jobs ( conn : & mut AsyncPgConnection ) -> i64 {
109115 background_jobs:: table
110116 . count ( )
111117 . get_result ( & mut * conn)
118+ . await
112119 . unwrap ( )
113120 }
114121
115122 let test_database = TestDatabase :: new ( ) ;
116123
117124 let runner = runner ( test_database. url ( ) , ( ) ) . register_job_type :: < TestJob > ( ) ;
118125
119- let mut conn = test_database. connect ( ) ;
120- assert_eq ! ( remaining_jobs( & mut conn) , 0 ) ;
126+ let mut conn = AsyncPgConnection :: establish ( test_database. url ( ) )
127+ . await
128+ . unwrap ( ) ;
129+
130+ assert_eq ! ( remaining_jobs( & mut conn) . await , 0 ) ;
121131
122- TestJob . enqueue ( & mut conn) . unwrap ( ) ;
123- assert_eq ! ( remaining_jobs( & mut conn) , 1 ) ;
132+ TestJob . async_enqueue ( & mut conn) . await . unwrap ( ) ;
133+ assert_eq ! ( remaining_jobs( & mut conn) . await , 1 ) ;
124134
125135 let runner = runner. start ( ) ;
126136 runner. wait_for_shutdown ( ) . await ;
127- assert_eq ! ( remaining_jobs( & mut conn) , 0 ) ;
137+ assert_eq ! ( remaining_jobs( & mut conn) . await , 0 ) ;
128138}
129139
130140#[ tokio:: test( flavor = "multi_thread" ) ]
@@ -155,8 +165,11 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
155165
156166 let runner = runner ( test_database. url ( ) , test_context. clone ( ) ) . register_job_type :: < TestJob > ( ) ;
157167
158- let mut conn = test_database. connect ( ) ;
159- TestJob . enqueue ( & mut conn) . unwrap ( ) ;
168+ let mut conn = AsyncPgConnection :: establish ( test_database. url ( ) )
169+ . await
170+ . unwrap ( ) ;
171+
172+ TestJob . async_enqueue ( & mut conn) . await . unwrap ( ) ;
160173
161174 let runner = runner. start ( ) ;
162175 test_context. job_started_barrier . wait ( ) . await ;
@@ -169,15 +182,17 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
169182 . select ( background_jobs:: id)
170183 . filter ( background_jobs:: retries. eq ( 0 ) )
171184 . for_update ( )
172- . load :: < i64 > ( & mut * conn)
185+ . load :: < i64 > ( & mut conn)
186+ . await
173187 . unwrap ( ) ;
174188 assert_eq ! ( available_jobs. len( ) , 0 ) ;
175189
176190 // Sanity check to make sure the job actually is there
177191 let total_jobs_including_failed = background_jobs:: table
178192 . select ( background_jobs:: id)
179193 . for_update ( )
180- . load :: < i64 > ( & mut * conn)
194+ . load :: < i64 > ( & mut conn)
195+ . await
181196 . unwrap ( ) ;
182197 assert_eq ! ( total_jobs_including_failed. len( ) , 1 ) ;
183198
@@ -202,9 +217,11 @@ async fn panicking_in_jobs_updates_retry_counter() {
202217
203218 let runner = runner ( test_database. url ( ) , ( ) ) . register_job_type :: < TestJob > ( ) ;
204219
205- let mut conn = test_database. connect ( ) ;
220+ let mut conn = AsyncPgConnection :: establish ( test_database. url ( ) )
221+ . await
222+ . unwrap ( ) ;
206223
207- let job_id = TestJob . enqueue ( & mut conn) . unwrap ( ) . unwrap ( ) ;
224+ let job_id = TestJob . async_enqueue ( & mut conn) . await . unwrap ( ) . unwrap ( ) ;
208225
209226 let runner = runner. start ( ) ;
210227 runner. wait_for_shutdown ( ) . await ;
@@ -213,7 +230,8 @@ async fn panicking_in_jobs_updates_retry_counter() {
213230 . find ( job_id)
214231 . select ( background_jobs:: retries)
215232 . for_update ( )
216- . first :: < i32 > ( & mut * conn)
233+ . first :: < i32 > ( & mut conn)
234+ . await
217235 . unwrap ( ) ;
218236 assert_eq ! ( tries, 1 ) ;
219237}
@@ -264,33 +282,35 @@ async fn jobs_can_be_deduplicated() {
264282
265283 let runner = runner ( test_database. url ( ) , test_context. clone ( ) ) . register_job_type :: < TestJob > ( ) ;
266284
267- let mut conn = test_database. connect ( ) ;
285+ let mut conn = AsyncPgConnection :: establish ( test_database. url ( ) )
286+ . await
287+ . unwrap ( ) ;
268288
269289 // Enqueue first job
270- assert_some ! ( TestJob :: new( "foo" ) . enqueue ( & mut conn) . unwrap( ) ) ;
271- assert_compact_json_snapshot ! ( all_jobs( & mut conn) , @r#"[["test", {"value": "foo"}]]"# ) ;
290+ assert_some ! ( TestJob :: new( "foo" ) . async_enqueue ( & mut conn) . await . unwrap( ) ) ;
291+ assert_compact_json_snapshot ! ( all_jobs( & mut conn) . await , @r#"[["test", {"value": "foo"}]]"# ) ;
272292
273293 // Try to enqueue the same job again, which should be deduplicated
274- assert_none ! ( TestJob :: new( "foo" ) . enqueue ( & mut conn) . unwrap( ) ) ;
275- assert_compact_json_snapshot ! ( all_jobs( & mut conn) , @r#"[["test", {"value": "foo"}]]"# ) ;
294+ assert_none ! ( TestJob :: new( "foo" ) . async_enqueue ( & mut conn) . await . unwrap( ) ) ;
295+ assert_compact_json_snapshot ! ( all_jobs( & mut conn) . await , @r#"[["test", {"value": "foo"}]]"# ) ;
276296
277297 // Start processing the first job
278298 let runner = runner. start ( ) ;
279299 test_context. job_started_barrier . wait ( ) . await ;
280300
281301 // Enqueue the same job again, which should NOT be deduplicated,
282302 // since the first job already still running
283- assert_some ! ( TestJob :: new( "foo" ) . enqueue ( & mut conn) . unwrap( ) ) ;
284- assert_compact_json_snapshot ! ( all_jobs( & mut conn) , @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"# ) ;
303+ assert_some ! ( TestJob :: new( "foo" ) . async_enqueue ( & mut conn) . await . unwrap( ) ) ;
304+ assert_compact_json_snapshot ! ( all_jobs( & mut conn) . await , @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"# ) ;
285305
286306 // Try to enqueue the same job again, which should be deduplicated again
287- assert_none ! ( TestJob :: new( "foo" ) . enqueue ( & mut conn) . unwrap( ) ) ;
288- assert_compact_json_snapshot ! ( all_jobs( & mut conn) , @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"# ) ;
307+ assert_none ! ( TestJob :: new( "foo" ) . async_enqueue ( & mut conn) . await . unwrap( ) ) ;
308+ assert_compact_json_snapshot ! ( all_jobs( & mut conn) . await , @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"# ) ;
289309
290310 // Enqueue the same job but with different data, which should
291311 // NOT be deduplicated
292- assert_some ! ( TestJob :: new( "bar" ) . enqueue ( & mut conn) . unwrap( ) ) ;
293- assert_compact_json_snapshot ! ( all_jobs( & mut conn) , @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"# ) ;
312+ assert_some ! ( TestJob :: new( "bar" ) . async_enqueue ( & mut conn) . await . unwrap( ) ) ;
313+ assert_compact_json_snapshot ! ( all_jobs( & mut conn) . await , @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"# ) ;
294314
295315 // Resolve the final barrier to finish the test
296316 test_context. assertions_finished_barrier . wait ( ) . await ;
0 commit comments