Skip to content

Commit 971955f

Browse files
committed
review
1 parent 8d380fe commit 971955f

File tree

3 files changed

+48
-34
lines changed

3 files changed

+48
-34
lines changed

objectstore-server/src/config.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ pub struct Config {
3131
pub http_addr: SocketAddr,
3232

3333
// storage config
34-
pub small_storage: Storage,
35-
pub large_storage: Storage,
34+
pub high_volume_storage: Storage,
35+
pub long_term_storage: Storage,
3636

3737
// authentication config
3838
pub jwt_secret: String,
@@ -46,10 +46,10 @@ impl Default for Config {
4646
Self {
4747
http_addr: "0.0.0.0:8888".parse().unwrap(),
4848

49-
small_storage: Storage::FileSystem {
49+
high_volume_storage: Storage::FileSystem {
5050
path: PathBuf::from("data"),
5151
},
52-
large_storage: Storage::FileSystem {
52+
long_term_storage: Storage::FileSystem {
5353
path: PathBuf::from("data"),
5454
},
5555

@@ -96,13 +96,13 @@ mod tests {
9696
#[test]
9797
fn configurable_via_env() {
9898
figment::Jail::expect_with(|jail| {
99-
jail.set_env("fss_large_storage__type", "s3compatible");
100-
jail.set_env("fss_large_storage__endpoint", "http://localhost:8888");
101-
jail.set_env("fss_large_storage__bucket", "whatever");
99+
jail.set_env("fss_long_term_storage__type", "s3compatible");
100+
jail.set_env("fss_long_term_storage__endpoint", "http://localhost:8888");
101+
jail.set_env("fss_long_term_storage__bucket", "whatever");
102102

103103
let config = Config::from_args(Args::default()).unwrap();
104104

105-
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).large_storage else {
105+
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).long_term_storage else {
106106
panic!("expected s3 storage");
107107
};
108108
assert_eq!(endpoint.as_deref(), Some("http://localhost:8888"));
@@ -118,7 +118,7 @@ mod tests {
118118
tempfile
119119
.write_all(
120120
br#"
121-
large_storage:
121+
long_term_storage:
122122
type: s3compatible
123123
endpoint: http://localhost:8888
124124
bucket: whatever
@@ -131,7 +131,7 @@ mod tests {
131131
};
132132
let config = Config::from_args(args).unwrap();
133133

134-
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).large_storage else {
134+
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).long_term_storage else {
135135
panic!("expected s3 storage");
136136
};
137137
assert_eq!(endpoint.as_deref(), Some("http://localhost:8888"));

objectstore-server/src/state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ pub struct State {
1313

1414
impl State {
1515
pub async fn new(config: Config) -> anyhow::Result<ServiceState> {
16-
let small_storage = map_storage_config(&config.small_storage);
17-
let large_storage = map_storage_config(&config.large_storage);
18-
let service = StorageService::new(small_storage, large_storage).await?;
16+
let high_volume = map_storage_config(&config.high_volume_storage);
17+
let long_term = map_storage_config(&config.long_term_storage);
18+
let service = StorageService::new(high_volume, long_term).await?;
1919

2020
Ok(Arc::new(Self { config, service }))
2121
}

objectstore-service/src/lib.rs

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,19 @@ use crate::backend::{BackendStream, BoxedBackend};
2020
pub use backend::BigTableConfig;
2121
pub use metadata::*;
2222

23-
/// The threshold up until which we will go to the small backend.
24-
const SMALL_THRESHOLD: usize = 50 * 1024; // 50 KiB
23+
/// The threshold up until which we will go to the "high volume" backend.
24+
const BACKEND_SIZE_THRESHOLD: usize = 1024 * 1024; // 1 MiB
25+
const BACKEND_HIGH_VOLUME: u8 = 1;
26+
const BACKEND_LONG_TERM: u8 = 2;
2527

2628
/// High-level asynchronous service for storing and retrieving objects.
2729
#[derive(Clone, Debug)]
2830
pub struct StorageService(Arc<StorageServiceInner>);
2931

3032
#[derive(Debug)]
3133
struct StorageServiceInner {
32-
small_backend: BoxedBackend,
33-
large_backend: BoxedBackend,
34+
high_volume_backend: BoxedBackend,
35+
long_term_backend: BoxedBackend,
3436
}
3537

3638
/// Configuration to initialize a [`StorageService`].
@@ -55,15 +57,15 @@ pub enum StorageConfig<'a> {
5557
impl StorageService {
5658
/// Creates a new `StorageService` with the specified configuration.
5759
pub async fn new(
58-
small_config: StorageConfig<'_>,
59-
large_config: StorageConfig<'_>,
60+
high_volume_config: StorageConfig<'_>,
61+
long_term_config: StorageConfig<'_>,
6062
) -> anyhow::Result<Self> {
61-
let small_backend = create_backend(small_config).await?;
62-
let large_backend = create_backend(large_config).await?;
63+
let high_volume_backend = create_backend(high_volume_config).await?;
64+
let long_term_backend = create_backend(long_term_config).await?;
6365

6466
let inner = StorageServiceInner {
65-
small_backend,
66-
large_backend,
67+
high_volume_backend,
68+
long_term_backend,
6769
};
6870
Ok(Self(Arc::new(inner)))
6971
}
@@ -77,12 +79,12 @@ impl StorageService {
7779
mut stream: BackendStream,
7880
) -> anyhow::Result<ScopedKey> {
7981
let mut first_chunk = BytesMut::new();
80-
let mut backend_id = 1; // 1 = small files backend
82+
let mut backend_id = BACKEND_HIGH_VOLUME;
8183
while let Some(chunk) = stream.try_next().await? {
8284
first_chunk.extend_from_slice(&chunk);
8385

84-
if first_chunk.len() > SMALL_THRESHOLD {
85-
backend_id = 2; // 2 = large files backend
86+
if first_chunk.len() > BACKEND_SIZE_THRESHOLD {
87+
backend_id = BACKEND_LONG_TERM;
8688
break;
8789
}
8890
}
@@ -97,10 +99,22 @@ impl StorageService {
9799
key,
98100
};
99101

100-
self.0
101-
.small_backend
102-
.put_object(&key, metadata, stream)
103-
.await?;
102+
match backend_id {
103+
BACKEND_HIGH_VOLUME => {
104+
self.0
105+
.high_volume_backend
106+
.put_object(&key, metadata, stream)
107+
.await?
108+
}
109+
BACKEND_LONG_TERM => {
110+
self.0
111+
.long_term_backend
112+
.put_object(&key, metadata, stream)
113+
.await?
114+
}
115+
_ => unreachable!(),
116+
}
117+
104118
Ok(key)
105119
}
106120

@@ -110,17 +124,17 @@ impl StorageService {
110124
key: &ScopedKey,
111125
) -> anyhow::Result<Option<(Metadata, BackendStream)>> {
112126
match key.key.backend {
113-
1 => self.0.small_backend.get_object(key).await,
114-
2 => self.0.large_backend.get_object(key).await,
127+
BACKEND_HIGH_VOLUME => self.0.high_volume_backend.get_object(key).await,
128+
BACKEND_LONG_TERM => self.0.long_term_backend.get_object(key).await,
115129
_ => anyhow::bail!("invalid backend"),
116130
}
117131
}
118132

119133
/// Deletes an object stored at the given key, if it exists.
120134
pub async fn delete_object(&self, key: &ScopedKey) -> anyhow::Result<()> {
121135
match key.key.backend {
122-
1 => self.0.small_backend.delete_object(key).await,
123-
2 => self.0.large_backend.delete_object(key).await,
136+
BACKEND_HIGH_VOLUME => self.0.high_volume_backend.delete_object(key).await,
137+
BACKEND_LONG_TERM => self.0.long_term_backend.delete_object(key).await,
124138
_ => anyhow::bail!("invalid backend"),
125139
}
126140
}

0 commit comments

Comments
 (0)