Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/+raw-tcp-ports.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added `feature.network.incoming.raw_tcp_ports` for raw TCP steal ports that should bypass HTTP detection and TLS handling.
14 changes: 14 additions & 0 deletions mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2404,6 +2404,20 @@
"minimum": 0
}
},
"raw_tcp_ports": {
"title": "raw_tcp_ports",
"description": "Ports to steal as raw TCP. Incoming connections on these ports bypass HTTP detection and\nTLS handling, so they are forwarded immediately to the local application.",
"type": [
"array",
"null"
],
"items": {
"type": "integer",
"format": "uint16",
"maximum": 65535,
"minimum": 0
}
},
"tls_delivery": {
"title": "tls_delivery",
"description": "(Operator Only): configures how mirrord delivers stolen TLS traffic\nto the local application.",
Expand Down
9 changes: 9 additions & 0 deletions mirrord/agent/src/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ pub use steal_handle::{StealHandle, StolenTraffic};
pub use task::{RedirectorTask, RedirectorTaskConfig};
use tokio::net::TcpStream;

/// Port-wide handling mode for redirected incoming connections.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum IncomingPortMode {
/// Use the normal incoming path: apply configured TLS handling and detect HTTP traffic.
Detect,
/// Forward bytes immediately as raw TCP, bypassing HTTP detection and TLS handling.
RawTcp,
}

/// A component that implements redirecting incoming TCP connections.
pub trait PortRedirector {
type Error: Sized;
Expand Down
26 changes: 26 additions & 0 deletions mirrord/agent/src/incoming/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,32 @@ impl MaybeHttp {
/// Timeout for detemining if the redirected connection is HTTP.
pub const HTTP_DETECTION_TIMEOUT: Duration = Duration::from_secs(10);

/// Wraps the given redirected connection as raw TCP.
///
/// Used when the port is known to carry raw TCP traffic, avoiding the detection timeout
/// that would stall server-first protocols (e.g. SMTP, FTP).
pub fn accept_raw_tcp(redirected: Redirected) -> Result<Self, HttpDetectError> {
let metric_guard = MetricGuard::new(&REDIRECTED_CONNECTIONS);
let local_addr = redirected
.stream
.local_addr()
.map_err(HttpDetectError::LocalAddr)?;

Ok(Self {
info: ConnectionInfo {
original_destination: redirected.destination,
local_addr,
peer_addr: redirected.source,
tls_connector: None,
},
http_version: None,
stream: Box::new(IncomingIoWrapper {
io: redirected.stream,
_metric_guard: metric_guard,
}),
})
}

/// Accepts the (possibly TLS) connection and detects if the redirected connection is
/// HTTP.
pub async fn detect(
Expand Down
13 changes: 11 additions & 2 deletions mirrord/agent/src/incoming/steal_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tokio_stream::{StreamMap, StreamNotifyClose, wrappers::ReceiverStream};
use tokio_util::sync::CancellationToken;

use super::{
IncomingPortMode,
connection::{ConnectionInfo, http::RedirectedHttp, tcp::RedirectedTcp},
error::RedirectorTaskError,
task::{RedirectRequest, TaskError},
Expand Down Expand Up @@ -47,15 +48,23 @@ impl StealHandle {
/// If this method returns [`Ok`], it means that the port redirection
/// was done in the [`RedirectorTask`](super::RedirectorTask),
/// and incoming connections are now being stolen.
pub async fn steal(&mut self, port: u16) -> Result<(), RedirectorTaskError> {
pub async fn steal(
&mut self,
port: u16,
mode: IncomingPortMode,
) -> Result<(), RedirectorTaskError> {
if self.stolen_ports.contains_key(&port) {
return Ok(());
};

let (receiver_tx, receiver_rx) = oneshot::channel();
if self
.message_tx
.send(RedirectRequest::Steal { port, receiver_tx })
.send(RedirectRequest::Steal {
port,
mode,
receiver_tx,
})
.await
.is_err()
{
Expand Down
138 changes: 122 additions & 16 deletions mirrord/agent/src/incoming/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio_util::sync::CancellationToken;
use tracing::Level;

use super::{
PortRedirector, Redirected,
IncomingPortMode, PortRedirector, Redirected,
connection::{ConnectionInfo, MaybeHttp, http::RedirectedHttp, tcp::RedirectedTcp},
error::RedirectorTaskError,
steal_handle::{StealHandle, StolenTraffic},
Expand Down Expand Up @@ -173,17 +173,21 @@ where
let tx = self.internal_tx.clone();
let tls_store = self.tls_store.clone();
let shutdown = state.shutdown.child_token();
let port_mode = state.mode;
Self::spawn_tracked_connection(
self.internal_tx.clone(),
destination.port(),
state,
async move {
let detection_result = tokio::select! {
r = MaybeHttp::detect(conn, &tls_store) => r,
_ = shutdown.cancelled() => {
tracing::debug!("Shutting down redirected connection during HTTP detection");
return;
}
let detection_result = match port_mode {
IncomingPortMode::Detect => tokio::select! {
r = MaybeHttp::detect(conn, &tls_store) => r,
_ = shutdown.cancelled() => {
tracing::debug!("Shutting down redirected connection during HTTP detection");
return;
}
},
IncomingPortMode::RawTcp => MaybeHttp::accept_raw_tcp(conn),
};

match detection_result {
Expand Down Expand Up @@ -397,10 +401,13 @@ where
mirror_txs: vec![conn_tx.clone()],
shutdown: Default::default(),
connections: Default::default(),
mode: IncomingPortMode::Detect,
});
}
Entry::Occupied(mut e) => {
e.get_mut().mirror_txs.push(conn_tx.clone());
let state = e.get_mut();
state.warn_on_mode_conflict(port, IncomingPortMode::Detect, "mirroring");
state.mirror_txs.push(conn_tx.clone());
}
};

Expand All @@ -413,13 +420,18 @@ where
let _ = receiver_tx.send(conn_rx);
}

RedirectRequest::Steal { port, receiver_tx } => {
RedirectRequest::Steal {
port,
mode,
receiver_tx,
} => {
let (conn_tx, conn_rx) = mpsc::channel(32);

match self.ports.entry(port) {
Entry::Vacant(e) => {
tracing::debug!(
from_port = port,
?mode,
"Creating a new port redirection for a stealing client"
);
self.redirector.add_redirection(port).await?;
Expand All @@ -428,10 +440,13 @@ where
mirror_txs: Default::default(),
shutdown: Default::default(),
connections: Default::default(),
mode,
});
}
Entry::Occupied(mut e) => {
e.get_mut().steal_tx.replace(conn_tx.clone());
let state = e.get_mut();
state.steal_tx.replace(conn_tx.clone());
state.warn_on_mode_conflict(port, mode, "stealing");
}
}

Expand Down Expand Up @@ -584,6 +599,7 @@ pub type MirroredConnectionsRx = mpsc::Receiver<MirroredTraffic>;
pub enum RedirectRequest {
Steal {
port: u16,
mode: IncomingPortMode,
receiver_tx: oneshot::Sender<StolenConnectionsRx>,
},
Mirror {
Expand All @@ -599,9 +615,10 @@ impl fmt::Debug for RedirectRequest {
.debug_struct("Mirror")
.field("port", port)
.finish_non_exhaustive(),
Self::Steal { port, .. } => f
Self::Steal { port, mode, .. } => f
.debug_struct("Steal")
.field("port", port)
.field("mode", mode)
.finish_non_exhaustive(),
}
}
Expand Down Expand Up @@ -663,6 +680,11 @@ struct PortState {
shutdown: CancellationToken,
/// Used to track connection IO tasks and wait for their graceful shutdown.
connections: JoinSet<()>,
/// How incoming connections for this port should be handled before delivery.
///
/// This is shared port-wide. The mode chosen by the subscriber that creates the redirection
/// wins for the lifetime of that redirection.
mode: IncomingPortMode,
}

impl fmt::Debug for PortState {
Expand All @@ -676,11 +698,24 @@ impl fmt::Debug for PortState {
.is_some_and(|tx| tx.is_closed().not()),
)
.field("mirrorers", &self.mirror_txs.len())
.field("mode", &self.mode)
.finish()
}
}

impl PortState {
fn warn_on_mode_conflict(&self, port: u16, requested_mode: IncomingPortMode, subscriber: &str) {
if self.mode != requested_mode {
tracing::warn!(
from_port = port,
existing_mode = ?self.mode,
?requested_mode,
subscriber,
"Port is already redirected with a different incoming mode; keeping existing mode",
);
}
}

/// Tell and wait for all connections to gracefully shut down.
/// This function is essentially `AsyncDrop`, and it should always
/// be called before removing the redirection.
Expand Down Expand Up @@ -708,7 +743,8 @@ mod test {
};

use crate::incoming::{
RedirectorTask, RedirectorTaskConfig, StolenTraffic, test::DummyRedirector,
IncomingPortMode, MirroredTraffic, RedirectorTask, RedirectorTaskConfig, StolenTraffic,
test::DummyRedirector,
};

#[rstest]
Expand All @@ -729,7 +765,7 @@ mod test {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();

handle.steal(port).await.unwrap();
handle.steal(port, IncomingPortMode::Detect).await.unwrap();
assert!(state.borrow().has_redirections([port]));

let mut tcp = tx.make_connection(listener.local_addr().unwrap()).await;
Expand Down Expand Up @@ -818,7 +854,7 @@ mod test {
);
tokio::spawn(task.run());

handle.steal(80).await.unwrap();
handle.steal(80, IncomingPortMode::Detect).await.unwrap();
assert!(state.borrow().has_redirections([80]));

handle.stop_steal(80);
Expand All @@ -827,7 +863,7 @@ mod test {
.await
.unwrap();

handle.steal(81).await.unwrap();
handle.steal(81, IncomingPortMode::Detect).await.unwrap();
assert!(state.borrow().has_redirections([81]));

std::mem::drop(handle);
Expand All @@ -837,6 +873,76 @@ mod test {
.unwrap();
}

#[rstest]
#[timeout(Duration::from_secs(5))]
#[tokio::test]
async fn mirror_detection_survives_later_raw_tcp_steal() {
let (redirector, _state, mut conn_tx) = DummyRedirector::new();
let (task, mut steal_handle, mut mirror_handle) = RedirectorTask::new(
redirector,
Default::default(),
RedirectorTaskConfig::from_env(),
);
tokio::spawn(task.run());

mirror_handle.mirror(80).await.unwrap();
steal_handle
.steal(80, IncomingPortMode::RawTcp)
.await
.unwrap();

let mut client_conn = conn_tx
.make_connection("127.0.0.1:80".parse().unwrap())
.await;
client_conn
.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
.await
.unwrap();

let MirroredTraffic::Http(_) = mirror_handle.next().await.unwrap().unwrap() else {
panic!("mirror subscriber lost HTTP detection mode");
};
let StolenTraffic::Http(_) = steal_handle.next().await.unwrap().unwrap() else {
panic!("steal subscriber lost HTTP detection mode");
};
}

#[rstest]
#[timeout(Duration::from_secs(5))]
#[tokio::test]
async fn raw_tcp_steal_mode_survives_later_mirror() {
let (redirector, _state, mut conn_tx) = DummyRedirector::new();
let (task, mut steal_handle, mut mirror_handle) = RedirectorTask::new(
redirector,
Default::default(),
RedirectorTaskConfig::from_env(),
);
tokio::spawn(task.run());

steal_handle
.steal(80, IncomingPortMode::RawTcp)
.await
.unwrap();
mirror_handle.mirror(80).await.unwrap();

let mut client_conn = conn_tx
.make_connection("127.0.0.1:80".parse().unwrap())
.await;
client_conn
.write_all(b"GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
.await
.unwrap();

let MirroredTraffic::Tcp(_) = mirror_handle.next().await.unwrap().unwrap() else {
panic!("mirror subscriber changed raw TCP steal mode");
};
let StolenTraffic::Tcp { join_handle_tx, .. } = steal_handle.next().await.unwrap().unwrap()
else {
panic!("steal subscriber changed raw TCP steal mode");
};
join_handle_tx.send(tokio::spawn(async {})).unwrap();
}

/// Regression test for a bug with HTTP graceful shutdown.
///
/// Verifies that [`RedirectorTask`] can handle port unsubscribe during an HTTP exchange.
Expand All @@ -856,7 +962,7 @@ mod test {
);
let redirector_task = tokio::spawn(task.run());

handle.steal(80).await.unwrap();
handle.steal(80, IncomingPortMode::Detect).await.unwrap();
let client_conn = conn_tx
.make_connection("127.0.0.1:80".parse().unwrap())
.await;
Expand Down
Loading