-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathobjects.rs
More file actions
113 lines (95 loc) · 3.44 KB
/
objects.rs
File metadata and controls
113 lines (95 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::io;
use std::time::SystemTime;
use anyhow::Context;
use axum::body::Body;
use axum::http::{HeaderMap, StatusCode};
use axum::response::{IntoResponse, Response};
use axum::routing;
use axum::{Json, Router};
use futures_util::{StreamExt, TryStreamExt};
use objectstore_service::id::{ObjectContext, ObjectId};
use objectstore_types::Metadata;
use serde::Serialize;
use crate::auth::AuthAwareService;
use crate::error::ApiResult;
use crate::extractors::Xt;
use crate::state::ServiceState;
pub fn router() -> Router<ServiceState> {
let collection_routes = routing::post(objects_post);
let object_routes = routing::get(object_get)
.head(object_head)
.put(object_put)
// TODO(ja): Implement PATCH (metadata update w/o body)
.delete(object_delete);
Router::new()
.route("/objects/{usecase}/{scopes}", collection_routes.clone())
.route("/objects/{usecase}/{scopes}/", collection_routes)
.route("/objects/{usecase}/{scopes}/{*key}", object_routes)
}
/// Response returned when inserting an object.
#[derive(Debug, Serialize)]
pub struct InsertObjectResponse {
pub key: String,
}
async fn objects_post(
service: AuthAwareService,
Xt(context): Xt<ObjectContext>,
headers: HeaderMap,
body: Body,
) -> ApiResult<Response> {
let mut metadata =
Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
metadata.time_created = Some(SystemTime::now());
let stream = body.into_data_stream().map_err(io::Error::other).boxed();
let response_id = service
.insert_object(context, None, &metadata, stream)
.await?;
let response = Json(InsertObjectResponse {
key: response_id.key().to_string(),
});
Ok((StatusCode::CREATED, response).into_response())
}
async fn object_get(service: AuthAwareService, Xt(id): Xt<ObjectId>) -> ApiResult<Response> {
let Some((metadata, stream)) = service.get_object(&id).await? else {
return Ok(StatusCode::NOT_FOUND.into_response());
};
let headers = metadata
.to_headers("", false)
.context("extracting metadata from headers")?;
Ok((headers, Body::from_stream(stream)).into_response())
}
async fn object_head(service: AuthAwareService, Xt(id): Xt<ObjectId>) -> ApiResult<Response> {
let Some((metadata, _stream)) = service.get_object(&id).await? else {
return Ok(StatusCode::NOT_FOUND.into_response());
};
let headers = metadata
.to_headers("", false)
.context("extracting metadata from headers")?;
Ok((StatusCode::NO_CONTENT, headers).into_response())
}
async fn object_put(
service: AuthAwareService,
Xt(id): Xt<ObjectId>,
headers: HeaderMap,
body: Body,
) -> ApiResult<Response> {
let mut metadata =
Metadata::from_headers(&headers, "").context("extracting metadata from headers")?;
metadata.time_created = Some(SystemTime::now());
let ObjectId { context, key } = id;
let stream = body.into_data_stream().map_err(io::Error::other).boxed();
let response_id = service
.insert_object(context, Some(key), &metadata, stream)
.await?;
let response = Json(InsertObjectResponse {
key: response_id.key.to_string(),
});
Ok((StatusCode::OK, response).into_response())
}
async fn object_delete(
service: AuthAwareService,
Xt(id): Xt<ObjectId>,
) -> ApiResult<impl IntoResponse> {
service.delete_object(&id).await?;
Ok(StatusCode::NO_CONTENT)
}