diff --git a/Cargo.lock b/Cargo.lock
index 4278ff82b..7d5cfa7a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2002,7 +2002,6 @@ dependencies = [
  "rayon",
  "regex",
  "reqwest",
- "rusqlite",
  "rustwide",
  "semver",
  "sentry",
@@ -2171,18 +2170,6 @@ dependencies = [
  "pin-project-lite",
 ]

-[[package]]
-name = "fallible-iterator"
-version = "0.3.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
-
-[[package]]
-name = "fallible-streaming-iterator"
-version = "0.1.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
-
 [[package]]
 name = "faster-hex"
 version = "0.10.0"
@@ -4006,15 +3993,6 @@ dependencies = [
  "foldhash",
 ]

-[[package]]
-name = "hashlink"
-version = "0.9.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af"
-dependencies = [
- "hashbrown 0.14.5",
-]
-
 [[package]]
 name = "hashlink"
 version = "0.10.0"
@@ -6080,20 +6058,6 @@ dependencies = [
  "zeroize",
 ]

-[[package]]
-name = "rusqlite"
-version = "0.32.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e"
-dependencies = [
- "bitflags 2.9.4",
- "fallible-iterator",
- "fallible-streaming-iterator",
- "hashlink 0.9.1",
- "libsqlite3-sys",
- "smallvec",
-]
-
 [[package]]
 name = "rustc-demangle"
 version = "0.1.26"
@@ -6953,7 +6917,7 @@ dependencies = [
  "futures-io",
  "futures-util",
  "hashbrown 0.15.5",
- "hashlink 0.10.0",
+ "hashlink",
  "indexmap 2.11.1",
  "log",
  "memchr",
diff --git a/Cargo.toml b/Cargo.toml
index 4035a73a8..4d431018f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,7 +31,7 @@ crates-index-diff = { version = "28.0.0", features = [ "max-performance" ]}
 reqwest = { version = "0.12", features = ["json", "gzip"] }
 semver = { version = "1.0.4", features = ["serde"] }
 slug = "0.1.1"
-sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "chrono" ] }
+sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "sqlite", "chrono" ] }
 url = { version = "2.1.1", features = ["serde"] }
 docsrs-metadata = { path = "crates/metadata" }
 anyhow = { version = "1.0.42", features = ["backtrace"]}
@@ -58,7 +58,6 @@ zip = {version = "5.1.1", default-features = false, features = ["bzip2"]}
 bzip2 = "0.6.0"
 getrandom = "0.3.1"
 itertools = { version = "0.14.0" }
-rusqlite = { version = "0.32.1", features = ["bundled"] }
 hex = "0.4.3"
 derive_more = { version = "2.0.0", features = ["display"] }
diff --git a/src/storage/archive_index.rs b/src/storage/archive_index.rs
index 96843bf12..ca0793ed7 100644
--- a/src/storage/archive_index.rs
+++ b/src/storage/archive_index.rs
@@ -1,7 +1,8 @@
 use crate::error::Result;
 use crate::storage::{FileRange, compression::CompressionAlgorithm};
 use anyhow::{Context as _, bail};
-use rusqlite::{Connection, OpenFlags, OptionalExtension};
+use itertools::Itertools as _;
+use sqlx::{Acquire as _, QueryBuilder, Row as _, Sqlite};
 use std::{fs, io, path::Path};
 use tracing::instrument;

@@ -20,24 +21,56 @@ impl FileInfo {
     }
 }

+/// creates a new empty SQLite database and returns a configured connection pool
+/// to connect to the DB.
+/// Any existing DB at the given path will be deleted first.
+async fn sqlite_create<P: AsRef<Path>>(path: P) -> Result<sqlx::SqlitePool> {
+    let path = path.as_ref();
+    if path.exists() {
+        fs::remove_file(path)?;
+    }
+
+    sqlx::SqlitePool::connect_with(
+        sqlx::sqlite::SqliteConnectOptions::new()
+            .filename(path)
+            .read_only(false)
+            .pragma("synchronous", "full")
+            .create_if_missing(true),
+    )
+    .await
+    .map_err(Into::into)
+}
+
+/// open an existing SQLite database and return a configured connection pool
+/// to connect to the DB.
+/// Will error when the database doesn't exist at that path.
+async fn sqlite_open<P: AsRef<Path>>(path: P) -> Result<sqlx::SqlitePool> {
+    sqlx::SqlitePool::connect_with(
+        sqlx::sqlite::SqliteConnectOptions::new()
+            .filename(path)
+            .read_only(true)
+            .pragma("synchronous", "off") // not needed for readonly db
+            .serialized(false) // same as OPEN_NOMUTEX
+            .create_if_missing(false),
+    )
+    .await
+    .map_err(Into::into)
+}
+
 /// create an archive index based on a zipfile.
 ///
 /// Will delete the destination file if it already exists.
 #[instrument(skip(zipfile))]
-pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path> + std::fmt::Debug>(
+pub(crate) async fn create<R: io::Read + io::Seek, P: AsRef<Path> + std::fmt::Debug>(
     zipfile: &mut R,
     destination: P,
 ) -> Result<()> {
-    let destination = destination.as_ref();
-    if destination.exists() {
-        fs::remove_file(destination)?;
-    }
+    let pool = sqlite_create(destination).await?;
+    let mut conn = pool.acquire().await?;
+    let mut tx = conn.begin().await?;

-    let conn = rusqlite::Connection::open(destination)?;
-    conn.execute("PRAGMA synchronous = FULL", ())?;
-    conn.execute("BEGIN", ())?;
-    conn.execute(
-        "
+    sqlx::query(
+        r#"
         CREATE TABLE files (
             id INTEGER PRIMARY KEY,
             path TEXT UNIQUE,
@@ -45,72 +78,99 @@ pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path> + std::fmt::Debug>(
             end INTEGER,
             compression INTEGER
         );
-        ",
-        (),
-    )?;
+        "#,
+    )
+    .execute(&mut *tx)
+    .await?;

     let mut archive = zip::ZipArchive::new(zipfile)?;
     let compression_bzip = CompressionAlgorithm::Bzip2 as i32;

-    for i in 0..archive.len() {
-        let zf = archive.by_index(i)?;
-
-        conn.execute(
-            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
-            (
-                zf.name(),
-                zf.data_start(),
-                zf.data_start() + zf.compressed_size() - 1,
-                match zf.compression() {
-                    zip::CompressionMethod::Bzip2 => compression_bzip,
-                    c => bail!("unsupported compression algorithm {} in zip-file", c),
-                },
-            ),
-        )?;
+    const CHUNKS: usize = 1000;
+    for chunk in &(0..archive.len()).chunks(CHUNKS) {
+        for i in chunk {
+            let mut insert_stmt =
+                QueryBuilder::<Sqlite>::new("INSERT INTO files (path, start, end, compression) ");
+
+            let entry = archive.by_index(i)?;
+
+            let start = entry.data_start() as i64;
+            let end = (entry.data_start() + entry.compressed_size() - 1) as i64;
+            let compression_raw = match entry.compression() {
+                zip::CompressionMethod::Bzip2 => compression_bzip,
+                c => bail!("unsupported compression algorithm {} in zip-file", c),
+            };
+
+            insert_stmt.push_values([()], |mut b, _| {
+                b.push_bind(entry.name())
+                    .push_bind(start)
+                    .push_bind(end)
+                    .push_bind(compression_raw);
+            });
+            insert_stmt
+                .build()
+                .persistent(false)
+                .execute(&mut *tx)
+                .await?;
+        }
     }
-    conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
-    conn.execute("END", ())?;
-    conn.execute("VACUUM", ())?;
+
+    sqlx::query("CREATE INDEX idx_files_path ON files (path);")
+        .execute(&mut *tx)
+        .await?;
+
+    // Commit the transaction before VACUUM (VACUUM cannot run inside a transaction)
+    tx.commit().await?;
+
+    // VACUUM outside the transaction
+    sqlx::query("VACUUM").execute(&mut *conn).await?;
+
     Ok(())
 }

-fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
-    let mut stmt = conn.prepare(
+async fn find_in_sqlite_index<'e, E>(executor: E, search_for: &str) -> Result<Option<FileInfo>>
+where
+    E: sqlx::Executor<'e, Database = sqlx::Sqlite>,
+{
+    let row = sqlx::query(
         "
         SELECT start, end, compression
         FROM files
         WHERE path = ?
         ",
-    )?;
-
-    stmt.query_row((search_for,), |row| {
-        let compression: i32 = row.get(2)?;
-
-        Ok(FileInfo {
-            range: row.get(0)?..=row.get(1)?,
-            compression: compression.try_into().map_err(|value| {
-                rusqlite::Error::FromSqlConversionFailure(
-                    2,
-                    rusqlite::types::Type::Integer,
-                    format!("invalid compression algorithm '{value}' in database").into(),
-                )
+    )
+    .bind(search_for)
+    .fetch_optional(executor)
+    .await
+    .context("error fetching SQLite data")?;
+
+    if let Some(row) = row {
+        let start: u64 = row.try_get(0)?;
+        let end: u64 = row.try_get(1)?;
+        let compression_raw: i32 = row.try_get(2)?;
+
+        Ok(Some(FileInfo {
+            range: start..=end,
+            compression: compression_raw.try_into().map_err(|value| {
+                anyhow::anyhow!(format!(
+                    "invalid compression algorithm '{value}' in database"
+                ))
             })?,
-        })
-    })
-    .optional()
-    .context("error fetching SQLite data")
+        }))
+    } else {
+        Ok(None)
+    }
 }

 #[instrument]
-pub(crate) fn find_in_file<P: AsRef<Path> + std::fmt::Debug>(
+pub(crate) async fn find_in_file<P: AsRef<Path> + std::fmt::Debug>(
     archive_index_path: P,
     search_for: &str,
 ) -> Result<Option<FileInfo>> {
-    let connection = Connection::open_with_flags(
-        archive_index_path,
-        OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
-    )?;
-    find_in_sqlite_index(&connection, search_for)
+    let pool = sqlite_open(archive_index_path).await?;
+    let mut conn = pool.acquire().await?;
+
+    find_in_sqlite_index(&mut *conn, search_for).await
 }

 #[cfg(test)]
@@ -138,43 +198,38 @@ mod tests {
         tf
     }

-    #[test]
-    fn index_create_save_load_sqlite() {
+    #[tokio::test]
+    async fn index_create_save_load_sqlite() -> Result<()> {
         let mut tf = create_test_archive(1);
         let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
-        create(&mut tf, &tempfile).unwrap();
+        create(&mut tf, &tempfile).await?;

-        let fi = find_in_file(&tempfile, "testfile0").unwrap().unwrap();
+        let fi = find_in_file(&tempfile, "testfile0").await?.unwrap();
         assert_eq!(fi.range, FileRange::new(39, 459));
         assert_eq!(fi.compression, CompressionAlgorithm::Bzip2);

-        assert!(
-            find_in_file(&tempfile, "some_other_file",)
-                .unwrap()
-                .is_none()
-        );
+        assert!(find_in_file(&tempfile, "some_other_file",).await?.is_none());
+        Ok(())
     }

-    #[test]
-    fn archive_with_more_than_65k_files() {
+    #[tokio::test]
+    async fn archive_with_more_than_65k_files() -> Result<()> {
         let mut tf = create_test_archive(100_000);

-        let tempfile = tempfile::NamedTempFile::new().unwrap().into_temp_path();
-        create(&mut tf, &tempfile).unwrap();
-
-        let connection = Connection::open_with_flags(
-            tempfile,
-            OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
-        )
-        .unwrap();
-        let mut stmt = connection.prepare("SELECT count(*) FROM files").unwrap();
-
-        let count = stmt
-            .query_row([], |row| Ok(row.get::<_, usize>(0)))
-            .unwrap()
-            .unwrap();
-        assert_eq!(count, 100_000);
+        let tempfile = tempfile::NamedTempFile::new()?.into_temp_path();
+        create(&mut tf, &tempfile).await?;
+
+        let pool = sqlite_open(&tempfile).await?;
+        let mut conn = pool.acquire().await?;
+
+        let row = sqlx::query("SELECT count(*) FROM files")
+            .fetch_one(&mut *conn)
+            .await?;
+
+        assert_eq!(row.get::<i64, _>(0), 100_000);
+
+        Ok(())
     }
 }
diff --git a/src/storage/compression.rs b/src/storage/compression.rs
index 6ea093231..4c635fd06 100644
--- a/src/storage/compression.rs
+++ b/src/storage/compression.rs
@@ -7,6 +7,7 @@ use std::{
     io::{self, Read},
 };
 use strum::{Display, EnumIter, EnumString, FromRepr};
+use tokio::io::{AsyncRead, AsyncWrite};

 pub type CompressionAlgorithms = HashSet<CompressionAlgorithm>;

@@ -89,6 +90,52 @@ pub fn compress(content: impl Read, algorithm: CompressionAlgorithm) -> Result<Vec<u8>> {
+/// Wrap an AsyncWrite for compression.
+///
+/// Data written to the returned writer is compressed with the given
+/// algorithm and forwarded to the given output sink.
+pub fn wrap_writer_for_compression<'a>(
+    output_sink: impl AsyncWrite + Unpin + Send + 'a,
+    algorithm: CompressionAlgorithm,
+) -> Box<dyn AsyncWrite + Unpin + Send + 'a> {
+    use async_compression::tokio::write;
+    use tokio::io;
+
+    match algorithm {
+        CompressionAlgorithm::Zstd => {
+            Box::new(io::BufWriter::new(write::ZstdEncoder::new(output_sink)))
+        }
+        CompressionAlgorithm::Bzip2 => {
+            Box::new(io::BufWriter::new(write::BzEncoder::new(output_sink)))
+        }
+        CompressionAlgorithm::Gzip => {
+            Box::new(io::BufWriter::new(write::GzipEncoder::new(output_sink)))
+        }
+    }
+}
+
+/// Wrap an AsyncRead for decompression.
+///
+/// You provide an AsyncRead that gives us the compressed data. With the
+/// wrapper we return you can then read decompressed data from the wrapper.
+pub fn wrap_reader_for_decompression<'a>(
+    input: impl AsyncRead + Unpin + Send + 'a,
+    algorithm: CompressionAlgorithm,
+) -> Box<dyn AsyncRead + Unpin + Send + 'a> {
+    use async_compression::tokio::bufread;
+    use tokio::io;
+
+    match algorithm {
+        CompressionAlgorithm::Zstd => {
+            Box::new(bufread::ZstdDecoder::new(io::BufReader::new(input)))
+        }
+        CompressionAlgorithm::Bzip2 => Box::new(bufread::BzDecoder::new(io::BufReader::new(input))),
+        CompressionAlgorithm::Gzip => {
+            Box::new(bufread::GzipDecoder::new(io::BufReader::new(input)))
+        }
+    }
+}
+
 pub fn decompress(
     content: impl Read,
     algorithm: CompressionAlgorithm,
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index e4e696474..1b4dcdd92 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -4,6 +4,7 @@ mod database;
 mod s3;

 pub use self::compression::{CompressionAlgorithm, CompressionAlgorithms, compress, decompress};
+use self::compression::{wrap_reader_for_decompression, wrap_writer_for_compression};
 use self::database::DatabaseBackend;
 use self::s3::S3Backend;
 use crate::{
@@ -25,7 +26,7 @@ use path_slash::PathExt;
 use std::{
     fmt,
     fs::{self, File},
-    io::{self, BufReader, Write as _},
+    io::{self, BufReader},
     num::ParseIntError,
     ops::RangeInclusive,
     path::{Path, PathBuf},
@@ -33,7 +34,7 @@ use std::{
 };
 use std::{iter, str::FromStr};
 use tokio::{
-    io::{AsyncRead, AsyncReadExt as _, AsyncWriteExt},
+    io::{AsyncRead, AsyncWriteExt},
     runtime::Runtime,
 };
 use tracing::{error, info_span, instrument, trace};
@@ -88,23 +89,7 @@ impl StreamingBlob {
             return self;
         };

-        match alg {
-            CompressionAlgorithm::Zstd => {
-                self.content = Box::new(async_compression::tokio::bufread::ZstdDecoder::new(
-                    tokio::io::BufReader::new(self.content),
-                ))
-            }
-            CompressionAlgorithm::Bzip2 => {
-                self.content = Box::new(async_compression::tokio::bufread::BzDecoder::new(
-                    tokio::io::BufReader::new(self.content),
-                ))
-            }
-            CompressionAlgorithm::Gzip => {
-                self.content = Box::new(async_compression::tokio::bufread::GzipDecoder::new(
-                    tokio::io::BufReader::new(self.content),
-                ))
-            }
-        };
+        self.content = wrap_reader_for_decompression(self.content, alg);
         self.compression = None;
         self
     }
@@ -115,14 +100,7 @@ impl StreamingBlob {
         let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size);
         content.reserve(self.content_length);

-        let mut buf = [0u8; 8 * 1024];
-        loop {
-            let n = self.content.read(&mut buf).await?;
-            if n == 0 {
-                break;
-            }
-            content.write_all(&buf[..n])?;
-        }
+        tokio::io::copy(&mut self.content, &mut content).await?;

         Ok(Blob {
             path: self.path,
@@ -335,13 +313,9 @@ impl AsyncStorage {
             .download_archive_index(archive_path, latest_build_id)
             .await
         {
-            Ok(index_filename) => Ok({
-                let path = path.to_owned();
-                spawn_blocking(move || {
-                    Ok(archive_index::find_in_file(index_filename, &path)?.is_some())
-                })
+            Ok(index_filename) => Ok(archive_index::find_in_file(index_filename, path)
                 .await?
-            }),
+                .is_some()),
             Err(err) => {
                 if err.downcast_ref::<PathNotFoundError>().is_some() {
                     Ok(false)
@@ -447,11 +421,9 @@ impl AsyncStorage {
             .download_archive_index(archive_path, latest_build_id)
             .await?;

-        let info = {
-            let path = path.to_owned();
-            spawn_blocking(move || archive_index::find_in_file(index_filename, &path)).await
-        }?
-        .ok_or(PathNotFoundError)?;
+        let info = archive_index::find_in_file(index_filename, path)
+            .await?
+            .ok_or(PathNotFoundError)?;

         let blob = self
             .get_range(
                 archive_path,
@@ -483,11 +455,9 @@ impl AsyncStorage {
             .download_archive_index(archive_path, latest_build_id)
             .await?;

-        let info = {
-            let path = path.to_owned();
-            spawn_blocking(move || archive_index::find_in_file(index_filename, &path)).await
-        }?
-        .ok_or(PathNotFoundError)?;
+        let info = archive_index::find_in_file(index_filename, path)
+            .await?
+            .ok_or(PathNotFoundError)?;

         let blob = self
             .get_range_stream(archive_path, info.range(), Some(info.compression()))
@@ -510,11 +480,10 @@ impl AsyncStorage {
         archive_path: &str,
         root_dir: &Path,
     ) -> Result<(Vec<PathBuf>, CompressionAlgorithm)> {
-        let (zip_content, compressed_index_content, alg, remote_index_path, file_paths) =
+        let (mut zip_content, file_paths) =
             spawn_blocking({
                 let archive_path = archive_path.to_owned();
                 let root_dir = root_dir.to_owned();
-                let temp_dir = self.config.temp_dir.clone();

                 move || {
                     let mut file_paths = Vec::new();
@@ -530,7 +499,7 @@ impl AsyncStorage {
                     // also has to be added as supported algorithm for storage compression, together
                     // with a mapping in `storage::archive_index::Index::new_from_zip`.

-                    let mut zip_content = {
+                    let zip_content = {
                         let _span =
                             info_span!("create_zip_archive", %archive_path, root_dir=%root_dir.display()).entered();

@@ -550,32 +519,35 @@ impl AsyncStorage {
                         zip.finish()?.into_inner()
                     };

-                    let remote_index_path = format!("{}.index", &archive_path);
-                    let alg = CompressionAlgorithm::default();
-                    let compressed_index_content = {
-                        let _span = info_span!("create_archive_index", %remote_index_path).entered();
-
-                        fs::create_dir_all(&temp_dir)?;
-                        let local_index_path =
-                            tempfile::NamedTempFile::new_in(&temp_dir)?.into_temp_path();
-                        archive_index::create(
-                            &mut io::Cursor::new(&mut zip_content),
-                            &local_index_path,
-                        )?;
-
-                        compress(BufReader::new(fs::File::open(&local_index_path)?), alg)?
-                    };
                     Ok((
                         zip_content,
-                        compressed_index_content,
-                        alg,
-                        remote_index_path,
-                        file_paths,
+                        file_paths
                     ))
                 }
             })
             .await?;

+        let alg = CompressionAlgorithm::default();
+        let remote_index_path = format!("{}.index", &archive_path);
+        let compressed_index_content = {
+            let _span = info_span!("create_archive_index", %remote_index_path).entered();
+
+            tokio::fs::create_dir_all(&self.config.temp_dir).await?;
+            let local_index_path =
+                tempfile::NamedTempFile::new_in(&self.config.temp_dir)?.into_temp_path();
+
+            archive_index::create(&mut io::Cursor::new(&mut zip_content), &local_index_path)
+                .await?;
+
+            let mut buf: Vec<u8> = Vec::new();
+            tokio::io::copy(
+                &mut tokio::io::BufReader::new(tokio::fs::File::open(&local_index_path).await?),
+                &mut wrap_writer_for_compression(&mut buf, alg),
+            )
+            .await?;
+            buf
+        };
+
         self.store_inner(vec![
             Blob {
                 path: archive_path.to_string(),
diff --git a/src/utils/sized_buffer.rs b/src/utils/sized_buffer.rs
index 981f8ad80..2c71b12ea 100644
--- a/src/utils/sized_buffer.rs
+++ b/src/utils/sized_buffer.rs
@@ -1,4 +1,9 @@
-use std::io::{Error as IoError, Write};
+use std::{
+    io::{self, Write},
+    pin::Pin,
+    task,
+};
+use tokio::io::AsyncWrite;

 pub(crate) struct SizedBuffer {
     inner: Vec<u8>,
@@ -27,19 +32,48 @@ impl SizedBuffer {
     }
 }

 impl Write for SizedBuffer {
-    fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
         if self.inner.len() + buf.len() > self.limit {
-            Err(IoError::other(crate::error::SizeLimitReached))
+            Err(io::Error::other(crate::error::SizeLimitReached))
         } else {
             self.inner.write(buf)
         }
     }

-    fn flush(&mut self) -> Result<(), IoError> {
+    fn flush(&mut self) -> io::Result<()> {
         self.inner.flush()
     }
 }

+impl AsyncWrite for SizedBuffer {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> task::Poll<io::Result<usize>> {
+        let mut this = self.get_mut();
+        match io::Write::write(&mut this, buf) {
+            Ok(n) => task::Poll::Ready(Ok(n)),
+            Err(e) => task::Poll::Ready(Err(e)),
+        }
+    }
+
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> task::Poll<io::Result<()>> {
+        let mut this = self.get_mut();
+        match io::Write::flush(&mut this) {
+            Ok(()) => task::Poll::Ready(Ok(())),
+            Err(e) => task::Poll::Ready(Err(e)),
+        }
+    }
+
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> task::Poll<io::Result<()>> {
+        task::Poll::Ready(Ok(()))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
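
Illustrative usage (reviewer note, not part of the patch): a minimal sketch, assuming a tokio runtime inside the docs.rs crate, of how the now-async archive-index API from src/storage/archive_index.rs is driven end to end. The function name `index_roundtrip`, the `zip_bytes` input, and the looked-up path "src/lib.rs.html" are hypothetical stand-ins for what the real callers in `AsyncStorage` pass in.

use std::io::Cursor;

use crate::storage::archive_index;

// Hypothetical helper: build an index from a zip archive and query it once.
async fn index_roundtrip(zip_bytes: Vec<u8>) -> anyhow::Result<()> {
    // Temporary location for the SQLite index file, mirroring what
    // `store_all_in_archive` does with `config.temp_dir`.
    let index_path = tempfile::NamedTempFile::new()?.into_temp_path();

    // Build the index from the zip archive; this now runs on an async
    // sqlx connection pool instead of a blocking rusqlite connection.
    archive_index::create(&mut Cursor::new(zip_bytes), &index_path).await?;

    // Look up one stored file; `None` means the path is not in the archive.
    if let Some(info) = archive_index::find_in_file(&index_path, "src/lib.rs.html").await? {
        println!("range={:?}, compression={}", info.range(), info.compression());
    }

    Ok(())
}

Callers that previously had to wrap `find_in_file` in `spawn_blocking` (see the removed blocks in src/storage/mod.rs above) can now simply `.await` it on the storage runtime.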