Commit ad89f26

add subcommand to migrate old CBOR archive indexes to SQLite
1 parent 31b02e2
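The new `database` subcommand would presumably be invoked as `cratesfyi database migrate-archive-index`, assuming clap's default kebab-case naming for the `MigrateArchiveIndex` variant added below.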

4 files changed: +199, -76 lines

src/bin/cratesfyi.rs (9 additions & 2 deletions)

```diff
@@ -15,8 +15,8 @@ use docs_rs::utils::{
     remove_crate_priority, set_crate_priority, ConfigName,
 };
 use docs_rs::{
-    start_background_metrics_webserver, start_web_server, BuildQueue, Config, Context, Index,
-    InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
+    migrate_old_archive_indexes, start_background_metrics_webserver, start_web_server, BuildQueue,
+    Config, Context, Index, InstanceMetrics, PackageKind, RustwideBuilder, ServiceMetrics, Storage,
 };
 use humantime::Duration;
 use once_cell::sync::OnceCell;
@@ -482,6 +482,9 @@ enum DatabaseSubcommand {
     /// Backfill GitHub/Gitlab stats for crates.
     BackfillRepositoryStats,
 
+    /// Migrate the old CBOR archive index files to SQLite.
+    MigrateArchiveIndex,
+
     /// Updates info for a crate from the registry's API
     UpdateCrateRegistryFields {
         #[arg(name = "CRATE")]
@@ -533,6 +536,10 @@ impl DatabaseSubcommand {
                 ctx.repository_stats_updater()?.update_all_crates()?;
             }
 
+            Self::MigrateArchiveIndex => {
+                migrate_old_archive_indexes(&*ctx.storage()?, &mut *ctx.conn()?)?;
+            }
+
             Self::BackfillRepositoryStats => {
                 ctx.repository_stats_updater()?.backfill_repositories()?;
             }
```

src/lib.rs (1 addition & 0 deletions)

```diff
@@ -9,6 +9,7 @@ pub use self::docbuilder::PackageKind;
 pub use self::docbuilder::RustwideBuilder;
 pub use self::index::Index;
 pub use self::metrics::{InstanceMetrics, ServiceMetrics};
+pub use self::storage::migrate_old_archive_indexes;
 pub use self::storage::Storage;
 pub use self::web::{start_background_metrics_webserver, start_web_server};
 
```
src/storage/archive_index.rs (121 additions & 71 deletions)

```diff
@@ -1,18 +1,20 @@
 use crate::error::Result;
 use crate::storage::{compression::CompressionAlgorithm, FileRange};
-use anyhow::{bail, Context as _};
+use anyhow::Context as _;
 use memmap2::MmapOptions;
 use rusqlite::{Connection, OptionalExtension};
 use serde::de::DeserializeSeed;
 use serde::de::{IgnoredAny, MapAccess, Visitor};
 use serde::{Deserialize, Deserializer, Serialize};
+use std::io::BufReader;
 use std::{collections::HashMap, fmt, fs, fs::File, io, io::Read, path::Path};
+use tempfile::TempPath;
 
 use super::sqlite_pool::SqliteConnectionPool;
 
 static SQLITE_FILE_HEADER: &[u8] = b"SQLite format 3\0";
 
-#[derive(Deserialize, Serialize)]
+#[derive(Deserialize, Serialize, PartialEq, Eq, Debug)]
 pub(crate) struct FileInfo {
     range: FileRange,
     compression: CompressionAlgorithm,
@@ -27,63 +29,87 @@ impl FileInfo {
     }
 }
 
-#[derive(Serialize)]
+#[derive(Deserialize, Serialize)]
 struct Index {
     files: HashMap<String, FileInfo>,
 }
 
-/// create an archive index based on a zipfile.
-///
-/// Will delete the destination file if it already exists.
-pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
-    zipfile: &mut R,
-    destination: P,
-) -> Result<()> {
-    if destination.as_ref().exists() {
-        fs::remove_file(&destination)?;
-    }
+impl Index {
+    pub(crate) fn write_sqlite<P: AsRef<Path>>(&self, destination: P) -> Result<()> {
+        let destination = destination.as_ref();
+        if destination.exists() {
+            fs::remove_file(destination)?;
+        }
 
-    let mut archive = zip::ZipArchive::new(zipfile)?;
+        let conn = rusqlite::Connection::open(destination)?;
+        conn.execute("PRAGMA synchronous = FULL", ())?;
+        conn.execute("BEGIN", ())?;
+        conn.execute(
+            "
+            CREATE TABLE files (
+                id INTEGER PRIMARY KEY,
+                path TEXT UNIQUE,
+                start INTEGER,
+                end INTEGER,
+                compression INTEGER
+            );
+            ",
+            (),
+        )?;
 
-    let conn = rusqlite::Connection::open(&destination)?;
-    conn.execute("PRAGMA synchronous = FULL", ())?;
-    conn.execute("BEGIN", ())?;
-    conn.execute(
-        "
-        CREATE TABLE files (
-            id INTEGER PRIMARY KEY,
-            path TEXT UNIQUE,
-            start INTEGER,
-            end INTEGER,
-            compression INTEGER
-        );
-        ",
-        (),
-    )?;
+        for (name, info) in self.files.iter() {
+            conn.execute(
+                "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
+                (
+                    name,
+                    info.range.start(),
+                    info.range.end(),
+                    info.compression as i32,
+                ),
+            )?;
+        }
 
-    for i in 0..archive.len() {
-        let zf = archive.by_index(i)?;
+        conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
+        conn.execute("END", ())?;
+        conn.execute("VACUUM", ())?;
+        Ok(())
+    }
 
-        let compression_bzip = CompressionAlgorithm::Bzip2 as i32;
+    pub(crate) fn from_zip<R: io::Read + io::Seek>(zipfile: &mut R) -> Result<Self> {
+        let mut archive = zip::ZipArchive::new(zipfile)?;
 
-        conn.execute(
-            "INSERT INTO files (path, start, end, compression) VALUES (?, ?, ?, ?)",
-            (
-                zf.name(),
-                zf.data_start(),
-                zf.data_start() + zf.compressed_size() - 1,
-                match zf.compression() {
-                    zip::CompressionMethod::Bzip2 => compression_bzip,
-                    c => bail!("unsupported compression algorithm {} in zip-file", c),
+        let mut index = Index {
+            files: HashMap::with_capacity(archive.len()),
+        };
+
+        for i in 0..archive.len() {
+            let zf = archive.by_index(i)?;
+
+            index.files.insert(
+                zf.name().to_owned(),
+                FileInfo {
+                    range: FileRange::new(
+                        zf.data_start(),
+                        zf.data_start() + zf.compressed_size() - 1,
+                    ),
+                    compression: CompressionAlgorithm::Bzip2,
                 },
-            ),
-        )?;
+            );
+        }
+        Ok(index)
     }
+}
 
-    conn.execute("CREATE INDEX idx_files_path ON files (path);", ())?;
-    conn.execute("END", ())?;
-    conn.execute("VACUUM", ())?;
-
+/// create an archive index based on a zipfile.
+///
+/// Will delete the destination file if it already exists.
+pub(crate) fn create<R: io::Read + io::Seek, P: AsRef<Path>>(
+    zipfile: &mut R,
+    destination: P,
+) -> Result<()> {
+    Index::from_zip(zipfile)?
+        .write_sqlite(&destination)
+        .context("error writing SQLite index")?;
     Ok(())
 }
```
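The `files` table written by `write_sqlite` is what the existing `find_in_sqlite_index` (see the next hunk) queries on the read path. A minimal sketch of such a lookup against this schema — the function name is illustrative, and it returns raw columns instead of the real code's `FileInfo`:

```rust
use rusqlite::{Connection, OptionalExtension};

// Sketch only: look up one file's (start, end, compression) row in the
// `files` table created by `write_sqlite` above.
fn lookup(conn: &Connection, path: &str) -> rusqlite::Result<Option<(u64, u64, i32)>> {
    conn.query_row(
        "SELECT start, end, compression FROM files WHERE path = ?",
        [path],
        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
    )
    .optional()
}
```

Because `path` is declared `UNIQUE` and additionally covered by `idx_files_path`, this point lookup stays an index search rather than a table scan.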

```diff
@@ -227,7 +253,7 @@ fn find_in_sqlite_index(conn: &Connection, search_for: &str) -> Result<Option<FileInfo>> {
 /// > OFFSET  SIZE    DESCRIPTION
 /// > 0       16      Header string: "SQLite format 3\000"
 /// > [...]
-fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
+pub(crate) fn is_sqlite_file<P: AsRef<Path>>(archive_index_path: P) -> Result<bool> {
     let mut f = File::open(archive_index_path)?;
 
     let mut buffer = [0; 16];
@@ -259,6 +285,20 @@ pub(crate) fn find_in_file<P: AsRef<Path>>(
     }
 }
 
+pub(crate) fn convert_to_sqlite_index<P: AsRef<Path>>(path: P) -> Result<TempPath> {
+    let path = path.as_ref();
+    let index: Index = { serde_cbor::from_reader(BufReader::new(File::open(path)?))? };
+
+    // write the new index into a temporary file so reads from ongoing requests
+    // can continue on the old index until the new one is fully written.
+    let tmp_path = tempfile::NamedTempFile::new()?.into_temp_path();
+    index
+        .write_sqlite(&tmp_path)
+        .context("error writing SQLite index")?;
+
+    Ok(tmp_path)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```
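`convert_to_sqlite_index` deliberately returns the `TempPath` instead of overwriting the CBOR file itself, leaving the swap to the caller (the actual migration subcommand works against the storage backend; this sketch only covers a local file). A hypothetical caller-side helper, not part of this commit, assuming the temp file and the index live on the same filesystem so the final rename is atomic:

```rust
use std::path::Path;

use anyhow::Result;

// Hypothetical helper: upgrade one local index file in place using
// is_sqlite_file and convert_to_sqlite_index from this module.
fn upgrade_index_in_place(index_path: &Path) -> Result<()> {
    if is_sqlite_file(index_path)? {
        // already migrated, nothing to do
        return Ok(());
    }
    let tmp = convert_to_sqlite_index(index_path)?;
    // `TempPath::persist` renames the temp file over the old index, so
    // concurrent readers see either the old or the new index, never a
    // partially written file.
    tmp.persist(index_path)?;
    Ok(())
}
```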
```diff
@@ -270,29 +310,7 @@ mod tests {
         zipfile: &mut R,
         writer: &mut W,
     ) -> Result<()> {
-        let mut archive = zip::ZipArchive::new(zipfile)?;
-
-        // get file locations
-        let mut files: HashMap<String, FileInfo> = HashMap::with_capacity(archive.len());
-        for i in 0..archive.len() {
-            let zf = archive.by_index(i)?;
-
-            files.insert(
-                zf.name().to_string(),
-                FileInfo {
-                    range: FileRange::new(
-                        zf.data_start(),
-                        zf.data_start() + zf.compressed_size() - 1,
-                    ),
-                    compression: match zf.compression() {
-                        zip::CompressionMethod::Bzip2 => CompressionAlgorithm::Bzip2,
-                        c => bail!("unsupported compression algorithm {} in zip-file", c),
-                    },
-                },
-            );
-        }
-
-        serde_cbor::to_writer(writer, &Index { files }).context("serialization error")
+        serde_cbor::to_writer(writer, &Index::from_zip(zipfile)?).context("serialization error")
     }
 
     fn create_test_archive() -> fs::File {
@@ -312,6 +330,38 @@
         tf
     }
 
+    #[test]
+    fn convert_to_sqlite() {
+        let mut tf = create_test_archive();
+        let mut cbor_buf = Vec::new();
+        create_cbor_index(&mut tf, &mut cbor_buf).unwrap();
+        let mut cbor_index_file = tempfile::NamedTempFile::new().unwrap();
+        io::copy(&mut &cbor_buf[..], &mut cbor_index_file).unwrap();
+
+        assert!(!is_sqlite_file(&cbor_index_file).unwrap());
+
+        let original_fi = find_in_file(
+            cbor_index_file.path(),
+            "testfile1",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .unwrap();
+
+        let sqlite_index_file = convert_to_sqlite_index(cbor_index_file).unwrap();
+        assert!(is_sqlite_file(&sqlite_index_file).unwrap());
+
+        let migrated_fi = find_in_file(
+            sqlite_index_file,
+            "testfile1",
+            &SqliteConnectionPool::default(),
+        )
+        .unwrap()
+        .unwrap();
+
+        assert_eq!(migrated_fi, original_fi);
+    }
+
     #[test]
     fn index_create_save_load_cbor_direct() {
         let mut tf = create_test_archive();
```