Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,735 changes: 0 additions & 4,735 deletions rust-runtime/Cargo.lock

This file was deleted.

55 changes: 44 additions & 11 deletions rust-runtime/aws-smithy-http-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-http-server"
version = "0.65.7"
version = "0.66.0"
authors = ["Smithy Rust Server <[email protected]>"]
edition = "2021"
license = "Apache-2.0"
Expand All @@ -13,37 +13,70 @@ Server runtime for Smithy Rust Server Framework.
publish = true

[features]
aws-lambda = ["dep:lambda_http"]
default = []
unredacted-logging = []
request-id = ["dep:uuid"]
aws-lambda = ["dep:lambda_http"]

[dependencies]
aws-smithy-cbor = { path = "../aws-smithy-cbor" }
aws-smithy-http = { path = "../aws-smithy-http", features = ["rt-tokio"] }
aws-smithy-json = { path = "../aws-smithy-json" }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api", features = ["http-02x"] }
aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x", "hyper-0-14-x"] }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" }
aws-smithy-types = { path = "../aws-smithy-types", features = [
"http-body-1-x",
] }
aws-smithy-xml = { path = "../aws-smithy-xml" }
aws-smithy-cbor = { path = "../aws-smithy-cbor" }

bytes = "1.10.0"
futures-util = { version = "0.3.29", default-features = false }
http = "0.2.12"
http-body = "0.4.6"
hyper = { version = "0.14.26", features = ["server", "http1", "http2", "tcp", "stream"] }
lambda_http = { version = "0.8.4", optional = true }

http = "1"
http-body = "1.0"
hyper = { version = "1", features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1", features = [
"tokio",
"server",
"server-auto",
"server-graceful",
"service",
"http1",
"http2",
] }
http-body-util = "0.1"

lambda_http = { version = "0.17", optional = true }

mime = "0.3.17"
nom = "7.1.3"
pin-project-lite = "0.2.14"
regex = "1.11.1"
serde_urlencoded = "0.7"
thiserror = "2"
tokio = { version = "1.40.0", features = ["full"] }
tower = { version = "0.4.13", features = ["util", "make"], default-features = false }
tower-http = { version = "0.3", features = ["add-extension", "map-response-body"] }
tower = { version = "0.4.13", features = [
"util",
"make",
], default-features = false }
tower-http = { version = "0.6", features = [
"add-extension",
"map-response-body",
] }
tracing = "0.1.40"
uuid = { version = "1.1.2", features = ["v4", "fast-rng"], optional = true }

[dev-dependencies]
pretty_assertions = "1"
hyper-util = { version = "0.1", features = [
"tokio",
"client",
"client-legacy",
"http1",
"http2",
] }
tracing-subscriber = { version = "0.3", features = ["fmt"] }
tower = { version = "0.4.13", features = ["util", "make", "limit"] }
tower-http = { version = "0.6", features = ["timeout"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Example showing how to limit concurrent connections.
//!
//! This example demonstrates using `limit_connections()` to cap the number
//! of simultaneous TCP connections the server will accept.
//!
//! Run with:
//! ```
//! cargo run --example connection_limiting
//! ```
//!
//! Test with:
//! ```
//! # Single request works fine
//! curl http://localhost:3000
//!
//! # Try overwhelming with many concurrent connections
//! oha -n 200 -c 200 http://localhost:3000
//! ```

use aws_smithy_http_server::{
routing::IntoMakeService,
serve::{serve, ListenerExt},
};
use http::{Request, Response};
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use std::{convert::Infallible, time::Duration};
use tokio::net::TcpListener;
use tower::service_fn;
use tracing::info;

async fn handler(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
// Simulate some work
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(Response::new(Full::new(Bytes::from("OK\n"))))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

info!("Starting server with connection limit...");

// The listener limits concurrent connections to 100.
// Once 100 connections are active, new connections will wait at the OS level
// until an existing connection completes.
let listener = TcpListener::bind("0.0.0.0:3000").await?.limit_connections(100);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh...I wonder if we want an extension trait here or something more explicit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It comes from ListenerExt imported at the topβ€”happy to hear what you had in mind for making it more explicit.


let app = service_fn(handler);

info!("Server listening on http://0.0.0.0:3000");
info!("Max concurrent connections: 100");
info!("Try: oha -n 200 -c 200 http://localhost:3000");

serve(listener, IntoMakeService::new(app)).await?;

Ok(())
}
150 changes: 150 additions & 0 deletions rust-runtime/aws-smithy-http-server/examples/custom_accept_loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Example demonstrating a custom accept loop with connection-level timeouts.
//!
//! This example shows how to implement your own accept loop instead of using
//! the built-in `serve()` function. This gives you control over:
//! - Overall connection duration limits
//! - Connection-level configuration
//! - Per-connection decision making
//!
//! Run with:
//! ```
//! cargo run --example custom_accept_loop
//! ```
//!
//! Test with curl:
//! ```
//! curl http://localhost:3000/
//! curl http://localhost:3000/slow
//! ```

use aws_smithy_http_server::{routing::IntoMakeService, serve::IncomingStream};
use http::{Request, Response};
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder,
service::TowerToHyperService,
};
use std::{convert::Infallible, sync::Arc, time::Duration};
use tokio::{net::TcpListener, sync::Semaphore};
use tower::{service_fn, ServiceBuilder, ServiceExt};
use tower_http::timeout::TimeoutLayer;
use tracing::{info, warn};

/// Simple handler that responds immediately
async fn hello_handler(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello, World!\n"))))
}

/// Handler that simulates a slow response
async fn slow_handler(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
info!("slow handler: sleeping for 45 seconds");
tokio::time::sleep(Duration::from_secs(45)).await;
Ok(Response::new(Full::new(Bytes::from("Completed\n"))))
}

/// Router that dispatches to handlers based on path
async fn router(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
match req.uri().path() {
"/slow" => slow_handler(req).await,
_ => hello_handler(req).await,
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let listener = TcpListener::bind("0.0.0.0:3000").await?;
let local_addr = listener.local_addr()?;

info!("Server listening on http://{}", local_addr);
info!("Configuration:");
info!(" - Header read timeout: 10 seconds");
info!(" - Request timeout: 30 seconds");
info!(" - Connection duration limit: 5 minutes");
info!(" - Max concurrent connections: 1000");
info!(" - HTTP/2 keep-alive: 60s interval, 20s timeout");

// Connection limiting with semaphore
let connection_semaphore = Arc::new(Semaphore::new(1000));

// Build the service with request timeout layer
let base_service = ServiceBuilder::new()
.layer(TimeoutLayer::new(Duration::from_secs(30)))
.service(service_fn(router));

let make_service = IntoMakeService::new(base_service);

loop {
// Accept new connection
let (stream, remote_addr) = listener.accept().await?;

// Try to acquire connection permit
let permit = match connection_semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
warn!("connection limit reached, rejecting connection from {}", remote_addr);
drop(stream);
continue;
}
};

info!("accepted connection from {}", remote_addr);

let make_service = make_service.clone();

tokio::spawn(async move {
// The permit will be dropped when this task ends, freeing up a connection slot
let _permit = permit;

let io = TokioIo::new(stream);

// Create service for this connection
let tower_service =
match ServiceExt::oneshot(make_service, IncomingStream::<TcpListener> { io: &io, remote_addr }).await {
Ok(svc) => svc,
Err(_) => {
warn!("failed to create service for connection from {}", remote_addr);
return;
}
};

let hyper_service = TowerToHyperService::new(tower_service);

// Configure Hyper builder with timeouts
let mut builder = Builder::new(TokioExecutor::new());
builder
.http1()
.header_read_timeout(Duration::from_secs(10))
.keep_alive(true);
builder
.http2()
.keep_alive_interval(Duration::from_secs(60))
.keep_alive_timeout(Duration::from_secs(20));

// Serve the connection with overall duration timeout
let conn = builder.serve_connection(io, hyper_service);

// Wrap the entire connection in a timeout.
// The connection will be closed after 5 minutes regardless of activity.
match tokio::time::timeout(Duration::from_secs(300), conn).await {
Ok(Ok(())) => {
info!("connection from {} closed normally", remote_addr);
}
Ok(Err(e)) => {
warn!("error serving connection from {}: {:?}", remote_addr, e);
}
Err(_) => {
info!("connection from {} exceeded 5 minute duration limit", remote_addr);
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

//! Example showing how to configure header read timeout.
//!
//! This demonstrates setting a timeout for reading HTTP request headers.
//! By default, Hyper allows 30 seconds for a client to send complete headers.
//! This example shows how to customize that duration.
//!
//! Run with:
//! ```
//! cargo run --example header_read_timeout
//! ```
//!
//! Test with:
//! ```
//! # Normal request works fine
//! curl http://localhost:3000
//!
//! # Simulate slow header sending (will timeout after 10s)
//! (echo -n "GET / HTTP/1.1\r\n"; sleep 15; echo "Host: localhost\r\n\r\n") | nc localhost 3000
//! ```

use aws_smithy_http_server::{routing::IntoMakeService, serve::serve};
use http::{Request, Response};
use http_body_util::Full;
use hyper::body::{Bytes, Incoming};
use std::{convert::Infallible, time::Duration};
use tokio::net::TcpListener;
use tower::service_fn;
use tracing::info;

async fn handler(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(Response::new(Full::new(Bytes::from("Hello, World!\n"))))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

info!("Starting server with custom header read timeout...");

let listener = TcpListener::bind("0.0.0.0:3000").await?;
let app = service_fn(handler);

info!("Server listening on http://0.0.0.0:3000");
info!("Header read timeout: 10 seconds (default is 30s)");
info!("");
info!("The client must send complete HTTP headers within 10 seconds,");
info!("otherwise the connection will be closed.");

serve(listener, IntoMakeService::new(app))
.configure_hyper(|mut builder| {
builder.http1().header_read_timeout(Duration::from_secs(10));
builder
})
.await?;

Ok(())
}
Loading
Loading