diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index d2eba8c15..a7370bd1b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -194,6 +194,11 @@ jobs: steps: - uses: actions/checkout@v3 + - name: setup dependencies + uses: ./.github/actions/spin-ci-dependencies + with: + rust: true + # Install all the toolchain dependencies - name: Install Rust wasm target run: rustup target add wasm32-wasip1 wasm32-unknown-unknown diff --git a/Cargo.lock b/Cargo.lock index 05e7f7ba0..21d8ae9a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8431,6 +8431,7 @@ dependencies = [ "hyper-util", "reqwest 0.12.9", "rustls 0.23.18", + "serde", "spin-factor-outbound-networking", "spin-factor-variables", "spin-factors", @@ -8439,6 +8440,7 @@ dependencies = [ "spin-world", "tokio", "tokio-rustls 0.26.0", + "tower-service", "tracing", "wasmtime", "wasmtime-wasi", diff --git a/Cargo.toml b/Cargo.toml index feaa76d38..51065b44e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,6 +164,7 @@ tokio = "1" tokio-rustls = { version = "0.26", default-features = false, features = ["logging", "tls12"] } toml = "0.8" toml_edit = "0.22" +tower-service = "0.3.3" tracing = { version = "0.1.41", features = ["log"] } url = "2" walkdir = "2" @@ -186,4 +187,4 @@ blocks_in_conditions = "allow" [[bin]] name = "spin" -path = "src/bin/spin.rs" \ No newline at end of file +path = "src/bin/spin.rs" diff --git a/crates/factor-outbound-http/Cargo.toml b/crates/factor-outbound-http/Cargo.toml index 3bab71f01..2dba3e9f1 100644 --- a/crates/factor-outbound-http/Cargo.toml +++ b/crates/factor-outbound-http/Cargo.toml @@ -13,12 +13,14 @@ hyper = { workspace = true } hyper-util = { workspace = true } reqwest = { workspace = true, features = ["gzip"] } rustls = { workspace = true } +serde = { workspace = true } spin-factor-outbound-networking = { path = "../factor-outbound-networking" } spin-factors = { path = "../factors" } spin-telemetry = { path = "../telemetry" } spin-world = { path = "../world" } tokio = { workspace = true, features = ["macros", "rt", "net"] } tokio-rustls = { workspace = true } +tower-service = { workspace = true } tracing = { workspace = true } wasmtime = { workspace = true } wasmtime-wasi = { workspace = true } @@ -28,5 +30,10 @@ wasmtime-wasi-http = { workspace = true } spin-factor-variables = { path = "../factor-variables" } spin-factors-test = { path = "../factors-test" } +[features] +default = ["spin-cli"] +# Includes the runtime configuration handling used by the Spin CLI +spin-cli = [] + [lints] workspace = true diff --git a/crates/factor-outbound-http/src/lib.rs b/crates/factor-outbound-http/src/lib.rs index ea270e5de..0170123b1 100644 --- a/crates/factor-outbound-http/src/lib.rs +++ b/crates/factor-outbound-http/src/lib.rs @@ -1,4 +1,5 @@ pub mod intercept; +pub mod runtime_config; mod spin; mod wasi; pub mod wasi_2023_10_18; @@ -12,6 +13,7 @@ use http::{ HeaderValue, Uri, }; use intercept::OutboundHttpInterceptor; +use runtime_config::RuntimeConfig; use spin_factor_outbound_networking::{ config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks}, ComponentTlsClientConfigs, OutboundNetworkingFactor, @@ -34,8 +36,8 @@ pub struct OutboundHttpFactor { } impl Factor for OutboundHttpFactor { - type RuntimeConfig = (); - type AppState = (); + type RuntimeConfig = RuntimeConfig; + type AppState = AppState; type InstanceBuilder = InstanceState; fn init(&mut self, ctx: &mut impl spin_factors::InitContext) -> anyhow::Result<()> { @@ -46,9 +48,14 @@ impl Factor for OutboundHttpFactor { fn configure_app( &self, - _ctx: ConfigureAppContext, + mut ctx: ConfigureAppContext, ) -> anyhow::Result { - Ok(()) + Ok(AppState { + connection_pooling: ctx + .take_runtime_config() + .unwrap_or_default() + .connection_pooling, + }) } fn prepare( @@ -67,6 +74,8 @@ impl Factor for OutboundHttpFactor { self_request_origin: None, request_interceptor: None, spin_http_client: None, + wasi_http_clients: None, + connection_pooling: ctx.app_state().connection_pooling, }) } } @@ -80,6 +89,9 @@ pub struct InstanceState { request_interceptor: Option>, // Connection-pooling client for 'fermyon:spin/http' interface spin_http_client: Option, + // Connection pooling client for `wasi:http/outgoing-handler` interface + wasi_http_clients: Option, + connection_pooling: bool, } impl InstanceState { @@ -157,3 +169,7 @@ impl std::fmt::Display for SelfRequestOrigin { write!(f, "{}://{}", self.scheme, self.authority) } } + +pub struct AppState { + connection_pooling: bool, +} diff --git a/crates/factor-outbound-http/src/runtime_config.rs b/crates/factor-outbound-http/src/runtime_config.rs new file mode 100644 index 000000000..5c2b5b3a6 --- /dev/null +++ b/crates/factor-outbound-http/src/runtime_config.rs @@ -0,0 +1,17 @@ +#[cfg(feature = "spin-cli")] +pub mod spin; + +/// Runtime configuration for outbound HTTP. +#[derive(Debug)] +pub struct RuntimeConfig { + /// If true, enable connection pooling and reuse. + pub connection_pooling: bool, +} + +impl Default for RuntimeConfig { + fn default() -> Self { + Self { + connection_pooling: true, + } + } +} diff --git a/crates/factor-outbound-http/src/runtime_config/spin.rs b/crates/factor-outbound-http/src/runtime_config/spin.rs new file mode 100644 index 000000000..65aa483b7 --- /dev/null +++ b/crates/factor-outbound-http/src/runtime_config/spin.rs @@ -0,0 +1,31 @@ +use serde::Deserialize; +use spin_factors::runtime_config::toml::GetTomlValue; + +/// Get the runtime configuration for outbound HTTP from a TOML table. +/// +/// Expects table to be in the format: +/// ```toml +/// [outbound_http] +/// connection_pooling = true +/// ``` +pub fn config_from_table( + table: &impl GetTomlValue, +) -> anyhow::Result> { + if let Some(outbound_http) = table.get("outbound_http") { + Ok(Some(super::RuntimeConfig { + connection_pooling: outbound_http + .clone() + .try_into::()? + .connection_pooling, + })) + } else { + Ok(None) + } +} + +#[derive(Debug, Default, Deserialize)] +#[serde(deny_unknown_fields)] +struct OutboundHttpToml { + #[serde(default)] + connection_pooling: bool, +} diff --git a/crates/factor-outbound-http/src/spin.rs b/crates/factor-outbound-http/src/spin.rs index a161affd4..2f8fc428f 100644 --- a/crates/factor-outbound-http/src/spin.rs +++ b/crates/factor-outbound-http/src/spin.rs @@ -90,7 +90,13 @@ impl spin_http::Host for crate::InstanceState { // Allow reuse of Client's internal connection pool for multiple requests // in a single component execution - let client = self.spin_http_client.get_or_insert_with(Default::default); + let client = self.spin_http_client.get_or_insert_with(|| { + let mut builder = reqwest::Client::builder(); + if !self.connection_pooling { + builder = builder.pool_max_idle_per_host(0); + } + builder.build().unwrap() + }); let resp = client.execute(req).await.map_err(log_reqwest_error)?; diff --git a/crates/factor-outbound-http/src/wasi.rs b/crates/factor-outbound-http/src/wasi.rs index 1569d753c..4d16b8301 100644 --- a/crates/factor-outbound-http/src/wasi.rs +++ b/crates/factor-outbound-http/src/wasi.rs @@ -1,23 +1,41 @@ -use std::{error::Error, future::Future, pin::Pin, sync::Arc, time::Duration}; +use std::{ + error::Error, + future::Future, + io::IoSlice, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; -use anyhow::Context; -use bytes::Bytes; -use http::{header::HOST, Request}; -use http_body_util::{combinators::BoxBody, BodyExt}; -use hyper_util::rt::TokioExecutor; +use anyhow::Context as _; +use http::{header::HOST, Request, Uri}; +use http_body_util::BodyExt; +use hyper_util::{ + client::legacy::{ + connect::{Connected, Connection}, + Client, + }, + rt::{TokioExecutor, TokioIo}, +}; use spin_factor_outbound_networking::{ config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks}, ComponentTlsClientConfigs, TlsClientConfig, }; use spin_factors::{wasmtime::component::ResourceTable, RuntimeFactorsInstanceState}; -use tokio::{net::TcpStream, time::timeout}; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + net::TcpStream, + time::timeout, +}; +use tokio_rustls::client::TlsStream; +use tower_service::Service; use tracing::{field::Empty, instrument, Instrument}; use wasmtime::component::HasData; use wasmtime_wasi::p2::{IoImpl, IoView}; use wasmtime_wasi_http::{ bindings::http::types::ErrorCode, body::HyperOutgoingBody, - io::TokioIo, types::{HostFutureIncomingResponse, IncomingResponse}, WasiHttpCtx, WasiHttpImpl, WasiHttpView, }; @@ -70,6 +88,19 @@ impl OutboundHttpFactor { } } +type HttpClient = Client; +type HttpsClient = Client; + +#[derive(Clone)] +pub(super) struct HttpClients { + /// Used for non-TLS HTTP/1 connections. + http1: HttpClient, + /// Used for non-TLS HTTP/2 connections (e.g. when h2 prior knowledge is available). + http2: HttpClient, + /// Used for HTTP-over-TLS connections, using ALPN to negotiate the HTTP version. + https: HttpsClient, +} + pub(crate) struct WasiHttpImplInner<'a> { state: &'a mut InstanceState, table: &'a mut ResourceTable, @@ -104,6 +135,25 @@ impl WasiHttpView for WasiHttpImplInner<'_> { request: Request, config: wasmtime_wasi_http::types::OutgoingRequestConfig, ) -> wasmtime_wasi_http::HttpResult { + let connection_pooling = self.state.connection_pooling; + let builder = move || { + let mut builder = Client::builder(TokioExecutor::new()); + if !connection_pooling { + builder.pool_max_idle_per_host(0); + } + builder + }; + + let http_clients = self + .state + .wasi_http_clients + .get_or_insert_with(|| HttpClients { + http1: builder().build(HttpConnector), + http2: builder().http2_only(true).build(HttpConnector), + https: builder().build(HttpsConnector), + }) + .clone(); + Ok(HostFutureIncomingResponse::Pending( wasmtime_wasi::runtime::spawn( send_request_impl( @@ -114,6 +164,7 @@ impl WasiHttpView for WasiHttpImplInner<'_> { self.state.request_interceptor.clone(), self.state.self_request_origin.clone(), self.state.blocked_networks.clone(), + http_clients, ) .in_current_span(), ), @@ -121,6 +172,190 @@ impl WasiHttpView for WasiHttpImplInner<'_> { } } +#[derive(Clone)] +struct ConnectOptions { + blocked_networks: BlockedNetworks, + connect_timeout: Duration, +} + +// We must use task-local variables for these config options when using +// `hyper_util::client::legacy::Client::request` because there's no way to plumb +// them through as parameters. Moreover, if there's already a pooled connection +// ready, we'll reuse that and ignore these options anyway. +tokio::task_local! { + static CONNECT_OPTIONS: ConnectOptions; + static TLS_CLIENT_CONFIG: TlsClientConfig; +} + +async fn connect_tcp(uri: Uri, default_port: u16) -> Result<(TcpStream, String), ErrorCode> { + let authority_str = if let Some(authority) = uri.authority() { + if authority.port().is_some() { + authority.to_string() + } else { + format!("{authority}:{default_port}") + } + } else { + return Err(ErrorCode::HttpRequestUriInvalid); + }; + + let ConnectOptions { + blocked_networks, + connect_timeout, + } = CONNECT_OPTIONS.get(); + + let mut socket_addrs = tokio::net::lookup_host(&authority_str) + .await + .map_err(|_| dns_error("address not available".into(), 0))? + .collect::>(); + + // Remove blocked IPs + let blocked_addrs = blocked_networks.remove_blocked(&mut socket_addrs); + if socket_addrs.is_empty() && !blocked_addrs.is_empty() { + tracing::error!( + "error.type" = "destination_ip_prohibited", + ?blocked_addrs, + "all destination IP(s) prohibited by runtime config" + ); + return Err(ErrorCode::DestinationIpProhibited); + } + + Ok(( + timeout(connect_timeout, TcpStream::connect(socket_addrs.as_slice())) + .await + .map_err(|_| ErrorCode::ConnectionTimeout)? + .map_err(|err| match err.kind() { + std::io::ErrorKind::AddrNotAvailable => { + dns_error("address not available".into(), 0) + } + _ => ErrorCode::ConnectionRefused, + })?, + authority_str, + )) +} + +#[derive(Clone)] +struct HttpConnector; + +impl HttpConnector { + async fn connect(uri: Uri) -> Result, ErrorCode> { + Ok(TokioIo::new(connect_tcp(uri, 80).await?.0)) + } +} + +impl Service for HttpConnector { + type Response = TokioIo; + type Error = ErrorCode; + type Future = Pin, ErrorCode>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, uri: Uri) -> Self::Future { + Box::pin(async move { Self::connect(uri).await }) + } +} + +struct RustlsStream(TlsStream); + +impl Connection for RustlsStream { + fn connected(&self) -> Connected { + if self.0.get_ref().1.alpn_protocol() == Some(b"h2") { + self.0.get_ref().0.connected().negotiated_h2() + } else { + self.0.get_ref().0.connected() + } + } +} + +impl AsyncRead for RustlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_read(cx, buf) + } +} + +impl AsyncWrite for RustlsStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.get_mut().0).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } +} + +#[derive(Clone)] +struct HttpsConnector; + +impl HttpsConnector { + async fn connect(uri: Uri) -> Result, ErrorCode> { + use rustls::pki_types::ServerName; + + let (tcp_stream, authority_str) = connect_tcp(uri, 443).await?; + + let mut tls_client_config = (*TLS_CLIENT_CONFIG.get()).clone(); + tls_client_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_client_config)); + let mut parts = authority_str.split(':'); + let host = parts.next().unwrap_or(&authority_str); + let domain = ServerName::try_from(host) + .map_err(|e| { + tracing::warn!("dns lookup error: {e:?}"); + dns_error("invalid dns name".to_string(), 0) + })? + .to_owned(); + let stream = connector.connect(domain, tcp_stream).await.map_err(|e| { + tracing::warn!("tls protocol error: {e:?}"); + ErrorCode::TlsProtocolError + })?; + + Ok(TokioIo::new(RustlsStream(stream))) + } +} + +impl Service for HttpsConnector { + type Response = TokioIo; + type Error = ErrorCode; + type Future = Pin, ErrorCode>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, uri: Uri) -> Self::Future { + Box::pin(async move { Self::connect(uri).await }) + } +} + +#[allow(clippy::too_many_arguments)] async fn send_request_impl( mut request: Request, mut config: wasmtime_wasi_http::types::OutgoingRequestConfig, @@ -129,6 +364,7 @@ async fn send_request_impl( request_interceptor: Option>, self_request_origin: Option, blocked_networks: BlockedNetworks, + http_clients: HttpClients, ) -> anyhow::Result> { // wasmtime-wasi-http fills in scheme and authority for relative URLs // (e.g. https://:443/), which makes them hard to reason about. @@ -212,12 +448,16 @@ async fn send_request_impl( span.record("server.port", port.as_u16()); } - Ok(send_request_handler(request, config, tls_client_config, blocked_networks).await) + Ok(send_request_handler( + request, + config, + tls_client_config, + blocked_networks, + http_clients, + ) + .await) } -/// This is a fork of wasmtime_wasi_http::default_send_request_handler function -/// forked from bytecodealliance/wasmtime commit-sha 29a76b68200fcfa69c8fb18ce6c850754279a05b -/// This fork provides the ability to configure client cert auth for mTLS async fn send_request_handler( mut request: http::Request, wasmtime_wasi_http::types::OutgoingRequestConfig { @@ -228,212 +468,73 @@ async fn send_request_handler( }: wasmtime_wasi_http::types::OutgoingRequestConfig, tls_client_config: TlsClientConfig, blocked_networks: BlockedNetworks, + http_clients: HttpClients, ) -> Result { - let authority_str = if let Some(authority) = request.uri().authority() { - if authority.port().is_some() { - authority.to_string() - } else { - let port = if use_tls { 443 } else { 80 }; - format!("{authority}:{port}") - } - } else { - return Err(ErrorCode::HttpRequestUriInvalid); - }; - - // Resolve the authority to IP addresses - let mut socket_addrs = tokio::net::lookup_host(&authority_str) - .await - .map_err(|_| dns_error("address not available".into(), 0))? - .collect::>(); - - // Remove blocked IPs - let blocked_addrs = blocked_networks.remove_blocked(&mut socket_addrs); - if socket_addrs.is_empty() && !blocked_addrs.is_empty() { - tracing::error!( - "error.type" = "destination_ip_prohibited", - ?blocked_addrs, - "all destination IP(s) prohibited by runtime config" - ); - return Err(ErrorCode::DestinationIpProhibited); - } - - let tcp_stream = timeout(connect_timeout, TcpStream::connect(socket_addrs.as_slice())) - .await - .map_err(|_| ErrorCode::ConnectionTimeout)? - .map_err(|err| match err.kind() { - std::io::ErrorKind::AddrNotAvailable => dns_error("address not available".into(), 0), - _ => ErrorCode::ConnectionRefused, - })?; - - let (mut sender, worker, is_http2) = if use_tls { - #[cfg(any(target_arch = "riscv64", target_arch = "s390x"))] - { - return Err(ErrorCode::InternalError(Some( - "unsupported architecture for SSL".to_string(), - ))); - } + // Some servers (looking at you nginx) don't like a host header even though + // http/2 allows it: https://github.com/hyperium/hyper/issues/3298 + request.headers_mut().remove(HOST); - #[cfg(not(any(target_arch = "riscv64", target_arch = "s390x")))] - { - use rustls::pki_types::ServerName; - - let mut tls_client_config = (*tls_client_config).clone(); - tls_client_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - - let connector = tokio_rustls::TlsConnector::from(Arc::new(tls_client_config)); - let mut parts = authority_str.split(':'); - let host = parts.next().unwrap_or(&authority_str); - let domain = ServerName::try_from(host) - .map_err(|e| { - tracing::warn!("dns lookup error: {e:?}"); - dns_error("invalid dns name".to_string(), 0) - })? - .to_owned(); - let stream = connector.connect(domain, tcp_stream).await.map_err(|e| { - tracing::warn!("tls protocol error: {e:?}"); - ErrorCode::TlsProtocolError - })?; - - let is_http2 = stream.get_ref().1.alpn_protocol() == Some(b"h2"); - - let stream = TokioIo::new(stream); - - let (sender, conn) = new_sender_and_conn(stream, is_http2, connect_timeout).await?; - - let worker = wasmtime_wasi::runtime::spawn(async move { - match conn.await { - Ok(()) => {} - // TODO: shouldn't throw away this error and ideally should - // surface somewhere. - Err(e) => tracing::warn!("dropping error {e}"), + let resp = CONNECT_OPTIONS.scope( + ConnectOptions { + blocked_networks, + connect_timeout, + }, + async move { + if use_tls { + TLS_CLIENT_CONFIG + .scope(tls_client_config, async move { + http_clients.https.request(request).await + }) + .await + } else { + let use_http2 = + std::env::var_os("SPIN_OUTBOUND_H2C_PRIOR_KNOWLEDGE").is_some_and(|v| { + request + .uri() + .authority() + .is_some_and(|authority| authority.as_str() == v) + }); + + if use_http2 { + http_clients.http2.request(request).await + } else { + http_clients.http1.request(request).await } - }); - - (sender, worker, is_http2) - } - } else { - let tcp_stream = TokioIo::new(tcp_stream); - - let is_http2 = std::env::var_os("SPIN_OUTBOUND_H2C_PRIOR_KNOWLEDGE").is_some_and(|v| { - request - .uri() - .authority() - .is_some_and(|authority| authority.as_str() == v) - }); - - let (sender, conn) = new_sender_and_conn(tcp_stream, is_http2, connect_timeout).await?; - - let worker = wasmtime_wasi::runtime::spawn(async move { - match conn.await { - Ok(()) => {} - // TODO: same as above, shouldn't throw this error away. - Err(e) => tracing::warn!("dropping error {e}"), } - }); - - (sender, worker, is_http2) - }; + }, + ); - if is_http2 { - // Some servers (looking at you nginx) don't like a host header even though - // http/2 allows it: https://github.com/hyperium/hyper/issues/3298 - request.headers_mut().remove(HOST); - } else { - // at this point, the request contains the scheme and the authority, but - // the http packet should only include those if addressing a proxy, so - // remove them here, since SendRequest::send_request does not do it for us - *request.uri_mut() = http::Uri::builder() - .path_and_query( - request - .uri() - .path_and_query() - .map(|p| p.as_str()) - .unwrap_or("/"), - ) - .build() - .expect("comes from valid request"); - } - - let resp = timeout(first_byte_timeout, sender.send_request(request)) + let resp = timeout(first_byte_timeout, resp) .await .map_err(|_| ErrorCode::ConnectionReadTimeout)? - .map_err(hyper_request_error)? + .map_err(hyper_legacy_request_error)? .map(|body| body.map_err(hyper_request_error).boxed()); tracing::Span::current().record("http.response.status_code", resp.status().as_u16()); Ok(wasmtime_wasi_http::types::IncomingResponse { resp, - worker: Some(worker), + worker: None, between_bytes_timeout, }) } -async fn new_sender_and_conn( - stream: T, - is_http2: bool, - connect_timeout: Duration, -) -> Result<(HttpSender, HttpConn), ErrorCode> { - if is_http2 { - timeout( - connect_timeout, - hyper::client::conn::http2::handshake(TokioExecutor::default(), stream), - ) - .await - .map_err(|_| ErrorCode::ConnectionTimeout)? - .map_err(hyper_request_error) - .map(|(sender, conn)| (HttpSender::Http2(sender), HttpConn::Http2(conn))) - } else { - timeout( - connect_timeout, - hyper::client::conn::http1::handshake(stream), - ) - .await - .map_err(|_| ErrorCode::ConnectionTimeout)? - .map_err(hyper_request_error) - .map(|(sender, conn)| (HttpSender::Http1(sender), HttpConn::Http1(conn))) - } -} - -enum HttpSender { - Http1(hyper::client::conn::http1::SendRequest>), - Http2(hyper::client::conn::http2::SendRequest>), -} - -#[allow(clippy::large_enum_variant)] -enum HttpConn { - Http1(hyper::client::conn::http1::Connection>), - Http2(hyper::client::conn::http2::Connection, TokioExecutor>), -} - -impl Future for HttpConn { - type Output = Result<(), hyper::Error>; - - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - match self.get_mut() { - HttpConn::Http1(conn) => Pin::new(conn).poll(cx), - HttpConn::Http2(conn) => Pin::new(conn).poll(cx), +/// Translate a [`hyper::Error`] to a wasi-http `ErrorCode` in the context of a request. +fn hyper_request_error(err: hyper::Error) -> ErrorCode { + // If there's a source, we might be able to extract a wasi-http error from it. + if let Some(cause) = err.source() { + if let Some(err) = cause.downcast_ref::() { + return err.clone(); } } -} -impl HttpSender { - async fn send_request( - &mut self, - request: http::Request>, - ) -> Result, hyper::Error> { - match self { - HttpSender::Http1(sender) => sender.send_request(request).await, - HttpSender::Http2(sender) => sender.send_request(request).await, - } - } + tracing::warn!("hyper request error: {err:?}"); + + ErrorCode::HttpProtocolError } -/// Translate a [`hyper::Error`] to a wasi-http `ErrorCode` in the context of a request. -fn hyper_request_error(err: hyper::Error) -> ErrorCode { +/// Translate a [`hyper_util::client::legacy::Error`] to a wasi-http `ErrorCode` in the context of a request. +fn hyper_legacy_request_error(err: hyper_util::client::legacy::Error) -> ErrorCode { // If there's a source, we might be able to extract a wasi-http error from it. if let Some(cause) = err.source() { if let Some(err) = cause.downcast_ref::() { diff --git a/crates/runtime-config/src/lib.rs b/crates/runtime-config/src/lib.rs index ee81d930c..d8bbc49a1 100644 --- a/crates/runtime-config/src/lib.rs +++ b/crates/runtime-config/src/lib.rs @@ -379,8 +379,10 @@ impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<'_, '_> { } impl FactorRuntimeConfigSource for TomlRuntimeConfigSource<'_, '_> { - fn get_runtime_config(&mut self) -> anyhow::Result> { - Ok(None) + fn get_runtime_config( + &mut self, + ) -> anyhow::Result::RuntimeConfig>> { + spin_factor_outbound_http::runtime_config::spin::config_from_table(&self.toml.table) } } diff --git a/examples/spin-timer/Cargo.lock b/examples/spin-timer/Cargo.lock index ec073d9b6..94a0f8472 100644 --- a/examples/spin-timer/Cargo.lock +++ b/examples/spin-timer/Cargo.lock @@ -17,6 +17,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -71,6 +82,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-channel" version = "1.9.0" @@ -816,6 +833,18 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -838,6 +867,29 @@ dependencies = [ "piper", ] +[[package]] +name = "borsh" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8646f98db542e39fc66e68a20b2144f6a732636df7c2354e74645faaa433ce" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd1d3c0c2f5833f22386f252fe8ed005c7f59fdcddeef025c01b4c3b9fd9ac3" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "btoi" version = "0.4.3" @@ -856,6 +908,28 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -1719,6 +1793,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = "futures" version = "0.3.31" @@ -1980,6 +2060,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -3457,6 +3540,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "postgres_range" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6dce28dc5ba143d8eb157b62aac01ae5a1c585c40792158b720e86a87642101" +dependencies = [ + "postgres-protocol", + "postgres-types", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -3472,6 +3565,15 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-crate" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edce586971a4dfaa28950c6f18ed55e0406c1ab88bbce2c6f6293a7aaba73d35" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3551,6 +3653,26 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pulley-interpreter" version = "35.0.0" @@ -3643,6 +3765,12 @@ version = "5.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.7.3" @@ -3917,6 +4045,15 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + [[package]] name = "reqwest" version = "0.12.15" @@ -3984,6 +4121,35 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rkyv" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "rumqttc" version = "0.24.0" @@ -4017,6 +4183,23 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rust_decimal" +version = "1.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b203a6425500a03e0919c42d3c47caca51e79f1132046626d2c8871c5092035d" +dependencies = [ + "arrayvec", + "borsh", + "bytes", + "num-traits", + "postgres-types", + "rand 0.8.5", + "rkyv", + "serde", + "serde_json", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -4319,6 +4502,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "security-framework" version = "2.11.1" @@ -4488,6 +4677,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simdutf8" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "siphasher" version = "1.0.1" @@ -4667,6 +4862,7 @@ dependencies = [ "spin-world", "tokio", "tokio-rustls 0.26.2", + "tower-service", "tracing", "wasmtime", "wasmtime-wasi", @@ -4732,11 +4928,14 @@ name = "spin-factor-outbound-pg" version = "3.4.0-pre0" dependencies = [ "anyhow", + "bytes", "chrono", "deadpool-postgres", "moka", "native-tls", "postgres-native-tls", + "postgres_range", + "rust_decimal", "serde_json", "spin-core", "spin-factor-outbound-networking", @@ -5004,6 +5203,7 @@ dependencies = [ "spin-factors-executor", "spin-runtime-config", "spin-trigger", + "spin-variables-static", "terminal", "tracing", ] @@ -5130,8 +5330,11 @@ name = "spin-variables-static" version = "3.4.0-pre0" dependencies = [ "serde", + "serde_json", + "spin-common", "spin-expressions", "spin-factors", + "toml", ] [[package]] @@ -5289,6 +5492,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "target-lexicon" version = "0.13.2" @@ -7089,6 +7298,15 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "xmlparser" version = "0.13.6"