Skip to content

Commit 68f97c5

Browse files
committed
refactor: put send_request into httpcore
1 parent 31ee50d commit 68f97c5

File tree

4 files changed

+403
-400
lines changed

4 files changed

+403
-400
lines changed

stackslib/src/net/httpcore.rs

Lines changed: 218 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717
/// This module binds the http library to Stacks as a `ProtocolFamily` implementation
1818
use std::collections::{BTreeMap, HashMap};
1919
use std::io::{Read, Write};
20-
use std::net::SocketAddr;
20+
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
21+
use std::time::{Duration, Instant};
2122
use std::{fmt, io, mem};
2223

2324
use clarity::vm::costs::ExecutionCost;
@@ -32,8 +33,8 @@ use stacks_common::types::chainstate::{
3233
use stacks_common::types::net::PeerHost;
3334
use stacks_common::types::Address;
3435
use stacks_common::util::chunked_encoding::*;
35-
use stacks_common::util::get_epoch_time_ms;
3636
use stacks_common::util::retry::{BoundReader, RetryReader};
37+
use stacks_common::util::{get_epoch_time_ms, get_epoch_time_secs};
3738
use url::Url;
3839

3940
use super::rpc::ConversationHttp;
@@ -43,7 +44,7 @@ use crate::chainstate::burn::BlockSnapshot;
4344
use crate::chainstate::nakamoto::NakamotoChainState;
4445
use crate::chainstate::stacks::db::{StacksChainState, StacksHeaderInfo};
4546
use crate::core::{MemPoolDB, StacksEpoch};
46-
use crate::net::connection::ConnectionOptions;
47+
use crate::net::connection::{ConnectionOptions, NetworkConnection};
4748
use crate::net::http::common::{parse_raw_bytes, HTTP_PREAMBLE_MAX_ENCODED_SIZE};
4849
use crate::net::http::{
4950
http_reason, parse_bytes, parse_json, Error as HttpError, HttpBadRequest, HttpContentType,
@@ -1764,3 +1765,217 @@ pub fn decode_request_path(path: &str) -> Result<(String, String), NetError> {
17641765
query_str.unwrap_or("").to_string(),
17651766
))
17661767
}
1768+
1769+
/// Convert a NetError into an io::Error if appropriate.
1770+
fn handle_net_error(e: NetError, msg: &str) -> io::Error {
1771+
if let NetError::ReadError(ioe) = e {
1772+
ioe
1773+
} else if let NetError::WriteError(ioe) = e {
1774+
ioe
1775+
} else if let NetError::RecvTimeout = e {
1776+
io::Error::new(io::ErrorKind::WouldBlock, "recv timeout")
1777+
} else {
1778+
io::Error::new(io::ErrorKind::Other, format!("{}: {:?}", &e, msg).as_str())
1779+
}
1780+
}
1781+
1782+
/// Send an HTTP request to the given host:port. Returns the decoded response.
1783+
/// Internally, this creates a socket, connects it, sends the HTTP request, and decodes the HTTP
1784+
/// response. It is a blocking operation.
1785+
///
1786+
/// If the request encounters a network error, then return an error. Don't retry.
1787+
/// If the request times out after `timeout`, then return an error.
1788+
pub fn send_http_request(
1789+
host: &str,
1790+
port: u16,
1791+
request: StacksHttpRequest,
1792+
timeout: Duration,
1793+
) -> Result<StacksHttpResponse, io::Error> {
1794+
// Find the host:port that works.
1795+
// This is sometimes necessary because `localhost` can resolve to both its ipv4 and ipv6
1796+
// addresses, but usually, Stacks services like event observers are only bound to ipv4
1797+
// addresses. So, be sure to use an address that will lead to a socket connection!
1798+
let mut stream_and_addr = None;
1799+
let mut last_err = None;
1800+
for addr in format!("{host}:{port}").to_socket_addrs()? {
1801+
debug!("send_request: connect to {}", &addr);
1802+
match TcpStream::connect_timeout(&addr, timeout) {
1803+
Ok(sock) => {
1804+
stream_and_addr = Some((sock, addr));
1805+
break;
1806+
}
1807+
Err(e) => {
1808+
last_err = Some(e);
1809+
}
1810+
}
1811+
}
1812+
1813+
let Some((mut stream, addr)) = stream_and_addr else {
1814+
return Err(last_err.unwrap_or(io::Error::new(
1815+
io::ErrorKind::Other,
1816+
"Unable to connect to {host}:{port}",
1817+
)));
1818+
};
1819+
1820+
stream.set_read_timeout(Some(timeout))?;
1821+
stream.set_write_timeout(Some(timeout))?;
1822+
stream.set_nodelay(true)?;
1823+
1824+
let start = Instant::now();
1825+
1826+
debug!("send_request: Sending request"; "request" => %request.request_path());
1827+
1828+
// Some explanation of what's going on here is in order.
1829+
//
1830+
// The networking stack in Stacks is designed to operate on non-blocking sockets, and
1831+
// furthermore, it operates in a way that the call site in which a network request is issued can
1832+
// be in a wholly separate stack (or thread) from the connection. While this is absolutely necessary
1833+
// within the Stacks node, using it to issue a single blocking request imposes a lot of
1834+
// overhead.
1835+
//
1836+
// First, we will create the network connection and give it a ProtocolFamily implementation
1837+
// (StacksHttp), which gets used by the connection to encode and deocde messages.
1838+
//
1839+
// Second, we'll create a _handle_ to the network connection into which we will write requests
1840+
// and read responses. The connection itself is an opaque black box that, internally,
1841+
// implements a state machine around the ProtocolFamily implementation to incrementally read
1842+
// ProtocolFamily messages from a Read, and write them to a Write. The Read + Write is
1843+
// (usually) a non-blocking socket; the network connection deals with EWOULDBLOCK internally,
1844+
// as well as underfull socket buffers.
1845+
//
1846+
// Third, we need to _drive_ data to the socket. We have to repeatedly (1) flush the network
1847+
// handle (which contains the buffered bytes from the message to be fed into the socket), and
1848+
// (2) drive bytes from the handle into the socket iself via the network connection. This is a
1849+
// two-step process mainly because the handle is expected to live in a separate stack (or even
1850+
// a separate thread).
1851+
//
1852+
// Fourth, we need to _drive_ data from the socket. We have to repeatedly (1) pull data from
1853+
// the socket into the network connection, and (2) drive parsed messages from the connection to
1854+
// the handle. Then, the call site that owns the handle simply polls the handle for new
1855+
// messages. Once we have received a message, we can proceed to handle it.
1856+
//
1857+
// Finally, we deal with the kind of HTTP message we got. If it's an error response, we convert
1858+
// it into an error. If it's a request (i.e. not a response), we also return an error. We
1859+
// only return the message if it was a well-formed non-error HTTP response.
1860+
1861+
// Step 1-2: set up the connection and request handle
1862+
// NOTE: we don't need anything special for connection options, so just use the default
1863+
let conn_opts = ConnectionOptions::default();
1864+
let http = StacksHttp::new_client(addr, &conn_opts);
1865+
let mut connection = NetworkConnection::new(http, &conn_opts, None);
1866+
let mut request_handle = connection
1867+
.make_request_handle(0, get_epoch_time_secs() + timeout.as_secs(), 0)
1868+
.map_err(|e| {
1869+
io::Error::new(
1870+
io::ErrorKind::Other,
1871+
format!("Failed to create request handle: {:?}", &e).as_str(),
1872+
)
1873+
})?;
1874+
1875+
// Step 3: load up the request with the message we're gonna send, and iteratively dump its
1876+
// bytes from the handle into the socket (the connection does internal buffering and
1877+
// bookkeeping to deal with the cases where we fail to fill the socket buffer, or we can't send
1878+
// anymore because the socket buffer is currently full).
1879+
request
1880+
.send(&mut request_handle)
1881+
.map_err(|e| handle_net_error(e, "Failed to serialize request body"))?;
1882+
1883+
debug!("send_request(sending data)");
1884+
loop {
1885+
let flushed = request_handle
1886+
.try_flush()
1887+
.map_err(|e| handle_net_error(e, "Failed to flush request body"))?;
1888+
1889+
// send it out
1890+
let num_sent = connection
1891+
.send_data(&mut stream)
1892+
.map_err(|e| handle_net_error(e, "Failed to send socket data"))?;
1893+
1894+
debug!(
1895+
"send_request(sending data): flushed = {}, num_sent = {}",
1896+
flushed, num_sent
1897+
);
1898+
if flushed && num_sent == 0 {
1899+
break;
1900+
}
1901+
1902+
if Instant::now().saturating_duration_since(start) > timeout {
1903+
return Err(io::Error::new(
1904+
io::ErrorKind::WouldBlock,
1905+
"Timed out while receiving request",
1906+
));
1907+
}
1908+
}
1909+
1910+
// Step 4: pull bytes from the socket back into the handle, and see if the connection decoded
1911+
// and dispatched any new messages to the request handle. If so, then extract the message and
1912+
// check that it's a well-formed HTTP response.
1913+
debug!("send_request(receiving data)");
1914+
let response;
1915+
loop {
1916+
// get back the reply
1917+
debug!("send_request(receiving data): try to receive data");
1918+
match connection.recv_data(&mut stream) {
1919+
Ok(nr) => {
1920+
debug!("send_request(receiving data): received {} bytes", nr);
1921+
}
1922+
Err(e) => {
1923+
return Err(handle_net_error(e, "Failed to receive socket data"));
1924+
}
1925+
}
1926+
1927+
// fullfill the request -- send it to its corresponding handle
1928+
debug!("send_request(receiving data): drain inbox");
1929+
connection.drain_inbox();
1930+
1931+
// see if we got a message that was fulfilled in our handle
1932+
debug!("send_request(receiving data): try receive response");
1933+
let rh = match request_handle.try_recv() {
1934+
Ok(resp) => {
1935+
response = resp;
1936+
break;
1937+
}
1938+
Err(e) => match e {
1939+
Ok(handle) => handle,
1940+
Err(e) => {
1941+
return Err(handle_net_error(
1942+
e,
1943+
"Failed to receive message after socket has been drained",
1944+
));
1945+
}
1946+
},
1947+
};
1948+
request_handle = rh;
1949+
1950+
if Instant::now().saturating_duration_since(start) > timeout {
1951+
return Err(io::Error::new(
1952+
io::ErrorKind::WouldBlock,
1953+
"Timed out while receiving request",
1954+
));
1955+
}
1956+
}
1957+
1958+
// Step 5: decode the HTTP message and return it if it's not an error.
1959+
let response_data = match response {
1960+
StacksHttpMessage::Response(response_data) => response_data,
1961+
StacksHttpMessage::Error(path, response) => {
1962+
return Err(io::Error::new(
1963+
io::ErrorKind::Other,
1964+
format!(
1965+
"Request did not succeed ({} != 200). Path: '{}'",
1966+
response.preamble().status_code,
1967+
&path
1968+
)
1969+
.as_str(),
1970+
));
1971+
}
1972+
_ => {
1973+
return Err(io::Error::new(
1974+
io::ErrorKind::Other,
1975+
"Did not receive an HTTP response",
1976+
));
1977+
}
1978+
};
1979+
1980+
Ok(response_data)
1981+
}

0 commit comments

Comments
 (0)