Skip to content

Commit 1032ce9

Browse files
authored
typosquat: Asyncify all the things (#9937)
1 parent ba41e57 commit 1032ce9

File tree

4 files changed

+78
-75
lines changed

4 files changed

+78
-75
lines changed

src/typosquat/cache.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1+
use diesel_async::AsyncPgConnection;
12
use std::sync::Arc;
2-
3-
use crate::util::diesel::Conn;
43
use thiserror::Error;
54
use typomania::{
65
checks::{Bitflips, Omitted, SwappedWords, Typos},
@@ -28,7 +27,7 @@ impl Cache {
2827
/// addresses to send notifications to, then invokes [`Cache::new`] to read popular crates from
2928
/// the database.
3029
#[instrument(skip_all, err)]
31-
pub fn from_env(conn: &mut impl Conn) -> Result<Self, Error> {
30+
pub async fn from_env(conn: &mut AsyncPgConnection) -> Result<Self, Error> {
3231
let emails: Vec<String> = crates_io_env_vars::var(NOTIFICATION_EMAILS_ENV)
3332
.map_err(|e| Error::Environment {
3433
name: NOTIFICATION_EMAILS_ENV.into(),
@@ -49,15 +48,15 @@ impl Cache {
4948
})
5049
} else {
5150
// Otherwise, let's go get the top crates and build a corpus.
52-
Self::new(emails, conn)
51+
Self::new(emails, conn).await
5352
}
5453
}
5554

5655
/// Instantiates a cache by querying popular crates and building them into a typomania harness.
5756
///
5857
/// This relies on configuration in the `super::config` module.
59-
pub fn new(emails: Vec<String>, conn: &mut impl Conn) -> Result<Self, Error> {
60-
let top = TopCrates::new(conn, config::TOP_CRATES)?;
58+
pub async fn new(emails: Vec<String>, conn: &mut AsyncPgConnection) -> Result<Self, Error> {
59+
let top = TopCrates::new(conn, config::TOP_CRATES).await?;
6160

6261
Ok(Self {
6362
emails,

src/typosquat/database.rs

Lines changed: 50 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::{
55
collections::{BTreeMap, HashMap, HashSet},
66
};
77

8-
use crate::util::diesel::Conn;
9-
use diesel::{connection::DefaultLoadingMode, QueryResult};
8+
use crate::util::diesel::prelude::*;
9+
use diesel_async::{AsyncPgConnection, RunQueryDsl};
10+
use futures_util::TryStreamExt;
1011
use typomania::{AuthorSet, Corpus, Package};
1112

1213
/// A corpus of the current top crates on crates.io, as determined by their download counts, along
@@ -18,12 +19,11 @@ pub struct TopCrates {
1819

1920
impl TopCrates {
2021
/// Retrieves the `num` top crates from the database.
21-
pub fn new(conn: &mut impl Conn, num: i64) -> QueryResult<Self> {
22+
pub async fn new(conn: &mut AsyncPgConnection, num: i64) -> QueryResult<Self> {
2223
use crate::{
2324
models,
2425
schema::{crate_downloads, crate_owners},
2526
};
26-
use diesel::prelude::*;
2727

2828
// We have to build up a data structure that contains the top crates, their owners in some
2929
// form that is easily compared, and that can be indexed by the crate name.
@@ -42,45 +42,51 @@ impl TopCrates {
4242
// Once we have the results of those queries, we can glom it all together into one happy
4343
// data structure.
4444

45-
let mut crates: BTreeMap<i32, (String, Crate)> = BTreeMap::new();
46-
for result in models::Crate::all()
45+
let crates: BTreeMap<i32, (String, Crate)> = BTreeMap::new();
46+
let crates = models::Crate::all()
4747
.inner_join(crate_downloads::table)
4848
.order(crate_downloads::downloads.desc())
4949
.limit(num)
50-
.load_iter::<models::Crate, DefaultLoadingMode>(conn)?
51-
{
52-
let krate = result?;
53-
crates.insert(
54-
krate.id,
55-
(
56-
krate.name,
57-
Crate {
58-
owners: HashSet::new(),
59-
},
60-
),
61-
);
62-
}
50+
.load_stream::<models::Crate>(conn)
51+
.await?
52+
.try_fold(crates, |mut crates, krate| {
53+
crates.insert(
54+
krate.id,
55+
(
56+
krate.name,
57+
Crate {
58+
owners: HashSet::new(),
59+
},
60+
),
61+
);
62+
63+
futures_util::future::ready(Ok(crates))
64+
})
65+
.await?;
6366

6467
// This query might require more low level knowledge of crate_owners than we really want
6568
// outside of the models module. It would probably make more sense in the long term to have
6669
// this live in the Owner type, but for now I want to keep the typosquatting logic as
6770
// self-contained as possible in case we decide not to go ahead with this in the longer
6871
// term.
69-
for result in crate_owners::table
72+
let crates = crate_owners::table
7073
.filter(crate_owners::deleted.eq(false))
7174
.filter(crate_owners::crate_id.eq_any(crates.keys().cloned().collect::<Vec<_>>()))
7275
.select((
7376
crate_owners::crate_id,
7477
crate_owners::owner_id,
7578
crate_owners::owner_kind,
7679
))
77-
.load_iter::<(i32, i32, i32), DefaultLoadingMode>(conn)?
78-
{
79-
let (crate_id, owner_id, owner_kind) = result?;
80-
crates.entry(crate_id).and_modify(|(_name, krate)| {
81-
krate.owners.insert(Owner::new(owner_id, owner_kind));
82-
});
83-
}
80+
.load_stream::<(i32, i32, i32)>(conn)
81+
.await?
82+
.try_fold(crates, |mut crates, (crate_id, owner_id, owner_kind)| {
83+
crates.entry(crate_id).and_modify(|(_name, krate)| {
84+
krate.owners.insert(Owner::new(owner_id, owner_kind));
85+
});
86+
87+
futures_util::future::ready(Ok(crates))
88+
})
89+
.await?;
8490

8591
Ok(Self {
8692
crates: crates.into_values().collect(),
@@ -104,12 +110,16 @@ pub struct Crate {
104110

105111
impl Crate {
106112
/// Hydrates a crate and its owners from the database given the crate name.
107-
pub fn from_name(conn: &mut impl Conn, name: &str) -> QueryResult<Self> {
113+
pub async fn from_name(conn: &mut AsyncPgConnection, name: &str) -> QueryResult<Self> {
108114
use crate::models;
109-
use diesel::prelude::*;
110115

111-
let krate = models::Crate::by_exact_name(name).first(conn)?;
112-
let owners = krate.owners(conn)?.into_iter().map(Owner::from).collect();
116+
let krate = models::Crate::by_exact_name(name).first(conn).await?;
117+
let owners = krate
118+
.async_owners(conn)
119+
.await?
120+
.into_iter()
121+
.map(Owner::from)
122+
.collect();
113123

114124
Ok(Self { owners })
115125
}
@@ -166,10 +176,11 @@ mod tests {
166176
use super::*;
167177
use crate::typosquat::test_util::faker;
168178
use crates_io_test_db::TestDatabase;
179+
use diesel_async::AsyncConnection;
169180
use thiserror::Error;
170181

171-
#[test]
172-
fn top_crates() -> Result<(), Error> {
182+
#[tokio::test]
183+
async fn top_crates() -> Result<(), Error> {
173184
let test_db = TestDatabase::new();
174185
let mut conn = test_db.connect();
175186

@@ -187,7 +198,8 @@ mod tests {
187198
faker::add_crate_to_team(&mut conn, &user_b, &top_b, &not_the_a_team)?;
188199
faker::add_crate_to_team(&mut conn, &user_b, &not_top_c, &not_the_a_team)?;
189200

190-
let top_crates = TopCrates::new(&mut conn, 2)?;
201+
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?;
202+
let top_crates = TopCrates::new(&mut async_conn, 2).await?;
191203

192204
// Let's ensure the top crates include what we expect (which is a and b, since we asked for
193205
// 2 crates and they're the most downloaded).
@@ -201,7 +213,7 @@ mod tests {
201213
assert!(!pkg_a.shared_authors(pkg_b.authors()));
202214

203215
// Now let's go get package c and pretend it's a new package.
204-
let pkg_c = Crate::from_name(&mut conn, "c")?;
216+
let pkg_c = Crate::from_name(&mut async_conn, "c").await?;
205217

206218
// c _does_ have an author in common with a.
207219
assert!(pkg_a.shared_authors(pkg_c.authors()));
@@ -227,5 +239,8 @@ mod tests {
227239

228240
#[error(transparent)]
229241
Diesel(#[from] diesel::result::Error),
242+
243+
#[error(transparent)]
244+
Connection(#[from] ConnectionError),
230245
}
231246
}

src/worker/environment.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use crate::cloudfront::CloudFront;
22
use crate::fastly::Fastly;
33
use crate::storage::Storage;
44
use crate::typosquat;
5-
use crate::util::diesel::Conn;
65
use crate::Emails;
76
use anyhow::Context;
87
use bon::Builder;
@@ -13,8 +12,9 @@ use diesel_async::AsyncPgConnection;
1312
use object_store::ObjectStore;
1413
use parking_lot::{Mutex, MutexGuard};
1514
use std::ops::{Deref, DerefMut};
16-
use std::sync::{Arc, OnceLock};
15+
use std::sync::Arc;
1716
use std::time::Instant;
17+
use tokio::sync::OnceCell;
1818

1919
#[derive(Builder)]
2020
pub struct Environment {
@@ -33,7 +33,7 @@ pub struct Environment {
3333

3434
/// A lazily initialised cache of the most popular crates ready to use in typosquatting checks.
3535
#[builder(skip)]
36-
typosquat_cache: OnceLock<Result<typosquat::Cache, typosquat::CacheError>>,
36+
typosquat_cache: OnceCell<Result<typosquat::Cache, typosquat::CacheError>>,
3737
}
3838

3939
impl Environment {
@@ -78,9 +78,9 @@ impl Environment {
7878
}
7979

8080
/// Returns the typosquatting cache, initialising it if required.
81-
pub(crate) fn typosquat_cache(
81+
pub(crate) async fn typosquat_cache(
8282
&self,
83-
conn: &mut impl Conn,
83+
conn: &mut AsyncPgConnection,
8484
) -> Result<&typosquat::Cache, typosquat::CacheError> {
8585
// We have to pass conn back in here because the caller might be in a transaction, and
8686
// getting a new connection here to query crates can result in a deadlock.
@@ -90,6 +90,7 @@ impl Environment {
9090
// generated if initialising the cache fails.
9191
self.typosquat_cache
9292
.get_or_init(|| typosquat::Cache::from_env(conn))
93+
.await
9394
.as_ref()
9495
.map_err(|e| e.clone())
9596
}

src/worker/jobs/typosquat.rs

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
use std::sync::Arc;
22

33
use crates_io_worker::BackgroundJob;
4-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
4+
use diesel_async::AsyncPgConnection;
55
use typomania::Package;
66

77
use crate::email::Email;
8-
use crate::tasks::spawn_blocking;
9-
use crate::util::diesel::Conn;
108
use crate::{
119
typosquat::{Cache, Crate},
1210
worker::Environment,
@@ -36,22 +34,23 @@ impl BackgroundJob for CheckTyposquat {
3634
async fn run(&self, env: Self::Context) -> anyhow::Result<()> {
3735
let crate_name = self.name.clone();
3836

39-
let conn = env.deadpool.get().await?;
40-
spawn_blocking(move || {
41-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
37+
let mut conn = env.deadpool.get().await?;
4238

43-
let cache = env.typosquat_cache(conn)?;
44-
check(&env.emails, cache, conn, &crate_name)
45-
})
46-
.await
39+
let cache = env.typosquat_cache(&mut conn).await?;
40+
check(&env.emails, cache, &mut conn, &crate_name).await
4741
}
4842
}
4943

50-
fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> anyhow::Result<()> {
44+
async fn check(
45+
emails: &Emails,
46+
cache: &Cache,
47+
conn: &mut AsyncPgConnection,
48+
name: &str,
49+
) -> anyhow::Result<()> {
5150
if let Some(harness) = cache.get_harness() {
5251
info!(name, "Checking new crate for potential typosquatting");
5352

54-
let krate: Box<dyn Package> = Box::new(Crate::from_name(conn, name)?);
53+
let krate: Box<dyn Package> = Box::new(Crate::from_name(conn, name).await?);
5554
let squats = harness.check_package(name, krate)?;
5655
if !squats.is_empty() {
5756
// Well, well, well. For now, the only action we'll take is to e-mail people who
@@ -65,7 +64,7 @@ fn check(emails: &Emails, cache: &Cache, conn: &mut impl Conn, name: &str) -> an
6564
};
6665

6766
for recipient in cache.iter_emails() {
68-
if let Err(error) = emails.send(recipient, email.clone()) {
67+
if let Err(error) = emails.async_send(recipient, email.clone()).await {
6968
error!(
7069
?error,
7170
?recipient,
@@ -125,6 +124,7 @@ mod tests {
125124
use super::*;
126125
use crate::typosquat::test_util::faker;
127126
use crates_io_test_db::TestDatabase;
127+
use diesel_async::AsyncConnection;
128128
use lettre::Address;
129129

130130
#[tokio::test]
@@ -138,7 +138,8 @@ mod tests {
138138
faker::crate_and_version(&mut conn, "my-crate", "It's awesome", &user, 100)?;
139139

140140
// Prime the cache so it only includes the crate we just created.
141-
let cache = Cache::new(vec!["admin@example.com".to_string()], &mut conn)?;
141+
let mut async_conn = AsyncPgConnection::establish(test_db.url()).await?;
142+
let cache = Cache::new(vec!["admin@example.com".to_string()], &mut async_conn).await?;
142143
let cache = Arc::new(cache);
143144

144145
// Now we'll create new crates: one problematic, one not so.
@@ -159,24 +160,11 @@ mod tests {
159160
)?;
160161

161162
// Run the check with a crate that shouldn't cause problems.
162-
let mut conn = spawn_blocking({
163-
let emails = emails.clone();
164-
let cache = cache.clone();
165-
move || {
166-
check(&emails, &cache, &mut conn, &angel.name)?;
167-
Ok::<_, anyhow::Error>(conn)
168-
}
169-
})
170-
.await?;
163+
check(&emails, &cache, &mut async_conn, &angel.name).await?;
171164
assert!(emails.mails_in_memory().await.unwrap().is_empty());
172165

173166
// Now run the check with a less innocent crate.
174-
spawn_blocking({
175-
let emails = emails.clone();
176-
let cache = cache.clone();
177-
move || check(&emails, &cache, &mut conn, &demon.name)
178-
})
179-
.await?;
167+
check(&emails, &cache, &mut async_conn, &demon.name).await?;
180168
let sent_mail = emails.mails_in_memory().await.unwrap();
181169
assert!(!sent_mail.is_empty());
182170
let sent = sent_mail.into_iter().next().unwrap();

0 commit comments

Comments
 (0)