Skip to content

Commit 3e54119

Browse files
authored
feat(notary): add concurrency limit (#770)
* feat(notary): add concurrency limit * switch to 503 status code * remove test-api feature * improve naming and comments * set default concurrency to 32
1 parent 71aa90d commit 3e54119

File tree

8 files changed

+272
-49
lines changed

8 files changed

+272
-49
lines changed

crates/notary/client/src/client.rs

Lines changed: 105 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44
//! subsequent requests for notarization.
55
66
use http_body_util::{BodyExt as _, Either, Empty, Full};
7-
use hyper::{body::Bytes, client::conn::http1::Parts, Request, StatusCode};
7+
use hyper::{
8+
body::{Bytes, Incoming},
9+
client::conn::http1::Parts,
10+
Request, Response, StatusCode,
11+
};
812
use hyper_util::rt::TokioIo;
913
use notary_server::{ClientType, NotarizationSessionRequest, NotarizationSessionResponse};
1014
use std::{
@@ -16,6 +20,7 @@ use std::{
1620
use tokio::{
1721
io::{AsyncRead, AsyncWrite, ReadBuf},
1822
net::TcpStream,
23+
time::{sleep, timeout, Duration},
1924
};
2025
use tokio_rustls::{
2126
client::TlsStream,
@@ -129,6 +134,14 @@ pub struct NotaryClient {
129134
/// in notary server.
130135
#[builder(setter(into, strip_option), default)]
131136
api_key: Option<String>,
137+
/// The duration of notarization request timeout in seconds.
138+
#[builder(default = "60")]
139+
request_timeout: usize,
140+
/// The number of seconds to wait between notarization request retries.
141+
///
142+
/// By default uses the value suggested by the server.
143+
#[builder(default = "None")]
144+
request_retry_override: Option<u64>,
132145
}
133146

134147
impl NotaryClientBuilder {
@@ -355,15 +368,56 @@ impl NotaryClient {
355368

356369
debug!("Sending notarization request: {:?}", notarization_request);
357370

358-
let notarization_response = notary_request_sender
359-
.send_request(notarization_request)
360-
.await
361-
.map_err(|err| {
362-
error!("Failed to send http request for notarization");
363-
ClientError::new(ErrorKind::Http, Some(Box::new(err)))
364-
})?;
371+
let notarize_with_retry_fut = async {
372+
loop {
373+
let notarization_response = notary_request_sender
374+
.send_request(notarization_request.clone())
375+
.await
376+
.map_err(|err| {
377+
error!("Failed to send http request for notarization");
378+
ClientError::new(ErrorKind::Http, Some(Box::new(err)))
379+
})?;
380+
381+
if notarization_response.status() == StatusCode::SWITCHING_PROTOCOLS {
382+
return Ok::<Response<Incoming>, ClientError>(notarization_response);
383+
} else if notarization_response.status() == StatusCode::SERVICE_UNAVAILABLE {
384+
let retry_after = self
385+
.request_retry_override
386+
.unwrap_or(parse_retry_after(&notarization_response)?);
387+
388+
debug!("Retrying notarization request in {:?}", retry_after);
389+
390+
sleep(Duration::from_secs(retry_after)).await;
391+
} else {
392+
return Err(ClientError::new(
393+
ErrorKind::Internal,
394+
Some(
395+
format!(
396+
"Server sent unexpected status code {:?}",
397+
notarization_response.status()
398+
)
399+
.into(),
400+
),
401+
));
402+
}
403+
}
404+
};
405+
406+
let notarization_response = timeout(
407+
Duration::from_secs(self.request_timeout as u64),
408+
notarize_with_retry_fut,
409+
)
410+
.await
411+
.map_err(|_| {
412+
ClientError::new(
413+
ErrorKind::Internal,
414+
Some(
415+
"Timed out while waiting for server to accept notarization request".into(),
416+
),
417+
)
418+
})??;
365419

366-
debug!("Sent notarization request");
420+
debug!("Notarization request was accepted by the server");
367421

368422
if notarization_response.status() != StatusCode::SWITCHING_PROTOCOLS {
369423
return Err(ClientError::new(
@@ -388,6 +442,17 @@ impl NotaryClient {
388442

389443
Ok((notary_socket.into_inner(), session_id))
390444
}
445+
446+
/// Sets notarization request timeout duration in seconds.
447+
pub fn request_timeout(&mut self, timeout: usize) {
448+
self.request_timeout = timeout;
449+
}
450+
451+
/// Sets the number of seconds to wait between notarization request
452+
/// retries.
453+
pub fn request_retry_override(&mut self, seconds: u64) {
454+
self.request_retry_override = Some(seconds);
455+
}
391456
}
392457

393458
/// Default root store using mozilla certs.
@@ -403,3 +468,34 @@ fn default_root_store() -> RootCertStore {
403468

404469
root_store
405470
}
471+
472+
// Attempts to parse the value of the "Retry-After" header from the given
473+
// `response`.
474+
fn parse_retry_after(response: &Response<Incoming>) -> Result<u64, ClientError> {
475+
let seconds = match response.headers().get("Retry-After") {
476+
Some(value) => {
477+
let value_str = value.to_str().map_err(|err| {
478+
ClientError::new(
479+
ErrorKind::Internal,
480+
Some(format!("Invalid Retry-After header: {}", err).into()),
481+
)
482+
})?;
483+
484+
let seconds: u64 = value_str.parse().map_err(|err| {
485+
ClientError::new(
486+
ErrorKind::Internal,
487+
Some(format!("Could not parse Retry-After header as number: {}", err).into()),
488+
)
489+
})?;
490+
seconds
491+
}
492+
None => {
493+
return Err(ClientError::new(
494+
ErrorKind::Internal,
495+
Some("The expected Retry-After header was not found in server response".into()),
496+
));
497+
}
498+
};
499+
500+
Ok(seconds)
501+
}

crates/notary/server/config/config.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,5 @@ logging:
4444
authorization:
4545
enabled: false
4646
whitelist_csv_path: "./fixture/auth/whitelist.csv"
47+
48+
concurrency: 32

crates/notary/server/src/config.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use serde::Deserialize;
22

3-
#[derive(Clone, Debug, Deserialize, Default)]
3+
#[derive(Clone, Debug, Deserialize)]
44
pub struct NotaryServerProperties {
55
/// Name and address of the notary server
66
pub server: ServerProperties,
@@ -14,6 +14,22 @@ pub struct NotaryServerProperties {
1414
pub logging: LoggingProperties,
1515
/// Setting for authorization
1616
pub authorization: AuthorizationProperties,
17+
/// The maximum number of concurrent notarization sessions
18+
pub concurrency: usize,
19+
}
20+
21+
impl Default for NotaryServerProperties {
22+
fn default() -> Self {
23+
Self {
24+
server: ServerProperties::default(),
25+
notarization: NotarizationProperties::default(),
26+
tls: TLSProperties::default(),
27+
notary_key: NotarySigningKeyProperties::default(),
28+
logging: LoggingProperties::default(),
29+
authorization: AuthorizationProperties::default(),
30+
concurrency: 32,
31+
}
32+
}
1733
}
1834

1935
#[derive(Clone, Debug, Deserialize, Default)]

crates/notary/server/src/domain/notary.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{
44
sync::{Arc, Mutex},
55
};
66
use tlsn_core::CryptoProvider;
7+
use tokio::sync::Semaphore;
78

89
use crate::{config::NotarizationProperties, domain::auth::AuthorizationWhitelistRecord};
910

@@ -53,19 +54,23 @@ pub struct NotaryGlobals {
5354
pub store: Arc<Mutex<HashMap<String, ()>>>,
5455
/// Whitelist of API keys for authorization purpose
5556
pub authorization_whitelist: Option<Arc<Mutex<HashMap<String, AuthorizationWhitelistRecord>>>>,
57+
/// A semaphore to acquire a permit for notarization
58+
pub semaphore: Arc<Semaphore>,
5659
}
5760

5861
impl NotaryGlobals {
5962
pub fn new(
6063
crypto_provider: Arc<CryptoProvider>,
6164
notarization_config: NotarizationProperties,
6265
authorization_whitelist: Option<Arc<Mutex<HashMap<String, AuthorizationWhitelistRecord>>>>,
66+
semaphore: Arc<Semaphore>,
6367
) -> Self {
6468
Self {
6569
crypto_provider,
6670
notarization_config,
6771
store: Default::default(),
6872
authorization_whitelist,
73+
semaphore,
6974
}
7075
}
7176
}

crates/notary/server/src/server.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ use crate::{
4949
#[cfg(feature = "tee_quote")]
5050
use crate::tee::{generate_ephemeral_keypair, quote};
5151

52+
use tokio::sync::Semaphore;
53+
5254
/// Start a TCP server (with or without TLS) to accept notarization request for
5355
/// both TCP and WebSocket clients
5456
#[tracing::instrument(skip(config))]
@@ -114,6 +116,7 @@ pub async fn run_server(config: &NotaryServerProperties) -> Result<(), NotarySer
114116
Arc::new(crypto_provider),
115117
config.notarization.clone(),
116118
authorization_whitelist,
119+
Arc::new(Semaphore::new(config.concurrency)),
117120
);
118121

119122
// Parameters needed for the info endpoint

crates/notary/server/src/service.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod tcp;
33
pub mod websocket;
44

55
use axum::{
6+
body::Body,
67
extract::{rejection::JsonRejection, FromRequestParts, Query, State},
78
http::{header, request::Parts, StatusCode},
89
response::{IntoResponse, Json, Response},
@@ -78,6 +79,17 @@ pub async fn upgrade_protocol(
7879
State(notary_globals): State<NotaryGlobals>,
7980
Query(params): Query<NotarizationRequestQuery>,
8081
) -> Response {
82+
let permit = if let Ok(permit) = notary_globals.semaphore.clone().try_acquire_owned() {
83+
permit
84+
} else {
85+
// TODO: estimate the time more precisely to avoid unnecessary retries.
86+
return Response::builder()
87+
.status(StatusCode::SERVICE_UNAVAILABLE)
88+
.header("Retry-After", 5)
89+
.body(Body::default())
90+
.expect("Builder should not fail");
91+
};
92+
8193
info!("Received upgrade protocol request");
8294
let session_id = params.session_id;
8395
// Check if session_id exists in the store, this also removes session_id from
@@ -96,12 +108,14 @@ pub async fn upgrade_protocol(
96108
// This completes the HTTP Upgrade request and returns a successful response to
97109
// the client, meanwhile initiating the websocket or tcp connection
98110
match protocol_upgrade {
99-
ProtocolUpgrade::Ws(ws) => {
100-
ws.on_upgrade(move |socket| websocket_notarize(socket, notary_globals, session_id))
101-
}
102-
ProtocolUpgrade::Tcp(tcp) => {
103-
tcp.on_upgrade(move |stream| tcp_notarize(stream, notary_globals, session_id))
104-
}
111+
ProtocolUpgrade::Ws(ws) => ws.on_upgrade(move |socket| async move {
112+
websocket_notarize(socket, notary_globals, session_id).await;
113+
drop(permit);
114+
}),
115+
ProtocolUpgrade::Tcp(tcp) => tcp.on_upgrade(move |stream| async move {
116+
tcp_notarize(stream, notary_globals, session_id).await;
117+
drop(permit);
118+
}),
105119
}
106120
}
107121

crates/notary/tests-integration/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ tlsn-tls-core = { workspace = true }
1414
tlsn-core = { workspace = true }
1515

1616
async-tungstenite = { workspace = true, features = ["tokio-native-tls"] }
17+
futures = { workspace = true }
1718
http = { workspace = true }
1819
http-body-util = { workspace = true }
1920
hyper = { workspace = true, features = ["client", "http1", "server"] }

0 commit comments

Comments
 (0)