Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 32 additions & 19 deletions src/bin/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,21 @@
//! cargo run --bin monitor

use anyhow::Result;
use crates_io::tasks::spawn_blocking;
use crates_io::worker::jobs;
use crates_io::{admin::on_call, db, schema::*};
use crates_io_env_vars::{var, var_parsed};
use crates_io_worker::BackgroundJob;
use diesel::prelude::*;
use diesel_async::{AsyncPgConnection, RunQueryDsl};

fn main() -> Result<()> {
let conn = &mut db::oneoff_connection()?;
#[tokio::main]
async fn main() -> Result<()> {
let conn = &mut db::oneoff_async_connection().await?;

Check warning on line 18 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L17-L18

Added lines #L17 - L18 were not covered by tests

check_failing_background_jobs(conn)?;
check_stalled_update_downloads(conn)?;
check_spam_attack(conn)?;
check_failing_background_jobs(conn).await?;
check_stalled_update_downloads(conn).await?;
check_spam_attack(conn).await?;

Check warning on line 22 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L20-L22

Added lines #L20 - L22 were not covered by tests
Ok(())
}

Expand All @@ -28,7 +31,7 @@
///
/// Within the default 15 minute time, a job should have already had several
/// failed retry attempts.
fn check_failing_background_jobs(conn: &mut PgConnection) -> Result<()> {
async fn check_failing_background_jobs(conn: &mut AsyncPgConnection) -> Result<()> {

Check warning on line 34 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L34

Added line #L34 was not covered by tests
use diesel::dsl::*;
use diesel::sql_types::Integer;

Expand All @@ -45,7 +48,8 @@
.filter(background_jobs::priority.ge(0))
.for_update()
.skip_locked()
.load(conn)?;
.load(conn)
.await?;

Check warning on line 52 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L51-L52

Added lines #L51 - L52 were not covered by tests

let stalled_job_count = stalled_jobs.len();

Expand All @@ -63,12 +67,13 @@
}
};

log_and_trigger_event(event)?;
spawn_blocking(move || log_and_trigger_event(event)).await?;

Check warning on line 70 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L70

Added line #L70 was not covered by tests

Ok(())
}

/// Check for an `update_downloads` job that has run longer than expected
fn check_stalled_update_downloads(conn: &mut PgConnection) -> Result<()> {
async fn check_stalled_update_downloads(conn: &mut AsyncPgConnection) -> Result<()> {

Check warning on line 76 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L76

Added line #L76 was not covered by tests
use chrono::{DateTime, NaiveDateTime, Utc};

const EVENT_KEY: &str = "update_downloads_stalled";
Expand All @@ -81,28 +86,35 @@
let start_time: Result<NaiveDateTime, _> = background_jobs::table
.filter(background_jobs::job_type.eq(jobs::UpdateDownloads::JOB_NAME))
.select(background_jobs::created_at)
.first(conn);
.first(conn)
.await;

Check warning on line 90 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L89-L90

Added lines #L89 - L90 were not covered by tests

if let Ok(start_time) = start_time {
let start_time = DateTime::<Utc>::from_naive_utc_and_offset(start_time, Utc);
let minutes = Utc::now().signed_duration_since(start_time).num_minutes();

if minutes > max_job_time {
return log_and_trigger_event(on_call::Event::Trigger {
incident_key: Some(EVENT_KEY.into()),
description: format!("update_downloads job running for {minutes} minutes"),
});
return spawn_blocking(move || {
log_and_trigger_event(on_call::Event::Trigger {
incident_key: Some(EVENT_KEY.into()),
description: format!("update_downloads job running for {minutes} minutes"),
})
})
.await;

Check warning on line 103 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L97-L103

Added lines #L97 - L103 were not covered by tests
}
};

log_and_trigger_event(on_call::Event::Resolve {
incident_key: EVENT_KEY.into(),
description: Some("No stalled update_downloads job".into()),
spawn_blocking(move || {
log_and_trigger_event(on_call::Event::Resolve {
incident_key: EVENT_KEY.into(),
description: Some("No stalled update_downloads job".into()),
})

Check warning on line 111 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L107-L111

Added lines #L107 - L111 were not covered by tests
})
.await

Check warning on line 113 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L113

Added line #L113 was not covered by tests
}

/// Check for known spam patterns
fn check_spam_attack(conn: &mut PgConnection) -> Result<()> {
async fn check_spam_attack(conn: &mut AsyncPgConnection) -> Result<()> {

Check warning on line 117 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L117

Added line #L117 was not covered by tests
use crates_io::sql::canon_crate_name;

const EVENT_KEY: &str = "spam_attack";
Expand All @@ -121,6 +133,7 @@
.filter(canon_crate_name(crates::name).eq_any(bad_crate_names))
.select(crates::name)
.first(conn)
.await

Check warning on line 136 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L136

Added line #L136 was not covered by tests
.optional()?;

if let Some(bad_crate) = bad_crate {
Expand All @@ -139,7 +152,7 @@
}
};

log_and_trigger_event(event)?;
spawn_blocking(move || log_and_trigger_event(event)).await?;

Check warning on line 155 in src/bin/monitor.rs

View check run for this annotation

Codecov / codecov/patch

src/bin/monitor.rs#L155

Added line #L155 was not covered by tests
Ok(())
}

Expand Down
14 changes: 13 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use diesel::{Connection, ConnectionResult, PgConnection, QueryResult};
use diesel_async::pooled_connection::deadpool::{Hook, HookError};
use diesel_async::pooled_connection::ManagerConfig;
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
use native_tls::{Certificate, TlsConnector};
use postgres_native_tls::MakeTlsConnector;
use secrecy::ExposeSecret;
Expand All @@ -23,6 +23,18 @@
oneoff_connection_with_config(&config).map_err(Into::into)
}

/// Opens a single, un-pooled [`AsyncPgConnection`] to the primary database
/// described by `config`.
///
/// The primary pool's URL is passed through [`connection_url`] together with
/// `config` before the connection is established.
///
/// # Errors
///
/// Returns the [`ConnectionResult`] error produced by
/// `AsyncPgConnection::establish` if the connection cannot be opened.
///
/// NOTE(review): `AsyncPgConnection::establish` is given only the URL here; no
/// custom TLS connector is configured in this function. Confirm that this is
/// acceptable for the environments where one-off connections are used.
pub async fn oneoff_async_connection_with_config(
    config: &config::DatabasePools,
) -> ConnectionResult<AsyncPgConnection> {
    let primary_url = config.primary.url.expose_secret();
    let database_url = connection_url(config, primary_url);
    AsyncPgConnection::establish(&database_url).await
}

Check warning on line 31 in src/db.rs

View check run for this annotation

Codecov / codecov/patch

src/db.rs#L26-L31

Added lines #L26 - L31 were not covered by tests

/// Opens a single, un-pooled [`AsyncPgConnection`] using configuration built
/// via `config::Base::from_environment` and
/// `config::DatabasePools::full_from_environment`.
///
/// # Errors
///
/// Returns an error if either configuration step fails or if
/// [`oneoff_async_connection_with_config`] cannot establish the connection.
pub async fn oneoff_async_connection() -> anyhow::Result<AsyncPgConnection> {
    let base = config::Base::from_environment()?;
    let pools = config::DatabasePools::full_from_environment(&base)?;
    oneoff_async_connection_with_config(&pools)
        .await
        .map_err(Into::into)
}

Check warning on line 36 in src/db.rs

View check run for this annotation

Codecov / codecov/patch

src/db.rs#L33-L36

Added lines #L33 - L36 were not covered by tests

pub fn connection_url(config: &config::DatabasePools, url: &str) -> String {
let mut url = Url::parse(url).expect("Invalid database URL");

Expand Down