Skip to content

Commit 28a91ed

Browse files
committed
assembly blobs from parts
1 parent 4945996 commit 28a91ed

5 files changed

+138
-19
lines changed

.sqlx/query-270d3344d0fc55c4fa32bc97d77882026371af8105ede628970a807131a13660.json

Lines changed: 46 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-52f93a6666babec678cdf8eb611921189c0f160fcc28a435370355d0d1521dde.json

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/.sqlx/query-7164172dcc7008ff76bb70e2b57f96331170ebe76a013003892950ed78e023ab.json renamed to .sqlx/query-7164172dcc7008ff76bb70e2b57f96331170ebe76a013003892950ed78e023ab.json

File renamed without changes.

service/.sqlx/query-940a4f83c63c73e96c0325718904c021084dfc40649711712aaadd1b655f8390.json renamed to .sqlx/query-940a4f83c63c73e96c0325718904c021084dfc40649711712aaadd1b655f8390.json

File renamed without changes.

service/src/lib.rs

Lines changed: 77 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use std::fs::OpenOptions;
77
use std::io::{Read, Seek as _, SeekFrom, Write};
88
use std::path::{Path, PathBuf};
9-
use std::sync::{Arc, Mutex};
9+
use std::sync::Mutex;
1010

1111
use sqlx::PgPool;
1212
use uuid::Uuid;
@@ -15,6 +15,16 @@ mod db;
1515

1616
pub use db::initialize_db;
1717

18+
#[allow(unused)]
19+
#[derive(Debug)]
20+
struct Part {
21+
part_size: i32,
22+
compression: i16,
23+
compressed_size: i32,
24+
segment_id: Uuid,
25+
segment_offset: i32,
26+
}
27+
1828
pub struct StorageService {
1929
db: PgPool,
2030
path: PathBuf,
@@ -30,6 +40,37 @@ impl StorageService {
3040
}
3141
}
3242

43+
pub async fn assemble_file_from_parts(&self, id: &str, parts: &[i64]) -> anyhow::Result<()> {
44+
sqlx::query!(
45+
r#"
46+
INSERT INTO blobs
47+
(id, parts)
48+
VALUES
49+
($1, $2);"#,
50+
id,
51+
parts,
52+
)
53+
.execute(&self.db)
54+
.await?;
55+
56+
Ok(())
57+
}
58+
59+
async fn get_file_parts(&self, id: &str) -> anyhow::Result<Vec<Part>> {
60+
let parts = sqlx::query_as!(
61+
Part,
62+
r#"
63+
SELECT p.part_size, p."compression", p.compressed_size, p.segment_id, p.segment_offset
64+
FROM (SELECT unnest(parts) AS id FROM blobs WHERE id = $1) AS part_id
65+
JOIN parts AS p ON p.id = part_id.id;"#,
66+
id,
67+
)
68+
.fetch_all(&self.db)
69+
.await?;
70+
71+
Ok(parts)
72+
}
73+
3374
pub async fn put_part(&self, contents: &[u8]) -> anyhow::Result<i64> {
3475
let part_size = contents.len();
3576

@@ -44,7 +85,7 @@ impl StorageService {
4485
.create(true)
4586
.open(segment_path)?;
4687

47-
let segment_offset = segment_file.stream_position()?;
88+
let segment_offset = segment_file.seek(SeekFrom::End(0))?;
4889
segment_file.write_all(contents)?;
4990
segment_file.sync_data()?;
5091
drop(segment_file);
@@ -68,15 +109,7 @@ impl StorageService {
68109
Ok(id.id)
69110
}
70111

71-
pub async fn get_part(&self, id: i64) -> anyhow::Result<Option<Arc<[u8]>>> {
72-
#[allow(unused)]
73-
struct Part {
74-
part_size: i32,
75-
compression: i16,
76-
compressed_size: i32,
77-
segment_id: Uuid,
78-
segment_offset: i32,
79-
}
112+
pub async fn get_part(&self, id: i64) -> anyhow::Result<Option<Vec<u8>>> {
80113
let Some(part) = sqlx::query_as!(
81114
Part,
82115
r#"
@@ -91,20 +124,22 @@ impl StorageService {
91124
return Ok(None);
92125
};
93126

127+
let contents = self.get_part_from_storage(&part).await?;
128+
Ok(Some(contents))
129+
}
130+
131+
async fn get_part_from_storage(&self, part: &Part) -> anyhow::Result<Vec<u8>> {
94132
let segment_path = self.path.join(format!("{}.bin", part.segment_id));
95-
let mut segment_file = OpenOptions::new().read(true).open(segment_path).unwrap();
133+
let mut segment_file = OpenOptions::new().read(true).open(segment_path)?;
96134

97-
segment_file
98-
.seek(SeekFrom::Start(part.segment_offset as u64))
99-
.unwrap();
135+
segment_file.seek(SeekFrom::Start(part.segment_offset as u64))?;
100136

101137
let mut buf = vec![0; part.part_size as usize];
102-
segment_file.read_exact(&mut buf).unwrap();
138+
segment_file.read_exact(&mut buf)?;
103139

104140
drop(segment_file);
105141

106-
// FIXME: this reallocates. why the hell can’t we read into a `Arc<[MaybeUninit]>`?
107-
Ok(Some(buf.into()))
142+
Ok(buf)
108143
}
109144
}
110145

@@ -121,6 +156,29 @@ mod tests {
121156
let id = service.put_part(blob).await.unwrap();
122157

123158
let got_blob = service.get_part(id).await.unwrap();
124-
assert_eq!(got_blob.unwrap().as_ref(), blob);
159+
assert_eq!(got_blob.unwrap(), blob);
160+
}
161+
162+
#[sqlx::test]
163+
async fn stores_blob_as_parts(db: PgPool) {
164+
let tempdir = tempfile::tempdir().unwrap();
165+
let service = StorageService::new(db, tempdir.path());
166+
167+
let part1 = service.put_part(b"oh ").await.unwrap();
168+
let part2 = service.put_part(b"hai!").await.unwrap();
169+
service
170+
.assemble_file_from_parts("some/file/id", &[part1, part2])
171+
.await
172+
.unwrap();
173+
174+
let parts = service.get_file_parts("some/file/id").await.unwrap();
175+
176+
let mut contents = vec![];
177+
for part in parts {
178+
let part = service.get_part_from_storage(&part).await.unwrap();
179+
contents.extend_from_slice(&part);
180+
}
181+
182+
assert_eq!(contents, b"oh hai!");
125183
}
126184
}

0 commit comments

Comments
 (0)