Skip to content

Commit e38678b

Browse files
apu031Apu Islam
andauthored
feat: implement HTTP/2 informational responses support (#865)
Add support for HTTP/2 informational responses (1xx status codes) including 103 Early Hints. This enables servers to send preliminary headers before the final response, improving client performance through early resource discovery and connection establishment. Changes include: - extend client and server APIs to handle interim informational responses - update stream state management for 1xx responses - add test for interim informational response scenarios This implementation follows RFC 7540 and RFC 8297 specifications for HTTP/2 informational responses handling. Co-authored-by: Apu Islam <apuislam@amazon.com>
1 parent 0fe6457 commit e38678b

File tree

7 files changed

+661
-14
lines changed

7 files changed

+661
-14
lines changed

src/client.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1485,6 +1485,22 @@ impl ResponseFuture {
14851485
pub fn stream_id(&self) -> crate::StreamId {
14861486
crate::StreamId::from_internal(self.inner.stream_id())
14871487
}
1488+
1489+
/// Polls for informational responses (1xx status codes).
1490+
///
1491+
/// This method should be called before polling the main response future
1492+
/// to check for any informational responses that have been received.
1493+
///
1494+
/// Returns `Poll::Ready(Some(response))` if an informational response is available,
1495+
/// `Poll::Ready(None)` if no more informational responses are expected,
1496+
/// or `Poll::Pending` if no informational response is currently available.
1497+
pub fn poll_informational(
1498+
&mut self,
1499+
cx: &mut Context<'_>,
1500+
) -> Poll<Option<Result<Response<()>, crate::Error>>> {
1501+
self.inner.poll_informational(cx).map_err(Into::into)
1502+
}
1503+
14881504
/// Returns a stream of PushPromises
14891505
///
14901506
/// # Panics

src/codec/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub enum UserError {
4949

5050
/// Tries to send push promise to peer who has disabled server push
5151
PeerDisabledServerPush,
52+
53+
/// Invalid status code for informational response (must be 1xx)
54+
InvalidInformationalStatusCode,
5255
}
5356

5457
// ===== impl SendError =====
@@ -97,6 +100,7 @@ impl fmt::Display for UserError {
97100
SendPingWhilePending => "send_ping before received previous pong",
98101
SendSettingsWhilePending => "sending SETTINGS before received previous ACK",
99102
PeerDisabledServerPush => "sending PUSH_PROMISE to peer who disabled server push",
103+
InvalidInformationalStatusCode => "invalid informational status code",
100104
})
101105
}
102106
}

src/proto/streams/recv.rs

Lines changed: 69 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ pub(super) enum Event {
6666
Headers(peer::PollMessage),
6767
Data(Bytes),
6868
Trailers(HeaderMap),
69+
InformationalHeaders(peer::PollMessage),
6970
}
7071

7172
#[derive(Debug)]
@@ -264,6 +265,21 @@ impl Recv {
264265
// corresponding headers frame pushed to `stream.pending_recv`.
265266
self.pending_accept.push(stream);
266267
}
268+
} else {
269+
// This is an informational response (1xx status code)
270+
// Convert to response and store it for polling
271+
let message = counts
272+
.peer()
273+
.convert_poll_message(pseudo, fields, stream_id)?;
274+
275+
tracing::trace!("Received informational response: stream_id={:?}", stream_id);
276+
277+
// Push the informational response onto the stream's recv buffer
278+
// with a special event type so it can be polled separately
279+
stream
280+
.pending_recv
281+
.push_back(&mut self.buffer, Event::InformationalHeaders(message));
282+
stream.notify_recv();
267283
}
268284

269285
Ok(())
@@ -324,24 +340,63 @@ impl Recv {
324340
) -> Poll<Result<Response<()>, proto::Error>> {
325341
use super::peer::PollMessage::*;
326342

327-
// If the buffer is not empty, then the first frame must be a HEADERS
328-
// frame or the user violated the contract.
329-
match stream.pending_recv.pop_front(&mut self.buffer) {
330-
Some(Event::Headers(Client(response))) => Poll::Ready(Ok(response)),
331-
Some(_) => panic!("poll_response called after response returned"),
332-
None => {
333-
if !stream.state.ensure_recv_open()? {
334-
proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
335-
return Poll::Ready(Err(Error::library_reset(
336-
stream.id,
337-
Reason::PROTOCOL_ERROR,
338-
)));
343+
// Skip over any interim informational headers to find the main response
344+
loop {
345+
match stream.pending_recv.pop_front(&mut self.buffer) {
346+
Some(Event::Headers(Client(response))) => return Poll::Ready(Ok(response)),
347+
Some(Event::InformationalHeaders(_)) => {
348+
tracing::trace!("Skipping informational response in poll_response - should be consumed via poll_informational; stream_id={:?}", stream.id);
349+
continue;
339350
}
351+
Some(_) => panic!("poll_response called after response returned"),
352+
None => {
353+
if !stream.state.ensure_recv_open()? {
354+
proto_err!(stream: "poll_response: stream={:?} is not opened;", stream.id);
355+
return Poll::Ready(Err(Error::library_reset(
356+
stream.id,
357+
Reason::PROTOCOL_ERROR,
358+
)));
359+
}
340360

341-
stream.recv_task = Some(cx.waker().clone());
342-
Poll::Pending
361+
stream.recv_task = Some(cx.waker().clone());
362+
return Poll::Pending;
363+
}
364+
}
365+
}
366+
}
367+
368+
/// Called by the client to get informational responses (1xx status codes)
369+
pub fn poll_informational(
370+
&mut self,
371+
cx: &Context,
372+
stream: &mut store::Ptr,
373+
) -> Poll<Option<Result<Response<()>, proto::Error>>> {
374+
use super::peer::PollMessage::*;
375+
376+
// Try to pop the front event and check if it's an informational response
377+
// If it's not, we put it back
378+
if let Some(event) = stream.pending_recv.pop_front(&mut self.buffer) {
379+
match event {
380+
Event::InformationalHeaders(Client(response)) => {
381+
// Found an informational response, return it
382+
return Poll::Ready(Some(Ok(response)));
383+
}
384+
other => {
385+
// Not an informational response, put it back at the front
386+
stream.pending_recv.push_front(&mut self.buffer, other);
387+
}
343388
}
344389
}
390+
391+
// No informational response available at the front
392+
if stream.state.ensure_recv_open()? {
393+
// Request to get notified once more frames arrive
394+
stream.recv_task = Some(cx.waker().clone());
395+
Poll::Pending
396+
} else {
397+
// No more frames will be received
398+
Poll::Ready(None)
399+
}
345400
}
346401

347402
/// Transition the stream based on receiving trailers

src/proto/streams/send.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,38 @@ impl Send {
167167
Ok(())
168168
}
169169

170+
/// Send interim informational headers (1xx responses) without changing stream state.
171+
/// This allows multiple interim informational responses to be sent before the final response.
172+
pub fn send_interim_informational_headers<B>(
173+
&mut self,
174+
frame: frame::Headers,
175+
buffer: &mut Buffer<Frame<B>>,
176+
stream: &mut store::Ptr,
177+
_counts: &mut Counts,
178+
task: &mut Option<Waker>,
179+
) -> Result<(), UserError> {
180+
tracing::trace!(
181+
"send_interim_informational_headers; frame={:?}; stream_id={:?}",
182+
frame,
183+
frame.stream_id()
184+
);
185+
186+
// Validate headers
187+
Self::check_headers(frame.fields())?;
188+
189+
debug_assert!(frame.is_informational(),
190+
"Frame must be informational (1xx status code) at this point. Validation should happen at the public API boundary.");
191+
debug_assert!(!frame.is_end_stream(),
192+
"Informational frames must not have end_stream flag set. Validation should happen at the internal send informational header streams.");
193+
194+
// Queue the frame for sending WITHOUT changing stream state
195+
// This is the key difference from send_headers - we don't call stream.state.send_open()
196+
self.prioritize
197+
.queue_frame(frame.into(), buffer, stream, task);
198+
199+
Ok(())
200+
}
201+
170202
/// Send an explicit RST_STREAM frame
171203
pub fn send_reset<B>(
172204
&mut self,

src/proto/streams/streams.rs

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1150,6 +1150,43 @@ impl<B> StreamRef<B> {
11501150
}
11511151
}
11521152

1153+
pub fn send_informational_headers(&mut self, frame: frame::Headers) -> Result<(), UserError> {
1154+
let mut me = self.opaque.inner.lock().unwrap();
1155+
let me = &mut *me;
1156+
1157+
let stream = me.store.resolve(self.opaque.key);
1158+
let actions = &mut me.actions;
1159+
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
1160+
let send_buffer = &mut *send_buffer;
1161+
1162+
me.counts.transition(stream, |counts, stream| {
1163+
// For informational responses (1xx), we need to send headers without
1164+
// changing the stream state. This allows multiple informational responses
1165+
// to be sent before the final response.
1166+
1167+
// Validate that this is actually an informational response
1168+
debug_assert!(
1169+
frame.is_informational(),
1170+
"Frame must be informational after conversion from informational response"
1171+
);
1172+
1173+
// Ensure the frame is not marked as end_stream for informational responses
1174+
if frame.is_end_stream() {
1175+
return Err(UserError::UnexpectedFrameType);
1176+
}
1177+
1178+
// Send the interim informational headers directly to the buffer without state changes
1179+
// This bypasses the normal send_headers flow that would transition the stream state
1180+
actions.send.send_interim_informational_headers(
1181+
frame,
1182+
send_buffer,
1183+
stream,
1184+
counts,
1185+
&mut actions.task,
1186+
)
1187+
})
1188+
}
1189+
11531190
pub fn send_response(
11541191
&mut self,
11551192
mut response: Response<()>,
@@ -1334,6 +1371,19 @@ impl OpaqueStreamRef {
13341371

13351372
me.actions.recv.poll_response(cx, &mut stream)
13361373
}
1374+
1375+
/// Called by a client to check for informational responses (1xx status codes)
1376+
pub fn poll_informational(
1377+
&mut self,
1378+
cx: &Context,
1379+
) -> Poll<Option<Result<Response<()>, proto::Error>>> {
1380+
let mut me = self.inner.lock().unwrap();
1381+
let me = &mut *me;
1382+
1383+
let mut stream = me.store.resolve(self.key);
1384+
1385+
me.actions.recv.poll_informational(cx, &mut stream)
1386+
}
13371387
/// Called by a client to check for a pushed request.
13381388
pub fn poll_pushed(
13391389
&mut self,

src/server.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,109 @@ impl Default for Builder {
11021102
// ===== impl SendResponse =====
11031103

11041104
impl<B: Buf> SendResponse<B> {
1105+
/// Send an interim informational response (1xx status codes)
1106+
///
1107+
/// This method can be called multiple times before calling `send_response()`
1108+
/// to send the final response. Only 1xx status codes are allowed.
1109+
///
1110+
/// Interim informational responses are used to provide early feedback to the client
1111+
/// before the final response is ready. Common examples include:
1112+
/// - 100 Continue: Indicates the client should continue with the request
1113+
/// - 103 Early Hints: Provides early hints about resources to preload
1114+
///
1115+
/// # Arguments
1116+
/// * `response` - HTTP response with 1xx status code and headers
1117+
///
1118+
/// # Returns
1119+
/// * `Ok(())` - Interim Informational response sent successfully
1120+
/// * `Err(Error)` - Failed to send (invalid status code, connection error, etc.)
1121+
///
1122+
/// # Examples
1123+
/// ```rust
1124+
/// use h2::server;
1125+
/// use http::{Response, StatusCode};
1126+
///
1127+
/// # async fn example(mut send_response: h2::server::SendResponse<bytes::Bytes>) -> Result<(), h2::Error> {
1128+
/// // Send 100 Continue before processing request body
1129+
/// let continue_response = Response::builder()
1130+
/// .status(StatusCode::CONTINUE)
1131+
/// .body(())
1132+
/// .unwrap();
1133+
/// send_response.send_informational(continue_response)?;
1134+
///
1135+
/// // Later send the final response
1136+
/// let final_response = Response::builder()
1137+
/// .status(StatusCode::OK)
1138+
/// .body(())
1139+
/// .unwrap();
1140+
/// let _stream = send_response.send_response(final_response, false)?;
1141+
/// # Ok(())
1142+
/// # }
1143+
/// ```
1144+
///
1145+
/// # Errors
1146+
/// This method will return an error if:
1147+
/// - The response status code is not in the 1xx range
1148+
/// - The final response has already been sent
1149+
/// - There is a connection-level error
1150+
pub fn send_informational(&mut self, response: Response<()>) -> Result<(), crate::Error> {
1151+
let stream_id = self.inner.stream_id();
1152+
let status = response.status();
1153+
1154+
tracing::trace!(
1155+
"send_informational called with status: {} on stream: {:?}",
1156+
status,
1157+
stream_id
1158+
);
1159+
1160+
// Validate that this is an informational response (1xx status code)
1161+
if !response.status().is_informational() {
1162+
tracing::trace!(
1163+
"invalid informational status code: {} on stream: {:?}",
1164+
status,
1165+
stream_id
1166+
);
1167+
return Err(crate::Error::from(
1168+
UserError::InvalidInformationalStatusCode,
1169+
));
1170+
}
1171+
1172+
tracing::trace!(
1173+
"converting informational response to HEADERS frame without END_STREAM flag for stream: {:?}",
1174+
stream_id
1175+
);
1176+
1177+
let frame = Peer::convert_send_message(
1178+
stream_id, response, false, // NOT end_of_stream for informational responses
1179+
);
1180+
1181+
tracing::trace!(
1182+
"sending interim informational headers frame for stream: {:?}",
1183+
stream_id
1184+
);
1185+
1186+
// Use the proper H2 streams API for sending interim informational headers
1187+
// This bypasses the normal response flow and allows multiple informational responses
1188+
let result = self
1189+
.inner
1190+
.send_informational_headers(frame)
1191+
.map_err(Into::into);
1192+
1193+
match &result {
1194+
Ok(()) => tracing::trace!(
1195+
"Successfully sent informational headers for stream: {:?}",
1196+
stream_id
1197+
),
1198+
Err(e) => tracing::trace!(
1199+
"Failed to send informational headers for stream: {:?}: {:?}",
1200+
stream_id,
1201+
e
1202+
),
1203+
}
1204+
1205+
result
1206+
}
1207+
11051208
/// Send a response to a client request.
11061209
///
11071210
/// On success, a [`SendStream`] instance is returned. This instance can be

0 commit comments

Comments
 (0)