Skip to content

Commit 5ba4b28

Browse files
committed
Add VssService and base service setup.
1 parent ee24608 commit 5ba4b28

File tree

3 files changed

+244
-0
lines changed

3 files changed

+244
-0
lines changed

rust/server/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "server"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
api = { path = "../api" }
8+
impls = { path = "../impls" }
9+
10+
hyper = { version = "1", default-features = false, features = ["server", "http1"] }
11+
http-body-util = { version = "0.1", default-features = false }
12+
hyper-util = { version = "0.1", default-features = false, features = ["server-graceful"] }
13+
tokio = { version = "1.38.0", default-features = false, features = ["time", "signal", "rt-multi-thread", "macros"] }
14+
prost = { version = "0.11.6", default-features = false, features = ["std"] }
15+
bytes = "1.4.0"

rust/server/src/main.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use hyper::service::Service;
2+
use prost::Message;
3+
use std::net::SocketAddr;
4+
5+
use tokio::net::TcpListener;
6+
use tokio::signal::unix::SignalKind;
7+
8+
use hyper::server::conn::http1;
9+
use hyper_util::rt::TokioIo;
10+
11+
use crate::vss_service::VssService;
12+
use api::auth::NoopAuthorizer;
13+
use api::kv_store::KvStore;
14+
use impls::postgres_store::PostgresBackendImpl;
15+
use std::str::FromStr;
16+
use std::sync::Arc;
17+
18+
pub(crate) mod vss_service;
19+
20+
fn main() {
21+
// Define the address to bind the server to
22+
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
23+
24+
let runtime = match tokio::runtime::Builder::new_multi_thread().enable_all().build() {
25+
Ok(runtime) => Arc::new(runtime),
26+
Err(e) => {
27+
eprintln!("Failed to setup tokio runtime: {}", e);
28+
std::process::exit(-1);
29+
},
30+
};
31+
32+
runtime.block_on(async {
33+
let mut sigterm_stream = match tokio::signal::unix::signal(SignalKind::terminate()) {
34+
Ok(stream) => stream,
35+
Err(e) => {
36+
println!("Failed to register for SIGTERM stream: {}", e);
37+
std::process::exit(-1);
38+
},
39+
};
40+
let authorizer = Arc::new(NoopAuthorizer {});
41+
let store = Arc::new(
42+
PostgresBackendImpl::new("postgresql://postgres:postgres@localhost:5432/postgres")
43+
.await
44+
.unwrap(),
45+
);
46+
let rest_svc_listener =
47+
TcpListener::bind(&addr).await.expect("Failed to bind listening port");
48+
loop {
49+
tokio::select! {
50+
res = rest_svc_listener.accept() => {
51+
match res {
52+
Ok((stream, _)) => {
53+
let io_stream = TokioIo::new(stream);
54+
let vss_service = VssService::new(Arc::clone(&store) as Arc<dyn KvStore>,authorizer );
55+
runtime.spawn(async move {
56+
if let Err(err) = http1::Builder::new().serve_connection(io_stream, vss_service).await {
57+
eprintln!("Failed to serve connection: {}", err);
58+
}
59+
});
60+
},
61+
Err(e) => eprintln!("Failed to accept connection: {}", e),
62+
}
63+
}
64+
_ = tokio::signal::ctrl_c() => {
65+
println!("Received CTRL-C, shutting down..");
66+
break;
67+
}
68+
_ = sigterm_stream.recv() => {
69+
println!("Received SIGTERM, shutting down..");
70+
break;
71+
}
72+
}
73+
}
74+
});
75+
}

rust/server/src/vss_service.rs

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
use http_body_util::{BodyExt, Full};
2+
use hyper::body::{Bytes, Incoming};
3+
use hyper::service::Service;
4+
use hyper::{Error, Request, Response, StatusCode};
5+
use std::collections::HashMap;
6+
7+
use prost::Message;
8+
9+
use api::auth::Authorizer;
10+
use api::error::VssError;
11+
use api::kv_store::KvStore;
12+
use api::types::{
13+
DeleteObjectRequest, DeleteObjectResponse, ErrorCode, ErrorResponse, GetObjectRequest,
14+
GetObjectResponse, ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest,
15+
PutObjectResponse,
16+
};
17+
use std::future::Future;
18+
use std::pin::Pin;
19+
use std::sync::Arc;
20+
21+
#[derive(Clone)]
22+
pub struct VssService {
23+
store: Arc<dyn KvStore>,
24+
authorizer: Arc<dyn Authorizer>,
25+
}
26+
27+
impl VssService {
28+
pub(crate) fn new(store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>) -> Self {
29+
Self { store, authorizer }
30+
}
31+
}
32+
33+
impl Service<Request<Incoming>> for VssService {
34+
type Response = Response<Full<Bytes>>;
35+
type Error = hyper::Error;
36+
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
37+
38+
fn call(&self, req: Request<Incoming>) -> Self::Future {
39+
let store = Arc::clone(&self.store);
40+
let authorizer = Arc::clone(&self.authorizer);
41+
let path = req.uri().path().to_owned();
42+
Box::pin(async move {
43+
match path.as_str() {
44+
"/getObject" => {
45+
handle_request(store, authorizer, req, handle_get_object_request).await
46+
},
47+
"/putObjects" => {
48+
handle_request(store, authorizer, req, handle_put_object_request).await
49+
},
50+
"/deleteObject" => {
51+
handle_request(store, authorizer, req, handle_delete_object_request).await
52+
},
53+
"/listKeyVersions" => {
54+
handle_request(store, authorizer, req, handle_list_object_request).await
55+
},
56+
_ => {
57+
let error = format!("Unknown request: {}", path).into_bytes();
58+
Ok(Response::builder()
59+
.status(StatusCode::BAD_REQUEST)
60+
.body(Full::new(Bytes::from(error)))
61+
.unwrap())
62+
},
63+
}
64+
})
65+
}
66+
}
67+
68+
async fn handle_get_object_request(
69+
store: Arc<dyn KvStore>, user_token: String, request: GetObjectRequest,
70+
) -> Result<GetObjectResponse, VssError> {
71+
store.get(user_token, request).await
72+
}
73+
async fn handle_put_object_request(
74+
store: Arc<dyn KvStore>, user_token: String, request: PutObjectRequest,
75+
) -> Result<PutObjectResponse, VssError> {
76+
store.put(user_token, request).await
77+
}
78+
async fn handle_delete_object_request(
79+
store: Arc<dyn KvStore>, user_token: String, request: DeleteObjectRequest,
80+
) -> Result<DeleteObjectResponse, VssError> {
81+
store.delete(user_token, request).await
82+
}
83+
async fn handle_list_object_request(
84+
store: Arc<dyn KvStore>, user_token: String, request: ListKeyVersionsRequest,
85+
) -> Result<ListKeyVersionsResponse, VssError> {
86+
store.list_key_versions(user_token, request).await
87+
}
88+
async fn handle_request<
89+
T: Message + Default,
90+
R: Message,
91+
F: FnOnce(Arc<dyn KvStore>, String, T) -> Fut + Send + 'static,
92+
Fut: Future<Output = Result<R, VssError>> + Send,
93+
>(
94+
store: Arc<dyn KvStore>, authorizer: Arc<dyn Authorizer>, request: Request<Incoming>,
95+
handler: F,
96+
) -> Result<<VssService as Service<Request<Incoming>>>::Response, hyper::Error> {
97+
// TODO: we should bound the amount of data we read to avoid allocating too much memory.
98+
let (parts, body) = request.into_parts();
99+
let headers_map = parts
100+
.headers
101+
.iter()
102+
.map(|(k, v)| (k.as_str().to_string(), v.to_str().unwrap_or("").to_string()))
103+
.collect::<HashMap<String, String>>();
104+
105+
let user_token = match authorizer.verify(&headers_map).await {
106+
Ok(auth_response) => auth_response.user_token,
107+
Err(e) => return build_error_response(e),
108+
};
109+
110+
let bytes = body.collect().await?.to_bytes();
111+
match T::decode(bytes) {
112+
Ok(request) => match handler(store.clone(), user_token, request).await {
113+
Ok(response) => Ok(Response::builder()
114+
.body(Full::new(Bytes::from(response.encode_to_vec())))
115+
// unwrap safety: body only errors when previous chained calls failed.
116+
.unwrap()),
117+
Err(e) => build_error_response(e),
118+
},
119+
Err(_) => Ok(Response::builder()
120+
.status(StatusCode::BAD_REQUEST)
121+
.body(Full::new(Bytes::from(b"Error parsing request".to_vec())))
122+
// unwrap safety: body only errors when previous chained calls failed.
123+
.unwrap()),
124+
}
125+
}
126+
127+
fn build_error_response(e: VssError) -> Result<Response<Full<Bytes>>, Error> {
128+
let error_response = match e {
129+
VssError::NoSuchKeyError(msg) => ErrorResponse {
130+
error_code: ErrorCode::NoSuchKeyException.into(),
131+
message: msg.to_string(),
132+
},
133+
VssError::ConflictError(msg) => ErrorResponse {
134+
error_code: ErrorCode::ConflictException.into(),
135+
message: msg.to_string(),
136+
},
137+
VssError::InvalidRequestError(msg) => ErrorResponse {
138+
error_code: ErrorCode::InvalidRequestException.into(),
139+
message: msg.to_string(),
140+
},
141+
VssError::AuthError(msg) => {
142+
ErrorResponse { error_code: ErrorCode::AuthException.into(), message: msg.to_string() }
143+
},
144+
_ => ErrorResponse {
145+
error_code: ErrorCode::InternalServerException.into(),
146+
message: "Unknown Server Error occurred.".to_string(),
147+
},
148+
};
149+
Ok(Response::builder()
150+
.status(StatusCode::INTERNAL_SERVER_ERROR)
151+
.body(Full::new(Bytes::from(error_response.encode_to_vec())))
152+
// unwrap safety: body only errors when previous chained calls failed.
153+
.unwrap())
154+
}

0 commit comments

Comments
 (0)