Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benches/casfs_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ fn setup_casfs() -> (CasFS, TempDir) {
storage_engine,
inlined_metadata_size,
durability,
5, // max_concurrent_block_writes for benchmarking
);

(fs, dir)
Expand Down
13,992 changes: 0 additions & 13,992 deletions httpUIsession.txt

This file was deleted.

4 changes: 4 additions & 0 deletions src/auth/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub struct UserRouter {
storage_engine: StorageEngine,
inlined_metadata_size: Option<usize>,
durability: Option<Durability>,
max_concurrent_block_writes: usize,
}

impl UserRouter {
Expand All @@ -58,6 +59,7 @@ impl UserRouter {
storage_engine: StorageEngine,
inlined_metadata_size: Option<usize>,
durability: Option<Durability>,
max_concurrent_block_writes: usize,
) -> Self {
Self {
shared_block_store,
Expand All @@ -68,6 +70,7 @@ impl UserRouter {
storage_engine,
inlined_metadata_size,
durability,
max_concurrent_block_writes,
}
}

Expand All @@ -88,6 +91,7 @@ impl UserRouter {
self.storage_engine,
self.inlined_metadata_size,
self.durability,
self.max_concurrent_block_writes,
);

Arc::new(casfs)
Expand Down
15 changes: 10 additions & 5 deletions src/cas/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub struct CasFS {
block_tree: Arc<BlockTree>,
shared_path_tree: Option<Arc<dyn BaseMetaTree>>,
shared_meta_store: Option<Arc<MetaStore>>,
max_concurrent_block_writes: usize,
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -129,6 +130,7 @@ impl CasFS {
storage_engine: StorageEngine,
inlined_metadata_size: Option<usize>,
durability: Option<Durability>,
max_concurrent_block_writes: usize,
) -> Self {
meta_path.push("db");
root.push("blocks");
Expand Down Expand Up @@ -168,6 +170,7 @@ impl CasFS {
block_tree: Arc::new(block_tree),
shared_path_tree: None, // Single-user mode
shared_meta_store: None, // Single-user mode
max_concurrent_block_writes,
}
}

Expand Down Expand Up @@ -195,6 +198,7 @@ impl CasFS {
storage_engine: StorageEngine,
inlined_metadata_size: Option<usize>,
durability: Option<Durability>,
max_concurrent_block_writes: usize,
) -> Self {
user_meta_path.push("db");
root.push("blocks");
Expand Down Expand Up @@ -227,6 +231,7 @@ impl CasFS {
block_tree: shared_block_tree,
shared_path_tree: Some(shared_path_tree),
shared_meta_store: Some(shared_meta_store),
max_concurrent_block_writes,
}
}

Expand Down Expand Up @@ -537,9 +542,7 @@ impl CasFS {
})
.zip(stream::repeat((tx, old_obj_meta)))
.enumerate()
.for_each(
// 1,
|(idx, (maybe_chunk, (mut tx, old_obj_meta)))| async move {
.map(|(idx, (maybe_chunk, (mut tx, old_obj_meta)))| async move {
if let Err(e) = maybe_chunk {
if let Err(e) = tx
.send(Err(std::io::Error::new(e.kind(), e.to_string())))
Expand Down Expand Up @@ -651,8 +654,9 @@ impl CasFS {
if let Err(e) = tx.unbounded_send(Ok((idx, block_hash))) {
tracing::error!(error = %e, "Could not send block id");
}
},
)
})
.buffer_unordered(self.max_concurrent_block_writes)
.for_each(|_| async {})
.await;

let mut ids = rx.try_collect::<Vec<(usize, BlockID)>>().await?;
Expand Down Expand Up @@ -716,6 +720,7 @@ mod tests {
storage_engine,
Some(1),
Some(Durability::Buffer),
5, // max_concurrent_block_writes for tests
);
(fs, dir)
}
Expand Down
1 change: 1 addition & 0 deletions src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub async fn check_integrity(args: CheckConfig) -> Result<()> {
storage_engine,
None,
None,
5, // max_concurrent_block_writes (not used for read operations)
);

let (obj_meta, _) = match casfs.get_object_paths(&args.bucket, &args.key)? {
Expand Down
10 changes: 10 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ pub struct ServerConfig {
help = "Log level (error, warn, info, debug, trace). Can also be set via RUST_LOG env var"
)]
log_level: String,

#[arg(
long,
default_value = "5",
help = "Maximum concurrent block writes during object uploads (default: 5, recommended: 3-12 based on hardware)"
)]
max_concurrent_block_writes: usize,
}

#[derive(Debug, Subcommand)]
Expand Down Expand Up @@ -296,6 +303,7 @@ async fn run_single_user(
storage_engine,
args.inline_metadata_size,
Some(args.durability),
args.max_concurrent_block_writes,
);
let s3fs = s3_cas::s3fs::S3FS::new(Arc::new(casfs), metrics.clone());
let s3fs = s3_cas::metrics::MetricFs::new(s3fs, metrics.clone());
Expand All @@ -309,6 +317,7 @@ async fn run_single_user(
storage_engine,
args.inline_metadata_size,
Some(args.durability),
args.max_concurrent_block_writes,
);

let http_ui_username = args.http_ui_username.clone();
Expand Down Expand Up @@ -386,6 +395,7 @@ async fn run_multi_user(
storage_engine,
args.inline_metadata_size,
Some(args.durability),
args.max_concurrent_block_writes,
));

let user_count = user_store.count_users()?;
Expand Down
53 changes: 34 additions & 19 deletions src/metastore/stores/fjall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::{convert::TryFrom, sync::Mutex};
use std::{convert::TryFrom, sync::RwLock};

use fjall::{self, TxPartitionHandle};

Expand All @@ -16,7 +16,7 @@ pub struct FjallStore {
keyspace: Arc<fjall::TxKeyspace>,
inlined_metadata_size: usize,
durability: fjall::PersistMode,
partition_cache: Arc<Mutex<HashMap<String, TxPartitionHandle>>>,
partition_cache: Arc<RwLock<HashMap<String, TxPartitionHandle>>>,
}

impl std::fmt::Debug for FjallStore {
Expand Down Expand Up @@ -47,30 +47,45 @@ impl FjallStore {
Durability::Fdatasync => fjall::PersistMode::SyncAll,
};

Self {
let store = Self {
keyspace: Arc::new(tx_keyspace),
inlined_metadata_size,
durability,
partition_cache: Arc::new(Mutex::new(HashMap::new())),
partition_cache: Arc::new(RwLock::new(HashMap::new())),
};

// Pre-warm partition cache with common partitions
// This ensures all accesses to these partitions use the fast read-only path
for partition_name in ["_BLOCKS", "_PATHS", "_BUCKETS", "_MULTIPART_PARTS"] {
let _ = store.get_partition(partition_name);
}

store
}

fn get_partition(&self, name: &str) -> Result<fjall::TxPartitionHandle, MetaError> {
Ok(self
.partition_cache
.lock()
.expect("Can lock partition cache")
.entry(name.to_string())
.or_insert(
// match self.keyspace.open_partition(name, Default::default()) {
// Ok(partition) => Ok(partition),
// Err(e) => Err(MetaError::OtherDBError(e.to_string())),
// },
self.keyspace
.open_partition(name, Default::default())
.expect("Can open parition"),
)
.clone())
// Fast path: try read lock first (most common after warmup)
{
let cache = self.partition_cache.read().expect("Can read partition cache");
if let Some(partition) = cache.get(name) {
return Ok(partition.clone());
}
}

// Slow path: cache miss, acquire write lock
let mut cache = self.partition_cache.write().expect("Can write partition cache");

// Double-check after acquiring write lock (another thread may have inserted)
if let Some(partition) = cache.get(name) {
return Ok(partition.clone());
}

// Open partition and insert into cache
let partition = self.keyspace
.open_partition(name, Default::default())
.expect("Can open partition");
cache.insert(name.to_string(), partition.clone());
Ok(partition)
}

fn commit_persist(&self, tx: fjall::WriteTransaction) -> Result<(), MetaError> {
Expand Down
39 changes: 34 additions & 5 deletions src/metastore/stores/fjall_notx.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use fjall;

Expand All @@ -14,6 +15,7 @@ use crate::metastore::{
/// Metadata store backed by a `fjall::Keyspace`, with a cache of opened partition handles.
pub struct FjallStoreNotx {
/// Shared handle to the fjall keyspace; partitions are opened through it.
keyspace: Arc<fjall::Keyspace>,
/// Inlined-metadata size setting; the constructor falls back to 1 when no value is supplied.
inlined_metadata_size: usize,
/// Opened partition handles keyed by partition name. Filled by `get_partition`
/// on a cache miss and pre-warmed by the constructor for the common partitions.
partition_cache: Arc<RwLock<HashMap<String, fjall::PartitionHandle>>>,
}

impl std::fmt::Debug for FjallStoreNotx {
Expand All @@ -32,17 +34,44 @@ impl FjallStoreNotx {
// setting very low will practically disable it by default
let inlined_metadata_size = inlined_metadata_size.unwrap_or(1);

Self {
let store = Self {
keyspace: Arc::new(keyspace),
inlined_metadata_size,
partition_cache: Arc::new(RwLock::new(HashMap::new())),
};

// Pre-warm partition cache with common partitions
// This ensures all accesses to these partitions use the fast read-only path
for partition_name in ["_BLOCKS", "_PATHS", "_BUCKETS", "_MULTIPART_PARTS"] {
let _ = store.get_partition(partition_name);
}

store
}

fn get_partition(&self, name: &str) -> Result<fjall::PartitionHandle, MetaError> {
match self.keyspace.open_partition(name, Default::default()) {
Ok(partition) => Ok(partition),
Err(e) => Err(MetaError::OtherDBError(e.to_string())),
// Fast path: try read lock first (most common after warmup)
{
let cache = self.partition_cache.read().expect("Can read partition cache");
if let Some(partition) = cache.get(name) {
return Ok(partition.clone());
}
}

// Slow path: cache miss, acquire write lock
let mut cache = self.partition_cache.write().expect("Can write partition cache");

// Double-check after acquiring write lock (another thread may have inserted)
if let Some(partition) = cache.get(name) {
return Ok(partition.clone());
}

// Open partition and insert into cache
let partition = self.keyspace
.open_partition(name, Default::default())
.map_err(|e| MetaError::OtherDBError(e.to_string()))?;
cache.insert(name.to_string(), partition.clone());
Ok(partition)
}

pub fn get_inlined_metadata_size(&self) -> usize {
Expand Down
1 change: 1 addition & 0 deletions src/retrieve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub async fn retrieve(args: RetrieveConfig) -> Result<()> {
storage_engine,
None,
None,
5, // max_concurrent_block_writes (not used for read operations)
);

let (obj_meta, paths) = match casfs.get_object_paths(&args.bucket, &args.key)? {
Expand Down
1 change: 1 addition & 0 deletions tests/it_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ static CONFIG: Lazy<SdkConfig> = Lazy::new(|| {
storage_engine,
inlined_size,
None,
5, // max_concurrent_block_writes for tests
);
let s3fs = s3_cas::s3fs::S3FS::new(Arc::new(casfs), metrics.clone());

Expand Down
Loading