Skip to content

Commit bdb6c63

Browse files
committed
move remaining state to db, allow multiple build servers
1 parent 1dd73f4 commit bdb6c63

File tree

9 files changed

+351
-153
lines changed

9 files changed

+351
-153
lines changed

src/bin/cratesfyi.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::sync::Arc;
66
use anyhow::{anyhow, Context as _, Error, Result};
77
use docs_rs::db::{self, add_path_into_database, Pool, PoolClient};
88
use docs_rs::repositories::RepositoryStatsUpdater;
9-
use docs_rs::utils::{remove_crate_priority, set_crate_priority};
9+
use docs_rs::utils::{
10+
get_config, queue_builder, remove_crate_priority, set_crate_priority, ConfigName,
11+
};
1012
use docs_rs::{
1113
BuildQueue, Config, Context, Index, Metrics, PackageKind, RustwideBuilder, Server, Storage,
1214
};
@@ -93,6 +95,18 @@ enum CommandLine {
9395
socket_addr: String,
9496
},
9597

98+
StartRegistryWatcher {
99+
/// Enable or disable the repository stats updater
100+
#[structopt(
101+
long = "repository-stats-updater",
102+
default_value = "disabled",
103+
possible_values(Toggle::VARIANTS)
104+
)]
105+
repository_stats_updater: Toggle,
106+
},
107+
108+
StartBuildServer,
109+
96110
/// Starts the daemon
97111
Daemon {
98112
/// Enable or disable the registry watcher to automatically enqueue newly published crates
@@ -123,6 +137,20 @@ impl CommandLine {
123137

124138
match self {
125139
Self::Build(build) => build.handle_args(ctx)?,
140+
Self::StartRegistryWatcher {
141+
repository_stats_updater,
142+
} => {
143+
if repository_stats_updater == Toggle::Enabled {
144+
docs_rs::utils::daemon::start_background_repository_stats_updater(&ctx)?;
145+
}
146+
147+
docs_rs::utils::watch_registry(ctx.build_queue()?, ctx.config()?, ctx.index()?)?;
148+
}
149+
Self::StartBuildServer => {
150+
let build_queue = ctx.build_queue()?;
151+
let rustwide_builder = RustwideBuilder::init(&ctx)?;
152+
queue_builder(rustwide_builder, build_queue)?;
153+
}
126154
Self::StartWebServer { socket_addr } => {
127155
// Blocks indefinitely
128156
let _ = Server::start(Some(&socket_addr), &ctx)?;
@@ -336,10 +364,8 @@ impl BuildSubcommand {
336364
.pool()?
337365
.get()
338366
.context("failed to get a database connection")?;
339-
let res =
340-
conn.query("SELECT * FROM config WHERE name = 'rustc_version';", &[])?;
341367

342-
if !res.is_empty() {
368+
if get_config(&mut conn, ConfigName::RustcVersion)?.is_string() {
343369
println!("update-toolchain was already called in the past, exiting");
344370
return Ok(());
345371
}

src/build_queue.rs

Lines changed: 134 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ use crate::db::{delete_crate, Pool};
22
use crate::docbuilder::PackageKind;
33
use crate::error::Result;
44
use crate::storage::Storage;
5-
use crate::utils::{get_crate_priority, report_error};
5+
use crate::utils::{get_config, get_crate_priority, report_error, set_config, ConfigName};
66
use crate::{Config, Index, Metrics, RustwideBuilder};
77
use anyhow::Context;
88

99
use crates_index_diff::Change;
1010
use log::{debug, info};
1111

12-
use std::fs;
13-
use std::path::PathBuf;
12+
use git2::Oid;
1413
use std::sync::Arc;
1514

1615
#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize)]
@@ -48,6 +47,31 @@ impl BuildQueue {
4847
}
4948
}
5049

50+
pub fn last_seen_reference(&self) -> Result<Option<Oid>> {
51+
let mut conn = self.db.get()?;
52+
if let Some(value) = get_config(&mut conn, ConfigName::LastSeenIndexReference)?.as_str() {
53+
match Oid::from_str(value) {
54+
Ok(oid) => return Ok(Some(oid)),
55+
Err(err) => {
56+
log::error!("queue locked because of invalid last_seen_index_reference \"{}\" in database: {}", value, err);
57+
self.lock()?;
58+
return Ok(None);
59+
}
60+
}
61+
}
62+
Ok(None)
63+
}
64+
65+
fn set_last_seen_reference(&self, oid: Oid) -> Result<()> {
66+
let mut conn = self.db.get()?;
67+
set_config(
68+
&mut conn,
69+
ConfigName::LastSeenIndexReference,
70+
oid.to_string(),
71+
)?;
72+
Ok(())
73+
}
74+
5175
pub fn add_crate(
5276
&self,
5377
name: &str,
@@ -118,14 +142,36 @@ impl BuildQueue {
118142
f: impl FnOnce(&QueuedCrate) -> Result<()>,
119143
) -> Result<()> {
120144
let mut conn = self.db.get()?;
121-
122-
let queued = self.queued_crates()?;
123-
let to_process = match queued.get(0) {
145+
let mut transaction = conn.transaction()?;
146+
147+
// fetch the next available crate from the queue table.
148+
// We are using `SELECT FOR UPDATE` inside a transaction so
149+
// the QueuedCrate is locked until we are finished with it.
150+
// `SKIP LOCKED` here will enable another build-server to just
151+
// skip over taken (=locked) rows and start building the first
152+
// available one.
153+
let to_process = match transaction
154+
.query_opt(
155+
"SELECT id, name, version, priority, registry
156+
FROM queue
157+
WHERE attempt < $1
158+
ORDER BY priority ASC, attempt ASC, id ASC
159+
LIMIT 1
160+
FOR UPDATE SKIP LOCKED",
161+
&[&self.max_attempts],
162+
)?
163+
.map(|row| QueuedCrate {
164+
id: row.get("id"),
165+
name: row.get("name"),
166+
version: row.get("version"),
167+
priority: row.get("priority"),
168+
registry: row.get("registry"),
169+
}) {
124170
Some(krate) => krate,
125171
None => return Ok(()),
126172
};
127173

128-
let res = f(to_process).with_context(|| {
174+
let res = f(&to_process).with_context(|| {
129175
format!(
130176
"Failed to build package {}-{} from queue",
131177
to_process.name, to_process.version
@@ -134,15 +180,16 @@ impl BuildQueue {
134180
self.metrics.total_builds.inc();
135181
match res {
136182
Ok(()) => {
137-
conn.execute("DELETE FROM queue WHERE id = $1;", &[&to_process.id])?;
183+
transaction.execute("DELETE FROM queue WHERE id = $1;", &[&to_process.id])?;
138184
}
139185
Err(e) => {
140186
// Increase attempt count
141-
let rows = conn.query(
142-
"UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;",
143-
&[&to_process.id],
144-
)?;
145-
let attempt: i32 = rows[0].get(0);
187+
let attempt: i32 = transaction
188+
.query_one(
189+
"UPDATE queue SET attempt = attempt + 1 WHERE id = $1 RETURNING attempt;",
190+
&[&to_process.id],
191+
)?
192+
.get(0);
146193

147194
if attempt >= self.max_attempts {
148195
self.metrics.failed_builds.inc();
@@ -152,39 +199,33 @@ impl BuildQueue {
152199
}
153200
}
154201

202+
transaction.commit()?;
203+
155204
Ok(())
156205
}
157206
}
158207

159208
/// Locking functions.
160209
impl BuildQueue {
161-
pub(crate) fn lock_path(&self) -> PathBuf {
162-
self.config.prefix.join("docsrs.lock")
163-
}
210+
/// Checks for the lock and returns whether it currently exists.
211+
pub fn is_locked(&self) -> Result<bool> {
212+
let mut conn = self.db.get()?;
164213

165-
/// Checks for the lock file and returns whether it currently exists.
166-
pub fn is_locked(&self) -> bool {
167-
self.lock_path().exists()
214+
Ok(get_config(&mut conn, ConfigName::QueueLocked)?
215+
.as_bool()
216+
.unwrap_or(false))
168217
}
169218

170-
/// Creates a lock file. Daemon will check this lock file and stop operating if it exists.
219+
/// lock the queue. Daemon will check this lock and stop operating if it exists.
171220
pub fn lock(&self) -> Result<()> {
172-
let path = self.lock_path();
173-
if !path.exists() {
174-
fs::OpenOptions::new().write(true).create(true).open(path)?;
175-
}
176-
177-
Ok(())
221+
let mut conn = self.db.get()?;
222+
set_config(&mut conn, ConfigName::QueueLocked, true)
178223
}
179224

180-
/// Removes lock file.
225+
/// unlock the queue.
181226
pub fn unlock(&self) -> Result<()> {
182-
let path = self.lock_path();
183-
if path.exists() {
184-
fs::remove_file(path)?;
185-
}
186-
187-
Ok(())
227+
let mut conn = self.db.get()?;
228+
set_config(&mut conn, ConfigName::QueueLocked, false)
188229
}
189230
}
190231

@@ -266,8 +307,15 @@ impl BuildQueue {
266307
}
267308
}
268309

310+
// store the last seen reference as git reference in
311+
// the local crates.io index repo.
269312
diff.set_last_seen_reference(oid)?;
270313

314+
// additionally set the reference in the database
315+
// so this survives recreating the registry watcher
316+
// server.
317+
self.set_last_seen_reference(oid)?;
318+
271319
Ok(crates_added)
272320
}
273321

@@ -559,4 +607,57 @@ mod tests {
559607
Ok(())
560608
});
561609
}
610+
611+
#[test]
612+
fn test_last_seen_reference_in_db() {
613+
crate::test::wrapper(|env| {
614+
let queue = env.build_queue();
615+
queue.unlock()?;
616+
assert!(!queue.is_locked()?);
617+
// initial db ref is empty
618+
assert_eq!(queue.last_seen_reference()?, None);
619+
assert!(!queue.is_locked()?);
620+
621+
let oid = git2::Oid::from_str("ffffffff")?;
622+
queue.set_last_seen_reference(oid)?;
623+
624+
assert_eq!(queue.last_seen_reference()?, Some(oid));
625+
assert!(!queue.is_locked()?);
626+
627+
Ok(())
628+
});
629+
}
630+
631+
#[test]
632+
fn test_broken_db_reference_locks_queue() {
633+
crate::test::wrapper(|env| {
634+
let mut conn = env.db().conn();
635+
set_config(&mut conn, ConfigName::LastSeenIndexReference, "invalid")?;
636+
637+
let queue = env.build_queue();
638+
queue.unlock()?;
639+
assert!(!queue.is_locked()?);
640+
assert_eq!(queue.last_seen_reference()?, None);
641+
assert!(queue.is_locked()?);
642+
643+
Ok(())
644+
});
645+
}
646+
647+
#[test]
648+
fn test_queue_lock() {
649+
crate::test::wrapper(|env| {
650+
let queue = env.build_queue();
651+
// unlocked without config
652+
assert!(!queue.is_locked()?);
653+
654+
queue.lock()?;
655+
assert!(queue.is_locked()?);
656+
657+
queue.unlock()?;
658+
assert!(!queue.is_locked()?);
659+
660+
Ok(())
661+
});
662+
}
562663
}

src/docbuilder/rustwide_builder.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ use crate::error::Result;
88
use crate::index::api::ReleaseData;
99
use crate::repositories::RepositoryStatsUpdater;
1010
use crate::storage::{rustdoc_archive_path, source_archive_path};
11-
use crate::utils::{copy_dir_all, parse_rustc_version, queue_builder, CargoMetadata};
11+
use crate::utils::{
12+
copy_dir_all, parse_rustc_version, queue_builder, set_config, CargoMetadata, ConfigName,
13+
};
1214
use crate::{db::blacklist::is_blacklisted, utils::MetadataPackage};
1315
use crate::{Config, Context, Index, Metrics, Storage};
1416
use anyhow::{anyhow, bail, Error};
@@ -20,7 +22,6 @@ use rustwide::cmd::{Command, CommandError, SandboxBuilder, SandboxImage};
2022
use rustwide::logging::{self, LogStorage};
2123
use rustwide::toolchain::ToolchainError;
2224
use rustwide::{AlternativeRegistry, Build, Crate, Toolchain, Workspace, WorkspaceBuilder};
23-
use serde_json::Value;
2425
use std::collections::{HashMap, HashSet};
2526
use std::path::Path;
2627
use std::sync::Arc;
@@ -225,12 +226,12 @@ impl RustwideBuilder {
225226
.tempdir()?;
226227
copy_dir_all(source, &dest)?;
227228
add_path_into_database(&self.storage, "", &dest)?;
228-
conn.query(
229-
"INSERT INTO config (name, value) VALUES ('rustc_version', $1) \
230-
ON CONFLICT (name) DO UPDATE SET value = $1;",
231-
&[&Value::String(self.rustc_version.clone())],
232-
)?;
233229

230+
set_config(
231+
&mut conn,
232+
ConfigName::RustcVersion,
233+
self.rustc_version.clone(),
234+
)?;
234235
Ok(())
235236
})()
236237
.map_err(|e| failure::Error::from_boxed_compat(e.into()))
@@ -806,6 +807,7 @@ pub(crate) struct BuildResult {
806807
mod tests {
807808
use super::*;
808809
use crate::test::{assert_redirect, assert_success, wrapper};
810+
use serde_json::Value;
809811

810812
#[test]
811813
#[ignore]

src/metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl Metrics {
158158
self.idle_db_connections.set(pool.idle_connections() as i64);
159159
self.used_db_connections.set(pool.used_connections() as i64);
160160
self.max_db_connections.set(pool.max_size() as i64);
161-
self.queue_is_locked.set(queue.is_locked() as i64);
161+
self.queue_is_locked.set(queue.is_locked()? as i64);
162162

163163
self.queued_crates_count.set(queue.pending_count()? as i64);
164164
self.prioritized_crates_count

0 commit comments

Comments
 (0)