Skip to content

Commit 6666a2f

Browse files
committed
DB inspect:
- add num_keys and disk_space to MetaStore trait - refactor get_object to be reusable - use clap to support sub command
1 parent 85734e3 commit 6666a2f

File tree

10 files changed

+151
-227
lines changed

10 files changed

+151
-227
lines changed

Cargo.lock

Lines changed: 49 additions & 181 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ async-fs = "2.1"
2323
faster-hex = "0.10.0"
2424
uuid = { version = "1.12", features = ["v4"] }
2525
chrono = "0.4"
26-
structopt = { version = "0.3.26" }
2726
anyhow = { version = "1.0.95" }
2827
dotenv = { version = "0.15" }
2928
openssl = { version = "0.10.68", features = ["vendored"], optional = true }
@@ -42,6 +41,7 @@ hyper-util = { version = "0.1.9", features = [
4241
rusoto_core = "0.48.0"
4342
hyper = { version = "1.6.0" }
4443
http-body-util = "0.1.2"
44+
clap = { version = "4.5.32", features = ["derive"] }
4545

4646

4747
[profile.release]

src/cas/fs.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,32 @@ impl CasFS {
200200
self.meta_store.get_meta(bucket_name, key)
201201
}
202202

203+
pub fn get_object_blocks(
204+
&self,
205+
bucket_name: &str,
206+
key: &str,
207+
) -> Result<Option<(Object, Vec<crate::metastore::Block>)>, MetaError> {
208+
let obj_meta = self.get_object_meta(bucket_name, key)?;
209+
let Some(obj_meta) = obj_meta else {
210+
return Ok(None);
211+
};
212+
213+
if obj_meta.is_inlined() {
214+
Ok(Some((obj_meta, vec![])))
215+
} else {
216+
let blocks = obj_meta.blocks();
217+
let block_map = self.block_tree()?;
218+
let mut block_vec = Vec::with_capacity(blocks.len());
219+
for block in blocks {
220+
let block_meta = block_map
221+
.get_block(block)?
222+
.ok_or(MetaError::BlockNotFound)?;
223+
block_vec.push(block_meta);
224+
}
225+
Ok(Some((obj_meta, block_vec)))
226+
}
227+
}
228+
203229
// create and insert a new bucket
204230
pub fn create_bucket(&self, bucket_name: &str) -> Result<(), MetaError> {
205231
let bm = BucketMeta::new(bucket_name.to_string());

src/main.rs

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,57 +2,67 @@ use std::path::PathBuf;
22

33
use anyhow::Result;
44
use bytes::Bytes;
5+
use clap::{Parser, Subcommand};
56
use http_body_util::Full;
67
use prometheus::Encoder;
7-
use structopt::StructOpt;
88
use tracing::{info, Level};
99
use tracing_subscriber::FmtSubscriber;
1010

1111
use s3_cas::cas::{CasFS, StorageEngine};
1212
use s3_cas::metastore::Durability;
1313

14-
#[derive(StructOpt)]
15-
struct Args {
16-
#[structopt(long, default_value = ".")]
14+
#[derive(Parser)]
15+
#[command(version)]
16+
struct Cli {
17+
#[arg(long, default_value = ".")]
1718
fs_root: PathBuf,
1819

19-
#[structopt(long, default_value = ".")]
20+
#[arg(long, default_value = ".")]
2021
meta_root: PathBuf,
2122

22-
#[structopt(long, default_value = "localhost")]
23+
#[arg(long, default_value = "localhost")]
2324
host: String,
2425

25-
#[structopt(long, default_value = "8014")]
26+
#[arg(long, default_value = "8014")]
2627
port: u16,
2728

28-
#[structopt(long, default_value = "localhost")]
29+
#[arg(long, default_value = "localhost")]
2930
metric_host: String,
3031

31-
#[structopt(long, default_value = "9100")]
32+
#[arg(long, default_value = "9100")]
3233
metric_port: u16,
3334

34-
#[structopt(long, help = "leave empty to disable it")]
35+
#[arg(long, help = "leave empty to disable it")]
3536
inline_metadata_size: Option<usize>,
3637

37-
#[structopt(long, requires("secret-key"), display_order = 1000)]
38+
#[arg(long, required = true, display_order = 1000)]
3839
access_key: Option<String>,
3940

40-
#[structopt(long, requires("access-key"), display_order = 1000)]
41+
#[arg(long, required = true, display_order = 1000)]
4142
secret_key: Option<String>,
4243

43-
#[structopt(
44+
#[arg(
4445
long,
4546
default_value = "fjall",
46-
help = "Metadata DB (fjall, fjall_notx)"
47+
help = "Metadata DB (fjall, fjall_notx)"
4748
)]
4849
metadata_db: StorageEngine,
4950

50-
#[structopt(
51+
#[arg(
5152
long,
5253
default_value = "fdatasync",
5354
help = "Durability level (buffer, fsync, fdatasync)"
5455
)]
5556
durability: Durability,
57+
58+
#[command(subcommand)]
59+
command: Option<Command>,
60+
}
61+
62+
#[derive(Debug, Subcommand)]
63+
pub enum Command {
64+
// TODO: inspect DB
65+
Inspect,
5666
}
5767

5868
fn setup_tracing() {
@@ -67,18 +77,24 @@ fn main() -> Result<()> {
6777
dotenv::dotenv().ok();
6878

6979
setup_tracing();
70-
71-
let args: Args = Args::from_args();
72-
73-
run(args)
80+
let cli = Cli::parse();
81+
match cli.command {
82+
Some(Command::Inspect) => {
83+
println!("Inspecting");
84+
}
85+
None => {
86+
run(cli)?;
87+
}
88+
}
89+
Ok(())
7490
}
7591

7692
use hyper_util::rt::{TokioExecutor, TokioIo};
7793
use hyper_util::server::conn::auto::Builder as ConnBuilder;
7894
use s3s::service::S3ServiceBuilder;
7995

8096
#[tokio::main]
81-
async fn run(args: Args) -> anyhow::Result<()> {
97+
async fn run(args: Cli) -> anyhow::Result<()> {
8298
let storage_engine = args.metadata_db;
8399

84100
// provider

src/metastore/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub enum MetaError {
4040
NotMetaTree(String),
4141
TransactionError(String),
4242
PersistError(String),
43+
BlockNotFound,
4344
OtherDBError(String),
4445
}
4546

@@ -59,6 +60,7 @@ impl fmt::Display for MetaError {
5960
MetaError::NotMetaTree(ref s) => write!(f, "Not a meta tree: {}", s),
6061
MetaError::TransactionError(ref s) => write!(f, "Transaction error: {}", s),
6162
MetaError::PersistError(ref s) => write!(f, "Persist error: {}", s),
63+
MetaError::BlockNotFound => write!(f, "Block not found"),
6264
MetaError::OtherDBError(ref s) => write!(f, "Other DB error: {}", s),
6365
}
6466
}

src/metastore/object.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,9 @@ impl Object {
143143
}
144144
}
145145

146+
pub fn is_inlined(&self) -> bool {
147+
matches!(&self.data, ObjectData::Inline { .. })
148+
}
146149
pub fn inlined(&self) -> Option<&Vec<u8>> {
147150
match &self.data {
148151
ObjectData::Inline { data } => Some(data),

src/metastore/stores/fjall.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,14 @@ impl MetaStore for FjallStore {
254254

255255
Box::new(FjallTransaction::new(tx, Arc::new(self.clone())))
256256
}
257+
258+
fn num_keys(&self) -> (usize, usize, usize) {
259+
unimplemented!();
260+
}
261+
262+
fn disk_space(&self) -> u64 {
263+
self.keyspace.disk_space()
264+
}
257265
}
258266

259267
pub struct FjallTransaction {

src/metastore/stores/fjall_notx.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,18 @@ impl MetaStore for FjallStoreNotx {
209209
fn begin_transaction(&self) -> Box<dyn Transaction> {
210210
Box::new(FjallNoTransaction::new(Arc::new(self.clone())))
211211
}
212+
213+
fn num_keys(&self) -> (usize, usize, usize) {
214+
(
215+
self.bucket_partition.approximate_len(),
216+
self.block_partition.approximate_len(),
217+
self.path_partition.approximate_len(),
218+
)
219+
}
220+
221+
fn disk_space(&self) -> u64 {
222+
self.keyspace.disk_space()
223+
}
212224
}
213225

214226
// FjallNoTransaction is fjall without real transaction support.

src/metastore/traits.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ pub trait MetaStore: Send + Sync + Debug + 'static {
7272
fn delete_object(&self, bucket: &str, key: &str) -> Result<Vec<Block>, MetaError>;
7373

7474
fn begin_transaction(&self) -> Box<dyn Transaction>;
75+
76+
// returns the number of keys of the bucket, block, and path trees.
77+
fn num_keys(&self) -> (usize, usize, usize);
78+
79+
// returns the disk space used by the metadata store.
80+
fn disk_space(&self) -> u64;
7581
}
7682

7783
pub trait Transaction: Send + Sync {

src/s3fs.rs

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,9 @@ impl S3 for S3FS {
347347
}
348348

349349
// load metadata
350-
let obj_meta = match self.casfs.get_object_meta(&bucket, &key) {
351-
Ok(Some(obj_meta)) => obj_meta,
350+
351+
let (obj_meta, blocks) = match self.casfs.get_object_blocks(&bucket, &key) {
352+
Ok(Some((obj_meta, blocks))) => (obj_meta, blocks),
352353
Ok(None) => {
353354
return Err(s3_error!(NoSuchKey, "Object does not exist"));
354355
}
@@ -387,31 +388,13 @@ impl S3 for S3FS {
387388
};
388389

389390
// load the data
390-
let block_map = try_!(self.casfs.block_tree());
391-
let mut paths = Vec::with_capacity(obj_meta.blocks().len());
391+
let mut paths = Vec::with_capacity(blocks.len());
392392
let mut block_size = 0;
393-
for block in obj_meta.blocks() {
394-
// unwrap here is safe as we only add blocks to the list of an object if they are
395-
// corectly inserted in the block map
396-
let block_meta = match block_map.get_block(block) {
397-
Ok(Some(block_meta)) => block_meta,
398-
Ok(None) => {
399-
return Err(s3_error!(
400-
InternalError,
401-
"Could not find block in block map"
402-
));
403-
}
404-
Err(e) => {
405-
return Err(s3_error!(
406-
InternalError,
407-
"Could not get block from block map: {}",
408-
e
409-
));
410-
}
411-
};
412-
block_size += block_meta.size();
413-
paths.push((block_meta.disk_path(self.root.clone()), block_meta.size()));
393+
for block in blocks {
394+
block_size += block.size();
395+
paths.push((block.disk_path(self.root.clone()), block.size()));
414396
}
397+
415398
debug_assert!(obj_meta.size() as usize == block_size);
416399
let block_stream = BlockStream::new(paths, block_size, range, self.metrics.clone());
417400
let stream = StreamingBlob::wrap(block_stream);

0 commit comments

Comments
 (0)