Skip to content

Commit 813f349

Browse files
authored
worker/jobs/sync_admins: Reduce spawn_blocking() usage (#9654)
1 parent 0f5bc97 commit 813f349

File tree

1 file changed

+105
-102
lines changed

1 file changed

+105
-102
lines changed

src/worker/jobs/sync_admins.rs

Lines changed: 105 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ use crate::tasks::spawn_blocking;
44
use crate::worker::Environment;
55
use crates_io_worker::BackgroundJob;
66
use diesel::prelude::*;
7-
use diesel::RunQueryDsl;
8-
use diesel_async::async_connection_wrapper::AsyncConnectionWrapper;
7+
use diesel_async::RunQueryDsl;
98
use std::collections::HashSet;
109
use std::fmt::{Display, Formatter};
1110
use std::sync::Arc;
@@ -31,117 +30,119 @@ impl BackgroundJob for SyncAdmins {
3130
.map(|m| m.github_id)
3231
.collect::<HashSet<_>>();
3332

34-
let conn = ctx.deadpool.get().await?;
35-
spawn_blocking(move || {
36-
let conn: &mut AsyncConnectionWrapper<_> = &mut conn.into();
33+
let mut conn = ctx.deadpool.get().await?;
34+
35+
let format_repo_admins = |github_ids: &HashSet<i32>| {
36+
repo_admins
37+
.iter()
38+
.filter(|m| github_ids.contains(&m.github_id))
39+
.map(|m| format!("{} (github_id: {})", m.github, m.github_id))
40+
.collect::<Vec<_>>()
41+
};
3742

38-
let format_repo_admins = |github_ids: &HashSet<i32>| {
39-
repo_admins
40-
.iter()
41-
.filter(|m| github_ids.contains(&m.github_id))
42-
.map(|m| format!("{} (github_id: {})", m.github, m.github_id))
43-
.collect::<Vec<_>>()
44-
};
43+
// Existing admins from the database.
4544

46-
// Existing admins from the database.
45+
let database_admins = users::table
46+
.left_join(emails::table)
47+
.select((users::gh_id, users::gh_login, emails::email.nullable()))
48+
.filter(users::is_admin.eq(true))
49+
.get_results::<(i32, String, Option<String>)>(&mut conn)
50+
.await?;
4751

48-
let database_admins = users::table
49-
.left_join(emails::table)
50-
.select((users::gh_id, users::gh_login, emails::email.nullable()))
51-
.filter(users::is_admin.eq(true))
52-
.get_results::<(i32, String, Option<String>)>(conn)?;
52+
let database_admin_ids = database_admins
53+
.iter()
54+
.map(|(gh_id, _, _)| *gh_id)
55+
.collect::<HashSet<_>>();
5356

54-
let database_admin_ids = database_admins
57+
let format_database_admins = |github_ids: &HashSet<i32>| {
58+
database_admins
5559
.iter()
56-
.map(|(gh_id, _, _)| *gh_id)
57-
.collect::<HashSet<_>>();
58-
59-
let format_database_admins = |github_ids: &HashSet<i32>| {
60-
database_admins
61-
.iter()
62-
.filter(|(gh_id, _, _)| github_ids.contains(gh_id))
63-
.map(|(gh_id, login, _)| format!("{} (github_id: {})", login, gh_id))
64-
.collect::<Vec<_>>()
65-
};
66-
67-
// New admins from the team repo that don't have admin access yet.
68-
69-
let new_admin_ids = repo_admin_ids
70-
.difference(&database_admin_ids)
71-
.copied()
72-
.collect::<HashSet<_>>();
73-
74-
let added_admin_ids = if new_admin_ids.is_empty() {
75-
Vec::new()
76-
} else {
77-
let new_admins = format_repo_admins(&new_admin_ids).join(", ");
78-
debug!("Granting admin access: {new_admins}");
79-
80-
diesel::update(users::table)
81-
.filter(users::gh_id.eq_any(&new_admin_ids))
82-
.set(users::is_admin.eq(true))
83-
.returning(users::gh_id)
84-
.get_results::<i32>(conn)?
85-
};
86-
87-
// New admins from the team repo that have been granted admin
88-
// access now.
89-
90-
let added_admin_ids = HashSet::from_iter(added_admin_ids);
91-
if !added_admin_ids.is_empty() {
92-
let added_admins = format_repo_admins(&added_admin_ids).join(", ");
93-
info!("Granted admin access: {added_admins}");
94-
}
60+
.filter(|(gh_id, _, _)| github_ids.contains(gh_id))
61+
.map(|(gh_id, login, _)| format!("{} (github_id: {})", login, gh_id))
62+
.collect::<Vec<_>>()
63+
};
9564

96-
// New admins from the team repo that don't have a crates.io
97-
// account yet.
65+
// New admins from the team repo that don't have admin access yet.
9866

99-
let skipped_new_admin_ids = new_admin_ids
100-
.difference(&added_admin_ids)
101-
.copied()
102-
.collect::<HashSet<_>>();
67+
let new_admin_ids = repo_admin_ids
68+
.difference(&database_admin_ids)
69+
.copied()
70+
.collect::<HashSet<_>>();
10371

104-
if !skipped_new_admin_ids.is_empty() {
105-
let skipped_new_admins = format_repo_admins(&skipped_new_admin_ids).join(", ");
106-
info!("Skipped missing admins: {skipped_new_admins}");
107-
}
72+
let added_admin_ids = if new_admin_ids.is_empty() {
73+
Vec::new()
74+
} else {
75+
let new_admins = format_repo_admins(&new_admin_ids).join(", ");
76+
debug!("Granting admin access: {new_admins}");
77+
78+
diesel::update(users::table)
79+
.filter(users::gh_id.eq_any(&new_admin_ids))
80+
.set(users::is_admin.eq(true))
81+
.returning(users::gh_id)
82+
.get_results::<i32>(&mut conn)
83+
.await?
84+
};
85+
86+
// New admins from the team repo that have been granted admin
87+
// access now.
88+
89+
let added_admin_ids = HashSet::from_iter(added_admin_ids);
90+
if !added_admin_ids.is_empty() {
91+
let added_admins = format_repo_admins(&added_admin_ids).join(", ");
92+
info!("Granted admin access: {added_admins}");
93+
}
10894

109-
// Existing admins from the database that are no longer in the
110-
// team repo.
111-
112-
let obsolete_admin_ids = database_admin_ids
113-
.difference(&repo_admin_ids)
114-
.copied()
115-
.collect::<HashSet<_>>();
116-
117-
let removed_admin_ids = if obsolete_admin_ids.is_empty() {
118-
Vec::new()
119-
} else {
120-
let obsolete_admins = format_database_admins(&obsolete_admin_ids).join(", ");
121-
debug!("Revoking admin access: {obsolete_admins}");
122-
123-
diesel::update(users::table)
124-
.filter(users::gh_id.eq_any(&obsolete_admin_ids))
125-
.set(users::is_admin.eq(false))
126-
.returning(users::gh_id)
127-
.get_results::<i32>(conn)?
128-
};
129-
130-
let removed_admin_ids = HashSet::from_iter(removed_admin_ids);
131-
if !removed_admin_ids.is_empty() {
132-
let removed_admins = format_database_admins(&removed_admin_ids).join(", ");
133-
info!("Revoked admin access: {removed_admins}");
134-
}
95+
// New admins from the team repo that don't have a crates.io
96+
// account yet.
13597

136-
if added_admin_ids.is_empty() && removed_admin_ids.is_empty() {
137-
return Ok(());
138-
}
98+
let skipped_new_admin_ids = new_admin_ids
99+
.difference(&added_admin_ids)
100+
.copied()
101+
.collect::<HashSet<_>>();
139102

140-
let added_admins = format_repo_admins(&added_admin_ids);
141-
let removed_admins = format_database_admins(&removed_admin_ids);
103+
if !skipped_new_admin_ids.is_empty() {
104+
let skipped_new_admins = format_repo_admins(&skipped_new_admin_ids).join(", ");
105+
info!("Skipped missing admins: {skipped_new_admins}");
106+
}
107+
108+
// Existing admins from the database that are no longer in the
109+
// team repo.
110+
111+
let obsolete_admin_ids = database_admin_ids
112+
.difference(&repo_admin_ids)
113+
.copied()
114+
.collect::<HashSet<_>>();
115+
116+
let removed_admin_ids = if obsolete_admin_ids.is_empty() {
117+
Vec::new()
118+
} else {
119+
let obsolete_admins = format_database_admins(&obsolete_admin_ids).join(", ");
120+
debug!("Revoking admin access: {obsolete_admins}");
121+
122+
diesel::update(users::table)
123+
.filter(users::gh_id.eq_any(&obsolete_admin_ids))
124+
.set(users::is_admin.eq(false))
125+
.returning(users::gh_id)
126+
.get_results::<i32>(&mut conn)
127+
.await?
128+
};
129+
130+
let removed_admin_ids = HashSet::from_iter(removed_admin_ids);
131+
if !removed_admin_ids.is_empty() {
132+
let removed_admins = format_database_admins(&removed_admin_ids).join(", ");
133+
info!("Revoked admin access: {removed_admins}");
134+
}
142135

143-
let email = AdminAccountEmail::new(added_admins, removed_admins);
136+
if added_admin_ids.is_empty() && removed_admin_ids.is_empty() {
137+
return Ok(());
138+
}
139+
140+
let added_admins = format_repo_admins(&added_admin_ids);
141+
let removed_admins = format_database_admins(&removed_admin_ids);
142+
143+
let email = AdminAccountEmail::new(added_admins, removed_admins);
144144

145+
spawn_blocking(move || {
145146
for database_admin in &database_admins {
146147
let (_, _, email_address) = database_admin;
147148
if let Some(email_address) = email_address {
@@ -159,9 +160,11 @@ impl BackgroundJob for SyncAdmins {
159160
}
160161
}
161162

162-
Ok(())
163+
Ok::<_, anyhow::Error>(())
163164
})
164-
.await
165+
.await?;
166+
167+
Ok(())
165168
}
166169
}
167170

0 commit comments

Comments
 (0)