Skip to content

Commit 6eba75b

Browse files
committed
Command to retrieve object
1 parent c809474 commit 6eba75b

File tree

5 files changed

+116
-0
lines changed

5 files changed

+116
-0
lines changed

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ cd s3-cas
1818
cargo build --release --features binary
1919
```
2020

21+
## Running
22+
23+
```console
24+
s3-cas server --access-key=MY_KEY --secret-key=MY_SECRET --fs-root=/tmp/s3/fs --meta-root=/tmp/s3/meta
25+
```
26+
2127
## Inline metadata
2228

2329
Objects smaller than or equal to a configurable threshold can be stored directly in their metadata records,

src/cas/fs.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ impl CasFS {
155155
self.meta_store.get_path_tree()
156156
}
157157

158+
pub fn fs_root(&self) -> &PathBuf {
159+
&self.root
160+
}
161+
158162
pub fn max_inlined_data_length(&self) -> usize {
159163
self.meta_store.max_inlined_data_length()
160164
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ pub mod cas;
55
pub mod inspect;
66
pub mod metastore;
77
pub mod metrics;
8+
pub mod retrieve;
89
pub mod s3fs;

src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tracing_subscriber::FmtSubscriber;
1111
use s3_cas::cas::{CasFS, StorageEngine};
1212
use s3_cas::inspect::{disk_space, num_keys};
1313
use s3_cas::metastore::Durability;
14+
use s3_cas::retrieve::{retrieve, RetrieveConfig};
1415

1516
#[derive(Parser)]
1617
#[command(version)]
@@ -81,6 +82,8 @@ pub enum Command {
8182
command: InspectCommand,
8283
},
8384

85+
Retrieve(RetrieveConfig),
86+
8487
/// Start S3-cas server
8588
Server(ServerConfig),
8689
}
@@ -120,6 +123,7 @@ fn main() -> Result<()> {
120123
println!("Disk space: {}", disk_space);
121124
}
122125
},
126+
Command::Retrieve(config) => retrieve(config)?,
123127
Command::Server(config) => {
124128
run(config)?;
125129
}

src/retrieve.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::path::PathBuf;
2+
3+
use anyhow::Result;
4+
use bytes::Bytes;
5+
use clap::Parser;
6+
use futures::StreamExt;
7+
use tokio::io::AsyncWriteExt;
8+
9+
use crate::cas::block_stream::BlockStream;
10+
use crate::cas::range_request::RangeRequest;
11+
use crate::cas::CasFS;
12+
use crate::cas::StorageEngine;
13+
use crate::metrics::SharedMetrics;
14+
15+
#[derive(Parser, Debug)]
16+
pub struct RetrieveConfig {
17+
#[arg(long, default_value = ".")]
18+
pub meta_root: PathBuf,
19+
20+
#[arg(long, default_value = ".")]
21+
pub fs_root: PathBuf,
22+
23+
#[arg(
24+
long,
25+
default_value = "fjall",
26+
help = "Metadata DB (fjall, fjall_notx)"
27+
)]
28+
pub metadata_db: StorageEngine,
29+
30+
#[arg(required = true, help = "Bucket name")]
31+
pub bucket: String,
32+
33+
#[arg(required = true, help = "Object key")]
34+
pub key: String,
35+
36+
#[arg(required = true, help = "Destination file path")]
37+
pub dest: String,
38+
}
39+
40+
#[tokio::main]
41+
pub async fn retrieve(args: RetrieveConfig) -> Result<()> {
42+
tracing::info!(
43+
"Retrieving object from bucket: {}, key: {}",
44+
args.bucket,
45+
args.key
46+
);
47+
let storage_engine = args.metadata_db;
48+
let metrics = SharedMetrics::new();
49+
let casfs = CasFS::new(
50+
args.fs_root.clone(),
51+
args.meta_root.clone(),
52+
metrics.clone(),
53+
storage_engine,
54+
None,
55+
None,
56+
);
57+
58+
tracing::info!("get_object_blocks");
59+
let (obj_meta, blocks) = match casfs.get_object_blocks(&args.bucket, &args.key)? {
60+
Some((obj, blocks)) => (obj, blocks),
61+
None => {
62+
println!("Object not found");
63+
return Ok(());
64+
}
65+
};
66+
67+
tracing::info!("Object found, size: {}", obj_meta.size());
68+
if let Some(data) = obj_meta.inlined() {
69+
let mut file = tokio::fs::File::create(&args.dest).await?;
70+
file.write_all(data).await?;
71+
return Ok(());
72+
}
73+
74+
let mut paths = Vec::with_capacity(blocks.len());
75+
let mut block_size = 0;
76+
for block in blocks {
77+
block_size += block.size();
78+
paths.push((block.disk_path(casfs.fs_root().clone()), block.size()));
79+
tracing::info!("block path: {:?}", block.disk_path(casfs.fs_root().clone()));
80+
}
81+
82+
debug_assert!(obj_meta.size() as usize == block_size);
83+
tracing::info!("creating block stream");
84+
let mut block_stream = BlockStream::new(paths, block_size, RangeRequest::All, metrics);
85+
86+
// Create the destination file
87+
tracing::info!("creating destination file: {}", args.dest);
88+
let mut file = tokio::fs::File::create(&args.dest).await?;
89+
90+
// Read from block stream and write to file
91+
while let Some(chunk_result) = block_stream.next().await {
92+
let chunk: Bytes = chunk_result?;
93+
tracing::info!("writing chunk of size: {}", chunk.len());
94+
file.write_all(&chunk).await?;
95+
}
96+
97+
// Ensure all data is written to disk
98+
file.flush().await?;
99+
100+
Ok(())
101+
}

0 commit comments

Comments
 (0)