Skip to content

Commit 8d380fe

Browse files
committed
Add a second storage backend
This hardcodes two storage backends, one for small files, and one for large files. The threshold has been chosen arbitrarily as 50 KiB for now.
1 parent 211ec61 commit 8d380fe

File tree

4 files changed

+101
-52
lines changed

4 files changed

+101
-52
lines changed

objectstore-server/src/config.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ pub struct Config {
3131
pub http_addr: SocketAddr,
3232

3333
// storage config
34-
pub storage: Storage,
34+
pub small_storage: Storage,
35+
pub large_storage: Storage,
3536

3637
// authentication config
3738
pub jwt_secret: String,
@@ -45,7 +46,10 @@ impl Default for Config {
4546
Self {
4647
http_addr: "0.0.0.0:8888".parse().unwrap(),
4748

48-
storage: Storage::FileSystem {
49+
small_storage: Storage::FileSystem {
50+
path: PathBuf::from("data"),
51+
},
52+
large_storage: Storage::FileSystem {
4953
path: PathBuf::from("data"),
5054
},
5155

@@ -92,13 +96,13 @@ mod tests {
9296
#[test]
9397
fn configurable_via_env() {
9498
figment::Jail::expect_with(|jail| {
95-
jail.set_env("fss_storage__type", "s3compatible");
96-
jail.set_env("fss_storage__endpoint", "http://localhost:8888");
97-
jail.set_env("fss_storage__bucket", "whatever");
99+
jail.set_env("fss_large_storage__type", "s3compatible");
100+
jail.set_env("fss_large_storage__endpoint", "http://localhost:8888");
101+
jail.set_env("fss_large_storage__bucket", "whatever");
98102

99103
let config = Config::from_args(Args::default()).unwrap();
100104

101-
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).storage else {
105+
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).large_storage else {
102106
panic!("expected s3 storage");
103107
};
104108
assert_eq!(endpoint.as_deref(), Some("http://localhost:8888"));
@@ -114,7 +118,7 @@ mod tests {
114118
tempfile
115119
.write_all(
116120
br#"
117-
storage:
121+
large_storage:
118122
type: s3compatible
119123
endpoint: http://localhost:8888
120124
bucket: whatever
@@ -127,7 +131,7 @@ mod tests {
127131
};
128132
let config = Config::from_args(args).unwrap();
129133

130-
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).storage else {
134+
let Storage::S3Compatible { endpoint, bucket } = dbg!(config).large_storage else {
131135
panic!("expected s3 storage");
132136
};
133137
assert_eq!(endpoint.as_deref(), Some("http://localhost:8888"));

objectstore-server/src/state.rs

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,29 @@ pub struct State {
1313

1414
impl State {
1515
pub async fn new(config: Config) -> anyhow::Result<ServiceState> {
16-
let storage_config = match &config.storage {
17-
Storage::FileSystem { path } => StorageConfig::FileSystem { path },
18-
Storage::S3Compatible { endpoint, bucket } => StorageConfig::S3Compatible {
19-
endpoint: endpoint.as_deref(),
20-
bucket,
21-
},
22-
Storage::BigTable {
23-
project_id,
24-
instance_name,
25-
table_name,
26-
} => StorageConfig::BigTable(BigTableConfig {
27-
project_id: project_id.clone(),
28-
instance_name: instance_name.clone(),
29-
table_name: table_name.clone(),
30-
}),
31-
};
32-
let service = StorageService::new(storage_config).await?;
16+
let small_storage = map_storage_config(&config.small_storage);
17+
let large_storage = map_storage_config(&config.large_storage);
18+
let service = StorageService::new(small_storage, large_storage).await?;
3319

3420
Ok(Arc::new(Self { config, service }))
3521
}
3622
}
23+
24+
fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> {
25+
match config {
26+
Storage::FileSystem { path } => StorageConfig::FileSystem { path },
27+
Storage::S3Compatible { endpoint, bucket } => StorageConfig::S3Compatible {
28+
endpoint: endpoint.as_deref(),
29+
bucket,
30+
},
31+
Storage::BigTable {
32+
project_id,
33+
instance_name,
34+
table_name,
35+
} => StorageConfig::BigTable(BigTableConfig {
36+
project_id: project_id.clone(),
37+
instance_name: instance_name.clone(),
38+
table_name: table_name.clone(),
39+
}),
40+
}
41+
}

objectstore-service/src/backend/bigtable.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ const FAMILY_GC: &str = "fg";
3838
const FAMILY_MANUAL: &str = "fm";
3939

4040
/// Configuration for the BigTable backend.
41-
#[derive(Debug)]
41+
#[derive(Debug, Clone)]
4242
pub struct BigTableConfig {
4343
/// GCP project ID.
4444
pub project_id: String,
@@ -227,7 +227,7 @@ impl Backend for BigTableBackend {
227227

228228
let mut payload = Vec::new();
229229
while let Some(chunk) = stream.try_next().await? {
230-
payload.extend(&chunk);
230+
payload.extend_from_slice(&chunk);
231231
}
232232

233233
let mutations = [

objectstore-service/src/lib.rs

Lines changed: 65 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
mod backend;
99
mod metadata;
1010

11+
use bytes::BytesMut;
12+
use futures_util::{StreamExt, TryStreamExt};
1113
use objectstore_types::{Metadata, Scope};
1214

1315
use std::path::Path;
@@ -18,17 +20,21 @@ use crate::backend::{BackendStream, BoxedBackend};
1820
pub use backend::BigTableConfig;
1921
pub use metadata::*;
2022

23+
/// Maximum payload size in bytes that is still assigned to the small-files backend; anything larger goes to the large-files backend.
24+
const SMALL_THRESHOLD: usize = 50 * 1024; // 50 KiB
25+
2126
/// High-level asynchronous service for storing and retrieving objects.
2227
#[derive(Clone, Debug)]
2328
pub struct StorageService(Arc<StorageServiceInner>);
2429

2530
#[derive(Debug)]
2631
struct StorageServiceInner {
27-
backend: BoxedBackend,
32+
small_backend: BoxedBackend,
33+
large_backend: BoxedBackend,
2834
}
2935

3036
/// Configuration to initialize a [`StorageService`].
31-
#[derive(Debug)]
37+
#[derive(Debug, Clone)]
3238
pub enum StorageConfig<'a> {
3339
/// Use a local filesystem as the storage backend.
3440
FileSystem {
@@ -48,22 +54,17 @@ pub enum StorageConfig<'a> {
4854

4955
impl StorageService {
5056
/// Creates a new `StorageService` with the specified configuration.
51-
pub async fn new(config: StorageConfig<'_>) -> anyhow::Result<Self> {
52-
let backend = match config {
53-
StorageConfig::FileSystem { path } => Box::new(backend::LocalFs::new(path)),
54-
StorageConfig::S3Compatible { endpoint, bucket } => {
55-
if let Some(endpoint) = endpoint {
56-
Box::new(backend::S3Compatible::without_token(endpoint, bucket))
57-
} else {
58-
backend::gcs(bucket).await?
59-
}
60-
}
61-
StorageConfig::BigTable(config) => {
62-
Box::new(backend::BigTableBackend::new(config).await?)
63-
}
57+
pub async fn new(
58+
small_config: StorageConfig<'_>,
59+
large_config: StorageConfig<'_>,
60+
) -> anyhow::Result<Self> {
61+
let small_backend = create_backend(small_config).await?;
62+
let large_backend = create_backend(large_config).await?;
63+
64+
let inner = StorageServiceInner {
65+
small_backend,
66+
large_backend,
6467
};
65-
66-
let inner = StorageServiceInner { backend };
6768
Ok(Self(Arc::new(inner)))
6869
}
6970

@@ -73,16 +74,33 @@ impl StorageService {
7374
usecase: String,
7475
scope: Scope,
7576
metadata: &Metadata,
76-
stream: BackendStream,
77+
mut stream: BackendStream,
7778
) -> anyhow::Result<ScopedKey> {
78-
let key = ObjectKey::for_backend(1);
79+
let mut first_chunk = BytesMut::new();
80+
let mut backend_id = 1; // 1 = small files backend
81+
while let Some(chunk) = stream.try_next().await? {
82+
first_chunk.extend_from_slice(&chunk);
83+
84+
if first_chunk.len() > SMALL_THRESHOLD {
85+
backend_id = 2; // 2 = large files backend
86+
break;
87+
}
88+
}
89+
let stream = futures_util::stream::once(async { Ok(first_chunk.into()) })
90+
.chain(stream)
91+
.boxed();
92+
93+
let key = ObjectKey::for_backend(backend_id);
7994
let key = ScopedKey {
8095
usecase,
8196
scope,
8297
key,
8398
};
8499

85-
self.0.backend.put_object(&key, metadata, stream).await?;
100+
self.0
101+
.small_backend
102+
.put_object(&key, metadata, stream)
103+
.await?;
86104
Ok(key)
87105
}
88106

@@ -91,15 +109,37 @@ impl StorageService {
91109
&self,
92110
key: &ScopedKey,
93111
) -> anyhow::Result<Option<(Metadata, BackendStream)>> {
94-
self.0.backend.get_object(key).await
112+
match key.key.backend {
113+
1 => self.0.small_backend.get_object(key).await,
114+
2 => self.0.large_backend.get_object(key).await,
115+
_ => anyhow::bail!("invalid backend"),
116+
}
95117
}
96118

97119
/// Deletes an object stored at the given key, if it exists.
98120
pub async fn delete_object(&self, key: &ScopedKey) -> anyhow::Result<()> {
99-
self.0.backend.delete_object(key).await
121+
match key.key.backend {
122+
1 => self.0.small_backend.delete_object(key).await,
123+
2 => self.0.large_backend.delete_object(key).await,
124+
_ => anyhow::bail!("invalid backend"),
125+
}
100126
}
101127
}
102128

129+
async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result<BoxedBackend> {
130+
Ok(match config {
131+
StorageConfig::FileSystem { path } => Box::new(backend::LocalFs::new(path)),
132+
StorageConfig::S3Compatible { endpoint, bucket } => {
133+
if let Some(endpoint) = endpoint {
134+
Box::new(backend::S3Compatible::without_token(endpoint, bucket))
135+
} else {
136+
backend::gcs(bucket).await?
137+
}
138+
}
139+
StorageConfig::BigTable(config) => Box::new(backend::BigTableBackend::new(config).await?),
140+
})
141+
}
142+
103143
#[cfg(test)]
104144
mod tests {
105145
use bytes::BytesMut;
@@ -118,7 +158,7 @@ mod tests {
118158
let config = StorageConfig::FileSystem {
119159
path: tempdir.path(),
120160
};
121-
let service = StorageService::new(config).await.unwrap();
161+
let service = StorageService::new(config.clone(), config).await.unwrap();
122162

123163
let key = service
124164
.put_object(
@@ -146,7 +186,7 @@ mod tests {
146186
endpoint: None,
147187
bucket: "sbx-warp-benchmark-bucket",
148188
};
149-
let service = StorageService::new(config).await.unwrap();
189+
let service = StorageService::new(config.clone(), config).await.unwrap();
150190

151191
let key = service
152192
.put_object(
@@ -174,7 +214,7 @@ mod tests {
174214
endpoint: Some("http://localhost:8333"),
175215
bucket: "whatever",
176216
};
177-
let service = StorageService::new(config).await.unwrap();
217+
let service = StorageService::new(config.clone(), config).await.unwrap();
178218

179219
let key = service
180220
.put_object(

0 commit comments

Comments
 (0)