
Commit a305a92

Merge pull request #9822 from Turbo87/runner-tests
worker: Migrate tests suite to async database queries
2 parents 3fafc87 + e8c1b79 commit a305a92


2 files changed (+88, -71 lines)

crates/crates_io_worker/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -24,4 +24,4 @@ tracing = "=0.1.40"
 claims = "=0.7.1"
 crates_io_test_db = { path = "../crates_io_test_db" }
 insta = { version = "=1.41.1", features = ["json"] }
-tokio = { version = "=1.41.0", features = ["macros", "rt", "rt-multi-thread", "sync"]}
+tokio = { version = "=1.41.0", features = ["macros", "sync"]}
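Note: the trimmed tokio feature list lines up with the test changes below, where every test drops the `flavor = "multi_thread"` attribute. Presumably the multi-threaded runtime was only needed while the tests still mixed blocking diesel queries with the async runner; a sketch of the attribute change (illustrative test names, not an excerpt from the diff):

    // Before: each test spun up a multi-threaded runtime, which requires
    // tokio's "rt-multi-thread" feature in the dev-dependencies.
    #[tokio::test(flavor = "multi_thread")]
    async fn some_test_old() { /* blocking diesel calls were tolerable here */ }

    // After: the default current-thread test runtime is enough once all
    // database access goes through diesel-async and is awaited.
    #[tokio::test]
    async fn some_test_new() -> anyhow::Result<()> { Ok(()) }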

crates/crates_io_worker/tests/runner.rs

Lines changed: 87 additions & 70 deletions
@@ -5,45 +5,45 @@ use crates_io_worker::{BackgroundJob, Runner};
 use diesel::prelude::*;
 use diesel_async::pooled_connection::deadpool::Pool;
 use diesel_async::pooled_connection::AsyncDieselConnectionManager;
-use diesel_async::AsyncPgConnection;
+use diesel_async::{AsyncPgConnection, RunQueryDsl};
 use insta::assert_compact_json_snapshot;
 use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
 use tokio::sync::Barrier;
 
-fn all_jobs(conn: &mut PgConnection) -> Vec<(String, Value)> {
+async fn all_jobs(conn: &mut AsyncPgConnection) -> QueryResult<Vec<(String, Value)>> {
     background_jobs::table
         .select((background_jobs::job_type, background_jobs::data))
         .get_results(conn)
-        .unwrap()
+        .await
 }
 
-fn job_exists(id: i64, conn: &mut PgConnection) -> bool {
-    background_jobs::table
+async fn job_exists(id: i64, conn: &mut AsyncPgConnection) -> QueryResult<bool> {
+    Ok(background_jobs::table
         .find(id)
         .select(background_jobs::id)
         .get_result::<i64>(conn)
-        .optional()
-        .unwrap()
-        .is_some()
+        .await
+        .optional()?
+        .is_some())
 }
 
-fn job_is_locked(id: i64, conn: &mut PgConnection) -> bool {
-    background_jobs::table
+async fn job_is_locked(id: i64, conn: &mut AsyncPgConnection) -> QueryResult<bool> {
+    Ok(background_jobs::table
         .find(id)
         .select(background_jobs::id)
         .for_update()
         .skip_locked()
         .get_result::<i64>(conn)
-        .optional()
-        .unwrap()
-        .is_none()
+        .await
+        .optional()?
+        .is_none())
 }
 
-#[tokio::test(flavor = "multi_thread")]
-async fn jobs_are_locked_when_fetched() {
+#[tokio::test]
+async fn jobs_are_locked_when_fetched() -> anyhow::Result<()> {
     #[derive(Clone)]
     struct TestContext {
         job_started_barrier: Arc<Barrier>,
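Note: the added `diesel_async::RunQueryDsl` import is what makes the existing query-builder calls (`get_result`, `get_results`, `load`) awaitable on an `&mut AsyncPgConnection`, and the helpers now surface failures as `QueryResult` instead of unwrapping. A minimal sketch of the same pattern (the `count_jobs` name is illustrative; it assumes the `background_jobs` schema module this test file already imports):

    use diesel::prelude::*;
    use diesel_async::{AsyncPgConnection, RunQueryDsl};

    // Same shape as the helpers above: build the query with the regular
    // diesel DSL, await it through diesel-async, and let the caller decide
    // how to handle the QueryResult (the tests propagate it with `?`).
    async fn count_jobs(conn: &mut AsyncPgConnection) -> QueryResult<i64> {
        background_jobs::table.count().get_result(conn).await
    }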
@@ -71,28 +71,32 @@ async fn jobs_are_locked_when_fetched()
         assertions_finished_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
-    let mut conn = test_database.connect();
-    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
-    assert!(job_exists(job_id, &mut conn));
-    assert!(!job_is_locked(job_id, &mut conn));
+    let job_id = assert_some!(TestJob.async_enqueue(&mut conn).await?);
+
+    assert!(job_exists(job_id, &mut conn).await?);
+    assert!(!job_is_locked(job_id, &mut conn).await?);
 
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
 
-    assert!(job_exists(job_id, &mut conn));
-    assert!(job_is_locked(job_id, &mut conn));
+    assert!(job_exists(job_id, &mut conn).await?);
+    assert!(job_is_locked(job_id, &mut conn).await?);
 
     test_context.assertions_finished_barrier.wait().await;
     runner.wait_for_shutdown().await;
 
-    assert!(!job_exists(job_id, &mut conn));
+    assert!(!job_exists(job_id, &mut conn).await?);
+
+    Ok(())
 }
 
-#[tokio::test(flavor = "multi_thread")]
-async fn jobs_are_deleted_when_successfully_run() {
+#[tokio::test]
+async fn jobs_are_deleted_when_successfully_run() -> anyhow::Result<()> {
     #[derive(Serialize, Deserialize)]
     struct TestJob;
 
@@ -105,30 +109,31 @@ async fn jobs_are_deleted_when_successfully_run()
         }
     }
 
-    fn remaining_jobs(conn: &mut PgConnection) -> i64 {
-        background_jobs::table
-            .count()
-            .get_result(&mut *conn)
-            .unwrap()
+    async fn remaining_jobs(conn: &mut AsyncPgConnection) -> QueryResult<i64> {
+        background_jobs::table.count().get_result(conn).await
     }
 
     let test_database = TestDatabase::new();
 
-    let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
-    let mut conn = test_database.connect();
-    assert_eq!(remaining_jobs(&mut conn), 0);
+    let runner = runner(pool, ()).register_job_type::<TestJob>();
 
-    TestJob.enqueue(&mut conn).unwrap();
-    assert_eq!(remaining_jobs(&mut conn), 1);
+    assert_eq!(remaining_jobs(&mut conn).await?, 0);
+
+    TestJob.async_enqueue(&mut conn).await?;
+    assert_eq!(remaining_jobs(&mut conn).await?, 1);
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
-    assert_eq!(remaining_jobs(&mut conn), 0);
+    assert_eq!(remaining_jobs(&mut conn).await?, 0);
+
+    Ok(())
 }
 
-#[tokio::test(flavor = "multi_thread")]
-async fn failed_jobs_do_not_release_lock_before_updating_retry_time() {
+#[tokio::test]
+async fn failed_jobs_do_not_release_lock_before_updating_retry_time() -> anyhow::Result<()> {
     #[derive(Clone)]
     struct TestContext {
         job_started_barrier: Arc<Barrier>,
@@ -153,10 +158,12 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time()
         job_started_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
-    let mut conn = test_database.connect();
-    TestJob.enqueue(&mut conn).unwrap();
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
+
+    TestJob.async_enqueue(&mut conn).await?;
 
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
@@ -169,23 +176,25 @@ async fn failed_jobs_do_not_release_lock_before_updating_retry_time()
         .select(background_jobs::id)
         .filter(background_jobs::retries.eq(0))
         .for_update()
-        .load::<i64>(&mut *conn)
-        .unwrap();
+        .load::<i64>(&mut conn)
+        .await?;
     assert_eq!(available_jobs.len(), 0);
 
     // Sanity check to make sure the job actually is there
     let total_jobs_including_failed = background_jobs::table
         .select(background_jobs::id)
         .for_update()
-        .load::<i64>(&mut *conn)
-        .unwrap();
+        .load::<i64>(&mut conn)
+        .await?;
     assert_eq!(total_jobs_including_failed.len(), 1);
 
     runner.wait_for_shutdown().await;
+
+    Ok(())
 }
 
-#[tokio::test(flavor = "multi_thread")]
-async fn panicking_in_jobs_updates_retry_counter() {
+#[tokio::test]
+async fn panicking_in_jobs_updates_retry_counter() -> anyhow::Result<()> {
     #[derive(Serialize, Deserialize)]
     struct TestJob;
 
@@ -200,11 +209,12 @@ async fn panicking_in_jobs_updates_retry_counter()
 
     let test_database = TestDatabase::new();
 
-    let runner = runner(test_database.url(), ()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
-    let mut conn = test_database.connect();
+    let runner = runner(pool, ()).register_job_type::<TestJob>();
 
-    let job_id = TestJob.enqueue(&mut conn).unwrap().unwrap();
+    let job_id = assert_some!(TestJob.async_enqueue(&mut conn).await?);
 
     let runner = runner.start();
     runner.wait_for_shutdown().await;
@@ -213,13 +223,15 @@ async fn panicking_in_jobs_updates_retry_counter()
         .find(job_id)
         .select(background_jobs::retries)
         .for_update()
-        .first::<i32>(&mut *conn)
-        .unwrap();
+        .first::<i32>(&mut conn)
+        .await?;
     assert_eq!(tries, 1);
+
+    Ok(())
 }
 
-#[tokio::test(flavor = "multi_thread")]
-async fn jobs_can_be_deduplicated() {
+#[tokio::test]
+async fn jobs_can_be_deduplicated() -> anyhow::Result<()> {
     #[derive(Clone)]
     struct TestContext {
         runs: Arc<AtomicU8>,
@@ -262,48 +274,53 @@ async fn jobs_can_be_deduplicated()
         assertions_finished_barrier: Arc::new(Barrier::new(2)),
     };
 
-    let runner = runner(test_database.url(), test_context.clone()).register_job_type::<TestJob>();
+    let pool = pool(test_database.url())?;
+    let mut conn = pool.get().await?;
 
-    let mut conn = test_database.connect();
+    let runner = runner(pool, test_context.clone()).register_job_type::<TestJob>();
 
     // Enqueue first job
-    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}]]"#);
 
     // Try to enqueue the same job again, which should be deduplicated
-    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}]]"#);
+    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}]]"#);
 
     // Start processing the first job
     let runner = runner.start();
     test_context.job_started_barrier.wait().await;
 
     // Enqueue the same job again, which should NOT be deduplicated,
     // since the first job already still running
-    assert_some!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+    assert_some!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
 
     // Try to enqueue the same job again, which should be deduplicated again
-    assert_none!(TestJob::new("foo").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
+    assert_none!(TestJob::new("foo").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}]]"#);
 
     // Enqueue the same job but with different data, which should
     // NOT be deduplicated
-    assert_some!(TestJob::new("bar").enqueue(&mut conn).unwrap());
-    assert_compact_json_snapshot!(all_jobs(&mut conn), @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
+    assert_some!(TestJob::new("bar").async_enqueue(&mut conn).await?);
+    assert_compact_json_snapshot!(all_jobs(&mut conn).await?, @r#"[["test", {"value": "foo"}], ["test", {"value": "foo"}], ["test", {"value": "bar"}]]"#);
 
     // Resolve the final barrier to finish the test
     test_context.assertions_finished_barrier.wait().await;
     runner.wait_for_shutdown().await;
+
+    Ok(())
+}
+
+fn pool(database_url: &str) -> anyhow::Result<Pool<AsyncPgConnection>> {
+    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
+    Ok(Pool::builder(manager).max_size(4).build()?)
 }
 
 fn runner<Context: Clone + Send + Sync + 'static>(
-    database_url: &str,
+    deadpool: Pool<AsyncPgConnection>,
     context: Context,
 ) -> Runner<Context> {
-    let manager = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
-    let deadpool = Pool::builder(manager).max_size(4).build().unwrap();
-
     Runner::new(deadpool, context)
         .configure_default_queue(|queue| queue.num_workers(2))
         .shutdown_when_queue_empty()