Skip to content

Commit 2367574

Browse files
authored
Encode backend into a new ObjectKey (#63)
The new `ObjectKey` encodes the used backend along with a (v7) uuid. This can be used along with the `ScopedKey`, which also carries the usecase and scope, to be encoded as a path.
1 parent b111c55 commit 2367574

File tree

10 files changed

+489
-306
lines changed

10 files changed

+489
-306
lines changed

Cargo.lock

Lines changed: 229 additions & 225 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

objectstore-server/src/authentication.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use std::collections::BTreeSet;
2+
use std::str::FromStr;
23

34
use axum::extract::FromRequestParts;
45
use axum::http::request::Parts;
56
use axum::http::{StatusCode, header};
67
use axum::response::{IntoResponse, Response};
78
use jsonwebtoken::errors::Result as JwtResult;
89
use jsonwebtoken::{DecodingKey, Validation, decode};
9-
use objectstore_service::ObjectKey;
10+
use objectstore_service::{ObjectKey, ScopedKey};
1011
use objectstore_types::Scope;
1112
use serde::Deserialize;
1213

@@ -28,12 +29,13 @@ pub struct Claim {
2829
}
2930

3031
impl Claim {
31-
pub fn into_key(self, key: String) -> ObjectKey {
32-
ObjectKey {
32+
pub fn into_key(self, key: String) -> anyhow::Result<ScopedKey> {
33+
let key = ObjectKey::from_str(&key)?;
34+
Ok(ScopedKey {
3335
usecase: self.usecase,
3436
scope: self.scope,
3537
key,
36-
}
38+
})
3739
}
3840

3941
#[allow(clippy::result_large_err)]

objectstore-server/src/http.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use axum::routing::{get, put};
1414
use axum::{Json, Router};
1515
use axum_extra::middleware::option_layer;
1616
use futures_util::{StreamExt, TryStreamExt};
17-
use objectstore_service::{ObjectKey, StorageService};
17+
use objectstore_service::{ScopedKey, StorageService};
1818
use objectstore_types::Metadata;
1919
use sentry::integrations::tower as sentry_tower;
2020
use serde::Serialize;
@@ -39,8 +39,8 @@ fn make_app(state: ServiceState) -> axum::Router {
3939
.layer(TraceLayer::new_for_http().on_failure(DefaultOnFailure::new().level(Level::DEBUG)));
4040

4141
let routes = Router::new()
42-
.route("/", put(put_blob))
43-
.route("/{*key}", get(get_blob).delete(delete_blob));
42+
.route("/", put(put_object))
43+
.route("/{*key}", get(get_object).delete(delete_object));
4444

4545
routes.layer(middleware).with_state(state)
4646
}
@@ -101,30 +101,34 @@ struct PutBlobResponse {
101101
}
102102

103103
#[tracing::instrument(level = "trace", skip(state, body))]
104-
async fn put_blob(
104+
async fn put_object(
105105
State(state): State<ServiceState>,
106106
ExtractScope(claim): ExtractScope,
107107
headers: HeaderMap,
108108
body: Body,
109109
) -> error::Result<impl IntoResponse> {
110110
claim.ensure_permission(Permission::Write)?;
111-
let key = claim.into_key(Uuid::new_v4().to_string());
112111
let metadata = Metadata::from_headers(&headers, "")?;
113112

114113
let stream = body.into_data_stream().map_err(io::Error::other).boxed();
115-
state.service.put_object(&key, &metadata, stream).await?;
114+
let key = state
115+
.service
116+
.put_object(claim.usecase, claim.scope, &metadata, stream)
117+
.await?;
116118

117-
Ok(Json(PutBlobResponse { key: key.key }))
119+
Ok(Json(PutBlobResponse {
120+
key: key.key.to_string(),
121+
}))
118122
}
119123

120124
#[tracing::instrument(level = "trace", skip(state))]
121-
async fn get_blob(
125+
async fn get_object(
122126
State(state): State<ServiceState>,
123127
ExtractScope(claim): ExtractScope,
124128
Path(key): Path<String>,
125129
) -> error::Result<Response> {
126130
claim.ensure_permission(Permission::Read)?;
127-
let key = claim.into_key(key);
131+
let key = claim.into_key(key)?;
128132

129133
let Some((metadata, stream)) = state.service.get_object(&key).await? else {
130134
return Ok(StatusCode::NOT_FOUND.into_response());
@@ -134,14 +138,14 @@ async fn get_blob(
134138
Ok((headers, Body::from_stream(stream)).into_response())
135139
}
136140

137-
#[tracing::instrument(level = "trace", skip_all, fields(usecase, scope, key))]
138-
async fn delete_blob(
141+
#[tracing::instrument(level = "trace", skip(state))]
142+
async fn delete_object(
139143
State(state): State<ServiceState>,
140144
ExtractScope(claim): ExtractScope,
141145
Path(key): Path<String>,
142146
) -> error::Result<impl IntoResponse> {
143147
claim.ensure_permission(Permission::Write)?;
144-
let key = claim.into_key(key);
148+
let key = claim.into_key(key)?;
145149

146150
state.service.delete_object(&key).await?;
147151

objectstore-service/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ async-trait = "0.1.88"
1010
bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", branch = "admin" }
1111
bincode = { version = "2.0.1", features = ["serde"] }
1212
bytes = "1.10.1"
13+
data-encoding = "2.9.0"
1314
futures-util = "0.3.31"
1415
gcp_auth = "0.12.3"
1516
objectstore-types = { path = "../objectstore-types" }
@@ -18,6 +19,8 @@ serde = { version = "1.0.219", features = ["derive"] }
1819
tokio = { version = "1.46.0", features = ["full"] }
1920
tokio-util = "0.7.15"
2021
tonic = "0.13.1"
22+
uuid = { version = "1.18.0", features = ["v7"] }
23+
watto = "0.2.0"
2124
zstd = "0.13.3"
2225

2326
[dev-dependencies]

objectstore-service/src/backend/bigtable.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use objectstore_types::{ExpirationPolicy, Metadata};
1818
use tokio::runtime::Handle;
1919
use tonic::Code;
2020

21+
use crate::ScopedKey;
2122
use crate::backend::{Backend, BackendStream};
2223

2324
/// Connection timeout used for the initial connection to BigQuery.
@@ -168,13 +169,13 @@ impl BigTableBackend {
168169
}
169170
}
170171

171-
async fn mutate<I>(&self, path: &str, mutations: I) -> Result<MutateRowResponse>
172+
async fn mutate<I>(&self, path: Vec<u8>, mutations: I) -> Result<MutateRowResponse>
172173
where
173174
I: IntoIterator<Item = Mutation>,
174175
{
175176
let request = v2::MutateRowRequest {
176177
table_name: self.table_path.clone(),
177-
row_key: path.as_bytes().to_vec(),
178+
row_key: path,
178179
mutations: mutations
179180
.into_iter()
180181
.map(|m| v2::Mutation { mutation: Some(m) })
@@ -206,10 +207,11 @@ impl BigTableBackend {}
206207
impl Backend for BigTableBackend {
207208
async fn put_object(
208209
&self,
209-
path: &str,
210+
key: &ScopedKey,
210211
metadata: &Metadata,
211212
mut stream: BackendStream,
212213
) -> Result<()> {
214+
let path = key.as_path().to_string().into_bytes();
213215
// TODO: Inject the access time from the request.
214216
let access_time = SystemTime::now();
215217

@@ -253,9 +255,10 @@ impl Backend for BigTableBackend {
253255
Ok(())
254256
}
255257

256-
async fn get_object(&self, path: &str) -> Result<Option<(Metadata, BackendStream)>> {
258+
async fn get_object(&self, key: &ScopedKey) -> Result<Option<(Metadata, BackendStream)>> {
259+
let path = key.as_path().to_string().into_bytes();
257260
let rows = v2::RowSet {
258-
row_keys: vec![path.as_bytes().to_vec()],
261+
row_keys: vec![path.clone()],
259262
row_ranges: vec![],
260263
};
261264

@@ -270,11 +273,11 @@ impl Backend for BigTableBackend {
270273
let response = client.read_rows(request).await?;
271274
debug_assert!(response.len() <= 1, "Expected at most one row");
272275

273-
let Some((key, cells)) = response.into_iter().next() else {
276+
let Some((read_path, cells)) = response.into_iter().next() else {
274277
return Ok(None);
275278
};
276279

277-
debug_assert!(key == path.as_bytes(), "Row key mismatch");
280+
debug_assert!(read_path == path, "Row key mismatch");
278281
let mut value = Bytes::new();
279282
let mut metadata = Metadata::default();
280283
let mut expire_at = None;
@@ -310,15 +313,16 @@ impl Backend for BigTableBackend {
310313
let value = Bytes::clone(&value);
311314
let stream = stream::once(async { Ok(value) }).boxed();
312315
// TODO: Avoid the serialize roundtrip for metadata
313-
self.put_object(path, &metadata, stream).await?;
316+
self.put_object(key, &metadata, stream).await?;
314317
}
315318
}
316319

317320
let stream = stream::once(async { Ok(value) }).boxed();
318321
Ok(Some((metadata, stream)))
319322
}
320323

321-
async fn delete_object(&self, path: &str) -> Result<()> {
324+
async fn delete_object(&self, key: &ScopedKey) -> Result<()> {
325+
let path = key.as_path().to_string().into_bytes();
322326
self.mutate(path, [Mutation::DeleteFromRow(DeleteFromRow {})])
323327
.await?;
324328
Ok(())
@@ -350,6 +354,10 @@ fn micros_to_time(micros: i64) -> Option<SystemTime> {
350354
mod tests {
351355
use std::collections::BTreeMap;
352356

357+
use objectstore_types::Scope;
358+
359+
use crate::ObjectKey;
360+
353361
use super::*;
354362

355363
// NB: Not run any of these tests, you need to have a BigTable emulator running and
@@ -380,15 +388,22 @@ mod tests {
380388
Ok(payload)
381389
}
382390

383-
fn make_path() -> String {
384-
format!("usecase1/4711/{}", uuid::Uuid::new_v4())
391+
fn make_key() -> ScopedKey {
392+
ScopedKey {
393+
usecase: "testing".into(),
394+
scope: Scope {
395+
organization: 1234,
396+
project: Some(1234),
397+
},
398+
key: ObjectKey::for_backend(0),
399+
}
385400
}
386401

387402
#[tokio::test]
388403
async fn test_roundtrip() -> Result<()> {
389404
let backend = create_test_backend().await?;
390405

391-
let path = make_path();
406+
let path = make_key();
392407
let metadata = Metadata {
393408
expiration_policy: ExpirationPolicy::Manual,
394409
compression: None,
@@ -413,7 +428,7 @@ mod tests {
413428
async fn test_get_nonexistent() -> Result<()> {
414429
let backend = create_test_backend().await?;
415430

416-
let path = make_path();
431+
let path = make_key();
417432
let result = backend.get_object(&path).await?;
418433
assert!(result.is_none());
419434

@@ -424,7 +439,7 @@ mod tests {
424439
async fn test_delete_nonexistent() -> Result<()> {
425440
let backend = create_test_backend().await?;
426441

427-
let path = make_path();
442+
let path = make_key();
428443
backend.delete_object(&path).await?;
429444

430445
Ok(())
@@ -434,7 +449,7 @@ mod tests {
434449
async fn test_overwrite() -> Result<()> {
435450
let backend = create_test_backend().await?;
436451

437-
let path = make_path();
452+
let path = make_key();
438453
let metadata = Metadata {
439454
expiration_policy: ExpirationPolicy::Manual,
440455
compression: None,
@@ -469,7 +484,7 @@ mod tests {
469484
async fn test_read_after_delete() -> Result<()> {
470485
let backend = create_test_backend().await?;
471486

472-
let path = make_path();
487+
let path = make_key();
473488
let metadata = Metadata {
474489
expiration_policy: ExpirationPolicy::Manual,
475490
compression: None,
@@ -495,7 +510,7 @@ mod tests {
495510

496511
let backend = create_test_backend().await?;
497512

498-
let path = make_path();
513+
let path = make_key();
499514
let metadata = Metadata {
500515
expiration_policy: ExpirationPolicy::TimeToLive(Duration::from_secs(0)),
501516
compression: None,
@@ -519,7 +534,7 @@ mod tests {
519534

520535
let backend = create_test_backend().await?;
521536

522-
let path = make_path();
537+
let path = make_key();
523538
let metadata = Metadata {
524539
expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(0)),
525540
compression: None,

objectstore-service/src/backend/local_fs.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use tokio::fs::OpenOptions;
99
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
1010
use tokio_util::io::{ReaderStream, StreamReader};
1111

12+
use crate::ScopedKey;
13+
1214
use super::{Backend, BackendStream};
1315

1416
const BC_CONFIG: bincode::config::Configuration = bincode::config::standard();
@@ -28,10 +30,11 @@ impl LocalFs {
2830
impl Backend for LocalFs {
2931
async fn put_object(
3032
&self,
31-
path: &str,
33+
key: &ScopedKey,
3234
metadata: &Metadata,
3335
stream: BackendStream,
3436
) -> anyhow::Result<()> {
37+
let path = key.as_path().to_string();
3538
let path = self.path.join(path);
3639
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
3740
let file = OpenOptions::new()
@@ -56,7 +59,11 @@ impl Backend for LocalFs {
5659
Ok(())
5760
}
5861

59-
async fn get_object(&self, path: &str) -> anyhow::Result<Option<(Metadata, BackendStream)>> {
62+
async fn get_object(
63+
&self,
64+
key: &ScopedKey,
65+
) -> anyhow::Result<Option<(Metadata, BackendStream)>> {
66+
let path = key.as_path().to_string();
6067
let path = self.path.join(path);
6168
let file = match OpenOptions::new().read(true).open(path).await {
6269
Ok(file) => file,
@@ -101,7 +108,8 @@ impl Backend for LocalFs {
101108
Ok(Some((metadata, stream.boxed())))
102109
}
103110

104-
async fn delete_object(&self, path: &str) -> anyhow::Result<()> {
111+
async fn delete_object(&self, key: &ScopedKey) -> anyhow::Result<()> {
112+
let path = key.as_path().to_string();
105113
let path = self.path.join(path);
106114
Ok(tokio::fs::remove_file(path).await?)
107115
}
@@ -113,7 +121,9 @@ mod tests {
113121

114122
use bytes::BytesMut;
115123
use futures_util::TryStreamExt;
116-
use objectstore_types::{Compression, ExpirationPolicy};
124+
use objectstore_types::{Compression, ExpirationPolicy, Scope};
125+
126+
use crate::ObjectKey;
117127

118128
use super::*;
119129

@@ -126,17 +136,25 @@ mod tests {
126136
let tempdir = tempfile::tempdir().unwrap();
127137
let backend = LocalFs::new(tempdir.path());
128138

139+
let key = ScopedKey {
140+
usecase: "testing".into(),
141+
scope: Scope {
142+
organization: 1234,
143+
project: Some(1234),
144+
},
145+
key: ObjectKey::for_backend(0),
146+
};
129147
let metadata = Metadata {
130148
expiration_policy: ExpirationPolicy::TimeToIdle(Duration::from_secs(3600)),
131149
compression: Some(Compression::Zstd),
132150
custom: [("foo".into(), "bar".into())].into(),
133151
};
134152
backend
135-
.put_object("foo/bar", &metadata, make_stream(b"oh hai!"))
153+
.put_object(&key, &metadata, make_stream(b"oh hai!"))
136154
.await
137155
.unwrap();
138156

139-
let (read_metadata, stream) = backend.get_object("foo/bar").await.unwrap().unwrap();
157+
let (read_metadata, stream) = backend.get_object(&key).await.unwrap().unwrap();
140158
let file_contents: BytesMut = stream.try_collect().await.unwrap();
141159

142160
assert_eq!(read_metadata, metadata);

0 commit comments

Comments
 (0)