2 changes: 1 addition & 1 deletion actix-http/Cargo.toml
@@ -108,7 +108,7 @@ actix-rt = { version = "2.2", default-features = false }

ahash = "0.8"
bitflags = "2"
bytes = "1"
bytes = "1.7"
bytestring = "1"
derive_more = { version = "1", features = ["as_ref", "deref", "deref_mut", "display", "error", "from"] }
encoding_rs = "0.8"
17 changes: 16 additions & 1 deletion actix-http/examples/actix-web.rs
@@ -1,19 +1,34 @@
use std::sync::OnceLock;

use actix_http::HttpService;
use actix_server::Server;
use actix_service::map_config;
use actix_web::{dev::AppConfig, get, App, Responder};

static MEDIUM: OnceLock<String> = OnceLock::new();
static LARGE: OnceLock<String> = OnceLock::new();

#[get("/")]
async fn index() -> impl Responder {
"Hello, world. From Actix Web!"
}

#[get("/large")]
async fn large() -> &'static str {
LARGE.get_or_init(|| "123456890".repeat(1024 * 100))
}

#[get("/medium")]
async fn medium() -> &'static str {
MEDIUM.get_or_init(|| "123456890".repeat(1024 * 5))
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
Server::build()
.bind("hello-world", "127.0.0.1:8080", || {
// construct actix-web app
-let app = App::new().service(index);
+let app = App::new().service(index).service(large).service(medium);

HttpService::build()
// pass the app to service builder
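For reference, the two new routes serve bodies of roughly 45 KB (/medium, the 9-byte pattern repeated 5 × 1024 times) and 900 KB (/large, repeated 100 × 1024 times), so the example now produces responses on both sides of the 64 KB figure used by the new write-buffer code.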
124 changes: 124 additions & 0 deletions actix-http/src/big_bytes.rs
@@ -0,0 +1,124 @@
use std::collections::VecDeque;

use bytes::{Buf, BufMut, Bytes, BytesMut};

// 64KB max capacity (arbitrarily chosen)
const MAX_CAPACITY: usize = 1024 * 64;

pub struct BigBytes {
buffer: BytesMut,
frozen: VecDeque<Bytes>,
frozen_len: usize,
}

impl BigBytes {
/// Initialize a new BigBytes whose internal buffer is allocated with `capacity` bytes of capacity
pub fn with_capacity(capacity: usize) -> Self {
Self {
buffer: BytesMut::with_capacity(capacity),
frozen: VecDeque::default(),
frozen_len: 0,
}
}

/// Clear the internal queue and buffer, resetting length to zero
///
/// If the internal buffer capacity exceeds 64KB or `new_capacity`, whichever is greater, it is
/// freed and a new buffer with capacity `new_capacity` is allocated.
pub fn clear(&mut self, new_capacity: usize) {
std::mem::take(&mut self.frozen);
self.frozen_len = 0;
self.buffer.clear();

if self.buffer.capacity() > new_capacity.max(MAX_CAPACITY) {
self.buffer = BytesMut::with_capacity(new_capacity);
}
}

/// Return a mutable reference to the underlying buffer. This should only be used when dealing
/// with small allocations (e.g. writing headers)
pub fn buffer_mut(&mut self) -> &mut BytesMut {
&mut self.buffer
}

/// Return the total length of the bytes stored in BigBytes
pub fn total_len(&mut self) -> usize {
self.frozen_len + self.buffer.len()
}

/// Return whether there are no bytes present in the BigBytes
pub fn is_empty(&self) -> bool {
self.frozen_len == 0 && self.buffer.is_empty()
}

/// Add `bytes` to the internal structure. Any data pending in the internal buffer is frozen and
/// pushed onto the queue first, then `bytes` itself is pushed onto the queue.
pub fn put_bytes(&mut self, bytes: Bytes) {
if !self.buffer.is_empty() {
let current = self.buffer.split().freeze();
self.frozen_len += current.len();
self.frozen.push_back(current);
}

if !bytes.is_empty() {
self.frozen_len += bytes.len();
self.frozen.push_back(bytes);
}
}

/// Returns a slice of the frontmost buffer
///
/// While there are bytes present in BigBytes, front_slice is guaranteed not to return an empty
/// slice.
pub fn front_slice(&self) -> &[u8] {
if let Some(front) = self.frozen.front() {
front
} else {
&self.buffer
}
}

/// Advances the first buffer by `count` bytes. If the first buffer is advanced to completion,
/// it is popped from the queue
pub fn advance(&mut self, count: usize) {
if let Some(front) = self.frozen.front_mut() {
front.advance(count);

if front.is_empty() {
self.frozen.pop_front();
}

self.frozen_len -= count;
} else {
self.buffer.advance(count);
}
}

/// Pops the front Bytes from the BigBytes, or splits and freezes the internal buffer if no
/// Bytes are present.
pub fn pop_front(&mut self) -> Option<Bytes> {
if let Some(front) = self.frozen.pop_front() {
self.frozen_len -= front.len();
Some(front)
} else if !self.buffer.is_empty() {
Some(self.buffer.split().freeze())
} else {
None
}
}

/// Drain the BigBytes, writing everything into the provided BytesMut
pub fn write_to(&mut self, dst: &mut BytesMut) {
dst.reserve(self.total_len());

for buf in &self.frozen {
dst.put_slice(buf);
}

dst.put_slice(&self.buffer.split());

self.frozen_len = 0;

std::mem::take(&mut self.frozen);
}
}
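Taken together, these methods support a small-header / large-payload split: small writes accumulate in the internal BytesMut, large Bytes are queued without being copied, and everything can be drained front to back or flushed into one buffer. A minimal crate-internal usage sketch (the function and the sizes are illustrative, not part of the PR; it assumes the module is reachable as crate::big_bytes):

use bytes::{Bytes, BytesMut};

use crate::big_bytes::BigBytes;

fn big_bytes_sketch() {
    let mut big = BigBytes::with_capacity(1024 * 8);

    // Small writes (status line, headers) go into the internal BytesMut.
    big.buffer_mut().extend_from_slice(b"HTTP/1.1 200 OK\r\n\r\n");

    // A large payload is queued as its own Bytes; nothing is copied.
    big.put_bytes(Bytes::from(vec![0u8; 1024 * 128]));

    // Drain front to back, the way the dispatcher's write loop does.
    while big.total_len() > 0 {
        // Pretend the I/O accepted the whole front slice.
        let written = big.front_slice().len();
        big.advance(written);
    }

    // Clear the queue and buffer; an oversized buffer would be reallocated at this capacity.
    big.clear(1024 * 8);

    // Alternatively, flush everything into one contiguous buffer.
    big.buffer_mut().extend_from_slice(b"trailing data");
    let mut out = BytesMut::new();
    big.write_to(&mut out);
    assert_eq!(&out[..], b"trailing data");
}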
38 changes: 28 additions & 10 deletions actix-http/src/h1/codec.rs
@@ -9,7 +9,10 @@ use super::{
decoder::{self, PayloadDecoder, PayloadItem, PayloadType},
encoder, Message, MessageType,
};
-use crate::{body::BodySize, error::ParseError, ConnectionType, Request, Response, ServiceConfig};
+use crate::{
+big_bytes::BigBytes, body::BodySize, error::ParseError, ConnectionType, Request, Response,
+ServiceConfig,
+};

bitflags! {
#[derive(Debug, Clone, Copy)]
@@ -146,14 +149,12 @@ impl Decoder for Codec {
}
}

-impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
-type Error = io::Error;
-
-fn encode(
+impl Codec {
+pub(super) fn encode_bigbytes(
&mut self,
item: Message<(Response<()>, BodySize)>,
-dst: &mut BytesMut,
-) -> Result<(), Self::Error> {
+dst: &mut BigBytes,
+) -> std::io::Result<()> {
match item {
Message::Item((mut res, length)) => {
// set response version
@@ -172,7 +173,7 @@ impl Encoder<Message<(Response<()>, BodySize)>> for Codec {

// encode message
self.encoder.encode(
-dst,
+dst.buffer_mut(),
&mut res,
self.flags.contains(Flags::HEAD),
self.flags.contains(Flags::STREAM),
@@ -184,18 +185,35 @@
}

Message::Chunk(Some(bytes)) => {
-self.encoder.encode_chunk(bytes.as_ref(), dst)?;
+self.encoder.encode_chunk_bigbytes(bytes, dst)?;
}

Message::Chunk(None) => {
-self.encoder.encode_eof(dst)?;
+self.encoder.encode_eof(dst.buffer_mut())?;
}
}

Ok(())
}
}

impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
type Error = io::Error;

fn encode(
&mut self,
item: Message<(Response<()>, BodySize)>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
let mut bigbytes = BigBytes::with_capacity(1024 * 8);
self.encode_bigbytes(item, &mut bigbytes)?;

bigbytes.write_to(dst);

Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
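The encoder's new encode_chunk_bigbytes is not part of this file, so its exact implementation is not visible here; the idea it enables is that chunked framing (the hex size line and the CRLFs) can stay in the small internal buffer while the chunk body is queued as a Bytes. A hedged sketch of such a helper, under that assumption and not the PR's actual code:

use bytes::Bytes;

use crate::big_bytes::BigBytes;

// Illustrative only: the real encoder tracks per-response state (chunked vs.
// sized transfer) that this sketch ignores.
fn encode_chunk_sketch(chunk: Bytes, dst: &mut BigBytes) {
    // Chunk-size line goes into the small header buffer.
    dst.buffer_mut()
        .extend_from_slice(format!("{:X}\r\n", chunk.len()).as_bytes());

    // The chunk body is handed over as-is; BigBytes queues it instead of copying it.
    dst.put_bytes(chunk);

    // Terminating CRLF for this chunk, written into the (now re-empty) buffer.
    dst.buffer_mut().extend_from_slice(b"\r\n");
}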
42 changes: 22 additions & 20 deletions actix-http/src/h1/dispatcher.rs
@@ -12,11 +12,11 @@ use actix_codec::{Framed, FramedParts};
use actix_rt::time::sleep_until;
use actix_service::Service;
use bitflags::bitflags;
-use bytes::{Buf, BytesMut};
+use bytes::BytesMut;
use futures_core::ready;
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite};
-use tokio_util::codec::{Decoder as _, Encoder as _};
+use tokio_util::codec::Decoder as _;
use tracing::{error, trace};

use super::{
@@ -27,6 +27,7 @@ use super::{
Message, MessageType,
};
use crate::{
+big_bytes::BigBytes,
body::{BodySize, BoxBody, MessageBody},
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
@@ -165,7 +166,7 @@

pub(super) io: Option<T>,
read_buf: BytesMut,
-write_buf: BytesMut,
+write_buf: BigBytes,
codec: Codec,
}
}
@@ -277,7 +278,7 @@

io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
-write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
+write_buf: BigBytes::with_capacity(HW_BUFFER_SIZE),
codec: Codec::new(config),
},
},
@@ -329,27 +330,24 @@
let InnerDispatcherProj { io, write_buf, .. } = self.project();
let mut io = Pin::new(io.as_mut().unwrap());

-let len = write_buf.len();
-let mut written = 0;
-
-while written < len {
-match io.as_mut().poll_write(cx, &write_buf[written..])? {
+while write_buf.total_len() > 0 {
+match io.as_mut().poll_write(cx, write_buf.front_slice())? {
Poll::Ready(0) => {
println!("WRITE ZERO");
error!("write zero; closing");
return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, "")));
}

-Poll::Ready(n) => written += n,
+Poll::Ready(n) => write_buf.advance(n),

Poll::Pending => {
-write_buf.advance(written);
return Poll::Pending;
}
}
}

// everything has written to I/O; clear buffer
-write_buf.clear();
+write_buf.clear(HW_BUFFER_SIZE);

// flush the I/O and check if get blocked
io.poll_flush(cx)
@@ -365,7 +363,7 @@ where
let size = body.size();

this.codec
-.encode(Message::Item((res, size)), this.write_buf)
+.encode_bigbytes(Message::Item((res, size)), this.write_buf)
.map_err(|err| {
if let Some(mut payload) = this.payload.take() {
payload.set_error(PayloadError::Incomplete(None));
@@ -416,6 +414,7 @@ where
fn send_continue(self: Pin<&mut Self>) {
self.project()
.write_buf
+.buffer_mut()
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
}

@@ -493,15 +492,16 @@ where
StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
-while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
+while this.write_buf.total_len() < super::payload::MAX_BUFFER_SIZE {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
-.encode(Message::Chunk(Some(item)), this.write_buf)?;
+.encode_bigbytes(Message::Chunk(Some(item)), this.write_buf)?;
}

Poll::Ready(None) => {
-this.codec.encode(Message::Chunk(None), this.write_buf)?;
+this.codec
+.encode_bigbytes(Message::Chunk(None), this.write_buf)?;

// payload stream finished.
// set state to None and handle next message
@@ -532,15 +532,16 @@ where

// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
-while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
+while this.write_buf.total_len() < super::payload::MAX_BUFFER_SIZE {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
-.encode(Message::Chunk(Some(item)), this.write_buf)?;
+.encode_bigbytes(Message::Chunk(Some(item)), this.write_buf)?;
}

Poll::Ready(None) => {
-this.codec.encode(Message::Chunk(None), this.write_buf)?;
+this.codec
+.encode_bigbytes(Message::Chunk(None), this.write_buf)?;

// payload stream finished
// set state to None and handle next message
@@ -575,6 +576,7 @@ where
// to service call.
Poll::Ready(Ok(req)) => {
this.write_buf
+.buffer_mut()
.extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
let fut = this.flow.service.call(req);
this.state.set(State::ServiceCall { fut });
@@ -1027,7 +1029,7 @@ where
mem::take(this.codec),
mem::take(this.read_buf),
);
-parts.write_buf = mem::take(this.write_buf);
+this.write_buf.write_to(&mut parts.write_buf);
let framed = Framed::from_parts(parts);
this.flow.upgrade.as_ref().unwrap().call((req, framed))
}
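Stripped of the poll machinery, the dispatcher's new flush logic amounts to the loop below: write whatever front_slice exposes, advance by the number of bytes the I/O accepted, and reset the buffer once everything is out. This is a synchronous analogue of the poll_write-based code above, not a drop-in replacement; the constant value and function name are illustrative.

use std::io::{self, Write};

use crate::big_bytes::BigBytes;

// Stand-in for the dispatcher's own high-water buffer size.
const HW_BUFFER_SIZE: usize = 8 * 1024;

fn drain_write_buf<W: Write>(io: &mut W, write_buf: &mut BigBytes) -> io::Result<()> {
    while write_buf.total_len() > 0 {
        match io.write(write_buf.front_slice())? {
            0 => return Err(io::Error::new(io::ErrorKind::WriteZero, "write zero; closing")),
            n => write_buf.advance(n),
        }
    }

    // Everything has been written; clear the queue and cap the buffer's retained capacity.
    write_buf.clear(HW_BUFFER_SIZE);
    io.flush()
}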