Skip to content

Commit 4c167c5

Browse files
committed
typosquat: Asyncify all the things
1 parent 6a5cf9f commit 4c167c5

File tree

4 files changed

+89
-69
lines changed

4 files changed

+89
-69
lines changed

src/typosquat/cache.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1+
use diesel_async::AsyncPgConnection;
12
use std::sync::Arc;
2-
3-
use crate::util::diesel::Conn;
43
use thiserror::Error;
54
use typomania::{
65
checks::{Bitflips, Omitted, SwappedWords, Typos},
@@ -28,7 +27,7 @@ impl Cache {
2827
/// addresses to send notifications to, then invokes [`Cache::new`] to read popular crates from
2928
/// the database.
3029
#[instrument(skip_all, err)]
31-
pub fn from_env(conn: &mut impl Conn) -> Result<Self, Error> {
30+
pub async fn from_env(conn: &mut AsyncPgConnection) -> Result<Self, Error> {
3231
let emails: Vec<String> = crates_io_env_vars::var(NOTIFICATION_EMAILS_ENV)
3332
.map_err(|e| Error::Environment {
3433
name: NOTIFICATION_EMAILS_ENV.into(),
@@ -49,15 +48,15 @@ impl Cache {
4948
})
5049
} else {
5150
// Otherwise, let's go get the top crates and build a corpus.
52-
Self::new(emails, conn)
51+
Self::new(emails, conn).await
5352
}
5453
}
5554

5655
/// Instantiates a cache by querying popular crates and building them into a typomania harness.
5756
///
5857
/// This relies on configuration in the `super::config` module.
59-
pub fn new(emails: Vec<String>, conn: &mut impl Conn) -> Result<Self, Error> {
60-
let top = TopCrates::new(conn, config::TOP_CRATES)?;
58+
pub async fn new(emails: Vec<String>, conn: &mut AsyncPgConnection) -> Result<Self, Error> {
59+
let top = TopCrates::new(conn, config::TOP_CRATES).await?;
6160

6261
Ok(Self {
6362
emails,

src/typosquat/database.rs

Lines changed: 55 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::{
55
collections::{BTreeMap, HashMap, HashSet},
66
};
77

8-
use crate::util::diesel::Conn;
9-
use diesel::{connection::DefaultLoadingMode, QueryResult};
8+
use crate::util::diesel::prelude::*;
9+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
10+
use futures_util::TryStreamExt;
1011
use typomania::{AuthorSet, Corpus, Package};
1112

1213
/// A corpus of the current top crates on crates.io, as determined by their download counts, along
@@ -18,12 +19,11 @@ pub struct TopCrates {
1819

1920
impl TopCrates {
2021
/// Retrieves the `num` top crates from the database.
21-
pub fn new(conn: &mut impl Conn, num: i64) -> QueryResult<Self> {
22+
pub async fn new(conn: &mut AsyncPgConnection, num: i64) -> QueryResult<Self> {
2223
use crate::{
2324
models,
2425
schema::{crate_downloads, crate_owners},
2526
};
26-
use diesel::prelude::*;
2727

2828
// We have to build up a data structure that contains the top crates, their owners in some
2929
// form that is easily compared, and that can be indexed by the crate name.
@@ -42,45 +42,51 @@ impl TopCrates {
4242
// Once we have the results of those queries, we can glom it all together into one happy
4343
// data structure.
4444

45-
let mut crates: BTreeMap<i32, (String, Crate)> = BTreeMap::new();
46-
for result in models::Crate::all()
45+
let crates: BTreeMap<i32, (String, Crate)> = BTreeMap::new();
46+
let crates = models::Crate::all()
4747
.inner_join(crate_downloads::table)
4848
.order(crate_downloads::downloads.desc())
4949
.limit(num)
50-
.load_iter::<models::Crate, DefaultLoadingMode>(conn)?
51-
{
52-
let krate = result?;
53-
crates.insert(
54-
krate.id,
55-
(
56-
krate.name,
57-
Crate {
58-
owners: HashSet::new(),
59-
},
60-
),
61-
);
62-
}
50+
.load_stream::<models::Crate>(conn)
51+
.await?
52+
.try_fold(crates, |mut crates, krate| {
53+
crates.insert(
54+
krate.id,
55+
(
56+
krate.name,
57+
Crate {
58+
owners: HashSet::new(),
59+
},
60+
),
61+
);
62+
63+
futures_util::future::ready(Ok(crates))
64+
})
65+
.await?;
6366

6467
// This query might require more low level knowledge of crate_owners than we really want
6568
// outside of the models module. It would probably make more sense in the long term to have
6669
// this live in the Owner type, but for now I want to keep the typosquatting logic as
6770
// self-contained as possible in case we decide not to go ahead with this in the longer
6871
// term.
69-
for result in crate_owners::table
72+
let crates = crate_owners::table
7073
.filter(crate_owners::deleted.eq(false))
7174
.filter(crate_owners::crate_id.eq_any(crates.keys().cloned().collect::<Vec<_>>()))
7275
.select((
7376
crate_owners::crate_id,
7477
crate_owners::owner_id,
7578
crate_owners::owner_kind,
7679
))
77-
.load_iter::<(i32, i32, i32), DefaultLoadingMode>(conn)?
78-
{
79-
let (crate_id, owner_id, owner_kind) = result?;
80-
crates.entry(crate_id).and_modify(|(_name, krate)| {
81-
krate.owners.insert(Owner::new(owner_id, owner_kind));
82-
});
83-
}
80+
.load_stream::<(i32, i32, i32)>(conn)
81+
.await?
82+
.try_fold(crates, |mut crates, (crate_id, owner_id, owner_kind)| {
83+
crates.entry(crate_id).and_modify(|(_name, krate)| {
84+
krate.owners.insert(Owner::new(owner_id, owner_kind));
85+
});
86+
87+
futures_util::future::ready(Ok(crates))
88+
})
89+
.await?;
8490

8591
Ok(Self {
8692
crates: crates.into_values().collect(),
@@ -104,12 +110,16 @@ pub struct Crate {
104110

105111
impl Crate {
106112
/// Hydrates a crate and its owners from the database given the crate name.
107-
pub fn from_name(conn: &mut impl Conn, name: &str) -> QueryResult<Self> {
113+
pub async fn from_name(conn: &mut AsyncPgConnection, name: &str) -> QueryResult<Self> {
108114
use crate::models;
109-
use diesel::prelude::*;
110115

111-
let krate = models::Crate::by_exact_name(name).first(conn)?;
112-
let owners = krate.owners(conn)?.into_iter().map(Owner::from).collect();
116+
let krate = models::Crate::by_exact_name(name).first(conn).await?;
117+
let owners = krate
118+
.async_owners(conn)
119+
.await?
120+
.into_iter()
121+
.map(Owner::from)
122+
.collect();
113123

114124
Ok(Self { owners })
115125
}
@@ -163,14 +173,16 @@ impl From<crate::models::Owner> for Owner {
163173

164174
#[cfg(test)]
165175
mod tests {
166-
use crate::{test_util::test_db_connection, typosquat::test_util::faker};
167-
use thiserror::Error;
168-
169176
use super::*;
177+
use crate::typosquat::test_util::faker;
178+
use crates_io_test_db::TestDatabase;
179+
use diesel_async::AsyncConnection;
180+
use thiserror::Error;
170181

171-
#[test]
172-
fn top_crates() -> Result<(), Error> {
173-
let (_test_db, mut conn) = test_db_connection();
182+
#[tokio::test]
183+
async fn top_crates() -> Result<(), Error> {
184+
let test_db = TestDatabase::new();
185+
let mut conn = test_db.connect();
174186

175187
// Set up two users.
176188
let user_a = faker::user(&mut conn, "a")?;
@@ -186,7 +198,8 @@ mod tests {
186198
faker::add_crate_to_team(&mut conn, &user_b, &top_b, &not_the_a_team)?;
187199
faker::add_crate_to_team(&mut conn, &user_b, &not_top_c, &not_the_a_team)?;
188200

189-
let top_crates = TopCrates::new(&mut conn, 2)?;
201+
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?;
202+
let top_crates = TopCrates::new(&mut async_conn, 2).await?;
190203

191204
// Let's ensure the top crates include what we expect (which is a and b, since we asked for
192205
// 2 crates and they're the most downloaded).
@@ -200,7 +213,7 @@ mod tests {
200213
assert!(!pkg_a.shared_authors(pkg_b.authors()));
201214

202215
// Now let's go get package c and pretend it's a new package.
203-
let pkg_c = Crate::from_name(&mut conn, "c")?;
216+
let pkg_c = Crate::from_name(&mut async_conn, "c").await?;
204217

205218
// c _does_ have an author in common with a.
206219
assert!(pkg_a.shared_authors(pkg_c.authors()));
@@ -226,5 +239,8 @@ mod tests {
226239

227240
#[error(transparent)]
228241
Diesel(#[from] diesel::result::Error),
242+
243+
#[error(transparent)]
244+
Connection(#[from] ConnectionError),
229245
}
230246
}

src/worker/environment.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::cloudfront::CloudFront;
22
use crate::fastly::Fastly;
33
use crate::storage::Storage;
44
use crate::typosquat;
5-
use crate::util::diesel::Conn;
65
use crate::Emails;
76
use anyhow::Context;
87
use bon::Builder;
@@ -13,8 +12,9 @@ use diesel_async::AsyncPgConnection;
1312
use object_store::ObjectStore;
1413
use parking_lot::{Mutex, MutexGuard};
1514
use std::ops::{Deref, DerefMut};
16-
use std::sync::{Arc, OnceLock};
15+
use std::sync::Arc;
1716
use std::time::Instant;
17+
use tokio::sync::OnceCell;
1818

1919
#[derive(Builder)]
2020
pub struct Environment {
@@ -33,7 +33,7 @@ pub struct Environment {
3333

3434
/// A lazily initialised cache of the most popular crates ready to use in typosquatting checks.
3535
#[builder(skip)]
36-
typosquat_cache: OnceLock<Result<typosquat::Cache, typosquat::CacheError>>,
36+
typosquat_cache: OnceCell<Result<typosquat::Cache, typosquat::CacheError>>,
3737
}
3838

3939
impl Environment {
@@ -78,9 +78,9 @@ impl Environment {
7878
}
7979

8080
/// Returns the typosquatting cache, initialising it if required.
81-
pub(crate) fn typosquat_cache(
81+
pub(crate) async fn typosquat_cache(
8282
&self,
83-
conn: &mut impl Conn,
83+
conn: &mut AsyncPgConnection,
8484
) -> Result<&typosquat::Cache, typosquat::CacheError> {
8585
// We have to pass conn back in here because the caller might be in a transaction, and
8686
// getting a new connection here to query crates can result in a deadlock.
@@ -90,6 +90,7 @@ impl Environment {
9090
// generated if initialising the cache fails.
9191
self.typosquat_cache
9292
.get_or_init(|| typosquat::Cache::from_env(conn))
93+
.await
9394
.as_ref()
9495
.map_err(|e| e.clone())
9596
}

src/worker/jobs/typosquat.rs

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use std::sync::Arc;
22

33
use crates_io_worker::BackgroundJob;
4-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
4+
use diesel_async::AsyncPgConnection;
55
use typomania::Package;
66

77
use crate::email::Email;
8-
use crate::tasks::spawn_blocking;
9-
use crate::util::diesel::Conn;
108
use crate::{
119
typosquat::{Cache, Crate},
1210
worker::Environment,
@@ -36,22 +34,23 @@ impl BackgroundJob for CheckTyposquat {
3634
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
3735
let crate_name = self.name.clone();
3836

39-
let conn = env.deadpool.get().await?;
40-
spawn_blocking(move || {
41-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
37+
let mut conn = env.deadpool.get().await?;
4238

43-
let cache = env.typosquat_cache(conn)?;
44-
check(&env.emails, cache, conn, &crate_name)
45-
})
46-
.await
39+
let cache = env.typosquat_cache(&mut conn).await?;
40+
check(&env.emails, cache, &mut conn, &crate_name).await
4741
}
4842
}
4943

50-
fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> anyhow::Result<()> {
44+
async fn check(
45+
emails: &Emails,
46+
cache: &Cache,
47+
conn: &mut AsyncPgConnection,
48+
name: &str,
49+
) -> anyhow::Result<()> {
5150
if let Some(harness) = cache.get_harness() {
5251
info!(name, "Checking new crate for potential typosquatting");
5352

54-
let krate: Box<dyn Package> = Box::new(Crate::from_name(conn, name)?);
53+
let krate: Box<dyn Package> = Box::new(Crate::from_name(conn, name).await?);
5554
let squats = harness.check_package(name, krate)?;
5655
if !squats.is_empty() {
5756
// Well, well, well. For now, the only action we'll take is to e-mail people who
@@ -122,22 +121,27 @@ Specific squat checks that triggered:
122121

123122
#[cfg(test)]
124123
mod tests {
125-
use crate::{test_util::test_db_connection, typosquat::test_util::faker};
124+
use crate::typosquat::test_util::faker;
125+
use crates_io_test_db::TestDatabase;
126+
use diesel_async::AsyncConnection;
126127
use lettre::Address;
127128

128129
use super::*;
129130

130-
#[test]
131-
fn integration() -> anyhow::Result<()> {
131+
#[tokio::test]
132+
async fn integration() -> anyhow::Result<()> {
132133
let emails = Emails::new_in_memory();
133-
let (_test_db, mut conn) = test_db_connection();
134+
135+
let test_db = TestDatabase::new();
136+
let mut conn = test_db.connect();
134137

135138
// Set up a user and a popular crate to match against.
136139
let user = faker::user(&mut conn, "a")?;
137140
faker::crate_and_version(&mut conn, "my-crate", "It's awesome", &user, 100)?;
138141

139142
// Prime the cache so it only includes the crate we just created.
140-
let cache = Cache::new(vec!["admin@example.com".to_string()], &mut conn)?;
143+
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?;
144+
let cache = Cache::new(vec!["admin@example.com".to_string()], &mut async_conn).await?;
141145

142146
// Now we'll create new crates: one problematic, one not so.
143147
let other_user = faker::user(&mut conn, "b")?;
@@ -157,11 +161,11 @@ mod tests {
157161
)?;
158162

159163
// Run the check with a crate that shouldn't cause problems.
160-
check(&emails, &cache, &mut conn, &angel.name)?;
164+
check(&emails, &cache, &mut async_conn, &angel.name).await?;
161165
assert!(emails.mails_in_memory().unwrap().is_empty());
162166

163167
// Now run the check with a less innocent crate.
164-
check(&emails, &cache, &mut conn, &demon.name)?;
168+
check(&emails, &cache, &mut async_conn, &demon.name).await?;
165169
let sent_mail = emails.mails_in_memory().unwrap();
166170
assert!(!sent_mail.is_empty());
167171
let sent = sent_mail.into_iter().next().unwrap();

0 commit comments

Comments
 (0)