Add domain fronting server and finish client#9644
Conversation
00f443d to
28c0e54
Compare
MarkusPettersson98
left a comment
There was a problem hiding this comment.
@MarkusPettersson98 reviewed 9 files and all commit messages, and made 2 comments.
Reviewable status: 9 of 12 files reviewed, 2 unresolved discussions (waiting on @pinkisemils).
mullvad-api/Cargo.toml line 66 at r1 (raw file):
papaya = "0.2" # Should follow whatever tokio uses bytes = "^1.2"
⛏️ Could we convert these to workspace deps? 🙏
Code quote:
papaya = "0.2"
# Should follow whatever tokio uses
bytes = "^1.2"mullvad-api/src/proxy.rs line 97 at r1 (raw file):
)), } }
None of these arms seems fallible, so can't get_endpoint simply return an Endpoint instead of Option<Endpoint>?:)
Code quote:
fn get_endpoint(&self) -> Option<Endpoint> {
match self {
ProxyConfig::Shadowsocks(shadowsocks) => Some(Endpoint::from_socket_address(
shadowsocks.endpoint,
TransportProtocol::Tcp,
)),
ProxyConfig::Socks5Local(local) => Some(local.remote_endpoint),
ProxyConfig::Socks5Remote(remote) => Some(Endpoint::from_socket_address(
remote.endpoint,
TransportProtocol::Tcp,
)),
ProxyConfig::EncryptedDnsProxy(proxy) => {
let addr = SocketAddr::V4(proxy.addr);
Some(Endpoint::from_socket_address(addr, TransportProtocol::Tcp))
}
ProxyConfig::DomainFronting(proxy) => Some(Endpoint::from_socket_address(
proxy.addr,
TransportProtocol::Tcp,
)),
}
}28c0e54 to
90febad
Compare
There was a problem hiding this comment.
@dlon partially reviewed 7 files and made 5 comments.
Reviewable status: 4 of 14 files reviewed, 7 unresolved discussions (waiting on @MarkusPettersson98 and @pinkisemils).
mullvad-api/src/domain_fronting/mod.rs line 359 at r1 (raw file):
buf: &[u8], ) -> std::task::Poll<Result<usize, std::io::Error>> { log::debug!("call to poll_write");
Might be a bit spammy
mullvad-api/src/domain_fronting/mod.rs line 361 at r1 (raw file):
log::debug!("call to poll_write"); self.as_mut().update_write_waker(cx); if self.send_future.is_none() {
Could this be moved to an else branch after if let Some(future) = ... { ... }?
mullvad-api/src/domain_fronting/mod.rs line 367 at r1 (raw file):
self.as_mut().resolve_read_waker(); self.as_mut().resolve_write_waker(); return Poll::Ready(Ok(buf.len()));
I'm a bit confused. Is it legal to wake the task when and return Ready? Should we not return Pending here?
It seems like this should cause write to immediately resolve (Poll::Ready) but not actually send anything until a second write call is made.
I suspect it might also be better to continue and poll send_future rather than return and wake the waker.
mullvad-api/src/domain_fronting/mod.rs line 376 at r1 (raw file):
self.as_mut().resolve_read_waker(); self.send_future = None; return Poll::Pending;
Should we not be returning Ready here? Also means we shouldn't have to touch the waker ourselves, if I'm not mistaken.
mullvad-api/src/domain_fronting/mod.rs line 365 at r2 (raw file):
let payload = Bytes::copy_from_slice(buf); self.send_future = Some(Self::create_send_future(request_tx, payload)); self.as_mut().resolve_read_waker();
Why is it that we're waking the read task from here? That looks a bit off, but I might be missing something
MarkusPettersson98
left a comment
There was a problem hiding this comment.
@MarkusPettersson98 reviewed 9 files and all commit messages, made 10 comments, and resolved 2 discussions.
Reviewable status: 13 of 14 files reviewed, 15 unresolved discussions (waiting on @pinkisemils).
mullvad-api/src/domain_fronting/mod.rs line 8 at r2 (raw file):
//! # Client //! //! [`ProxyConnection`] implements `AsyncRead + AsyncWrite`, tunneling data via HTTP POST requests.
[`AsyncRead`] + [`AsyncWrite`]
📜 The traits are not properly linked atm
Code quote:
`AsyncRead + AsyncWrite`mullvad-api/src/domain_fronting/mod.rs line 61 at r2 (raw file):
//! Both client and server support generic `AsyncRead + AsyncWrite` streams for testing. //! Use `ProxyConnection::from_stream()` and `Sessions::with_connector()` to inject //! custom transports like `tokio::io::duplex` for unit tests.
📜 Would be grand if we could link these traits/functions/modules properly. Chuck a few brackets in there ✨
Code quote:
//! # Testing
//!
//! Both client and server support generic `AsyncRead + AsyncWrite` streams for testing.
//! Use `ProxyConnection::from_stream()` and `Sessions::with_connector()` to inject
//! custom transports like `tokio::io::duplex` for unit tests.mullvad-api/src/domain_fronting/mod.rs line 97 at r2 (raw file):
pub mod server; const SESSION_HEADER_KEY: &str = "X-Mullvad-Session";
How much of a pain would it be to modularize Sessions over the session header key?
Use case: Make the module Mullvad agnostic:-)
Code quote:
const SESSION_HEADER_KEY: &str = "X-Mullvad-Session";mullvad-api/src/domain_fronting/mod.rs line 126 at r2 (raw file):
front: String, proxy_host: String, }
A lil' docstring didn't hurt nobody?
/// Configuration used to [`ProxyConfig::connect`] to a proxy.
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)]
pub struct ProxyConfig {
pub addr: SocketAddr,
front: String,
proxy_host: String,
}Code quote:
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)]
pub struct ProxyConfig {
pub addr: SocketAddr,
front: String,
proxy_host: String,
}mullvad-api/src/domain_fronting/mod.rs line 126 at r2 (raw file):
front: String, proxy_host: String, }
Is there a particular reason why DomainFronting is not used instead of re-declaring front + proxy_host? Seems like we are missing out on free documentation:^)
Code quote:
pub struct ProxyConfig {
pub addr: SocketAddr,
front: String,
proxy_host: String,
}mullvad-api/src/domain_fronting/mod.rs line 274 at r2 (raw file):
log::debug!("Received {} bytes", response.len()); self.reader.get_mut().extend(response); }
⛏️ Logging here seems excessive
Code quote:
fn fill_recv_buffer(mut self: Pin<&mut Self>, response: Bytes) {
log::debug!("Received {} bytes", response.len());
self.reader.get_mut().extend(response);
}mullvad-api/src/domain_fronting/mod.rs line 349 at r2 (raw file):
))) } }
⛏️ I see that you are using ready! in poll_write already, and we could utilize it here as well
Code quote (i):
match pin!(send_future).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => {
self.as_mut().resolve_write_waker();
self.as_mut().resolve_read_waker();
self.send_future = None;
Poll::Pending
}
Poll::Ready(Err(_)) => {
self.as_mut().resolve_write_waker();
Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Actor shut down",
)))
}
}Code snippet (ii):
match ready!(pin!(send_future).poll(cx)) {
Ok(()) => {
self.as_mut().resolve_write_waker();
self.as_mut().resolve_read_waker();
self.send_future = None;
Poll::Pending
}
Err(_) => {
self.as_mut().resolve_write_waker();
Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Actor shut down",
)))
}
}mullvad-api/src/bin/domain_fronting_server.rs line 83 at r2 (raw file):
let acceptor = tls_acceptor.clone(); log::debug!("Accepted connection from {}", addr);
⛏️ Might as well use string interpolation to format log print
Code quote (i):
log::debug!("Accepted connection from {}", addr);Code snippet (ii):
log::debug!("Accepted connection from {addr}");mullvad-api/src/bin/domain_fronting_server.rs line 100 at r2 (raw file):
.await { log::error!("Error serving connection from {}: {}", addr, err);
⛏️ Might as well use string interpolation to format log print
Code quote (i):
log::error!("Error serving connection from {}: {}", addr, err);Code snippet (ii):
log::error!("Error serving connection from {addr}: {err}");mullvad-api/src/bin/domain_fronting_server.rs line 104 at r2 (raw file):
} Err(err) => { log::error!("TLS handshake failed for {}: {}", addr, err);
⛏️ Might as well use string interpolation to format log print
Code quote (i):
log::error!("TLS handshake failed for {}: {}", addr, err);Code snippet (ii):
log::error!("TLS handshake failed for {addr}: {err}");
MarkusPettersson98
left a comment
There was a problem hiding this comment.
@MarkusPettersson98 made 1 comment.
Reviewable status: 13 of 14 files reviewed, 16 unresolved discussions (waiting on @pinkisemils).
mullvad-api/src/domain_fronting/mod.rs line 390 at r2 (raw file):
Poll::Pending }
Matching on self.send_future slightly differently reveals that the trailing return value Poll::Pending is redundant
Code quote (i):
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
log::debug!("call to poll_write");
self.as_mut().update_write_waker(cx);
if self.send_future.is_none() {
let request_tx = self.request_tx.clone();
let payload = Bytes::copy_from_slice(buf);
self.send_future = Some(Self::create_send_future(request_tx, payload));
self.as_mut().resolve_read_waker();
self.as_mut().resolve_write_waker();
return Poll::Ready(Ok(buf.len()));
}
if let Some(future) = &mut self.send_future {
match ready!(pin!(future).poll(cx)) {
Ok(_) => {
self.as_mut().resolve_write_waker();
self.as_mut().resolve_read_waker();
self.send_future = None;
return Poll::Pending;
}
Err(_) => {
self.send_future = None;
self.as_mut().resolve_read_waker();
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Actor shut down",
)));
}
}
};
Poll::Pending
}Code snippet (ii):
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
log::debug!("call to poll_write");
self.as_mut().update_write_waker(cx);
let Some(future) = &mut self.send_future else {
let request_tx = self.request_tx.clone();
let payload = Bytes::copy_from_slice(buf);
self.send_future = Some(Self::create_send_future(request_tx, payload));
self.as_mut().resolve_read_waker();
self.as_mut().resolve_write_waker();
return Poll::Ready(Ok(buf.len()));
};
match ready!(pin!(future).poll(cx)) {
Ok(_) => {
self.as_mut().resolve_write_waker();
self.as_mut().resolve_read_waker();
self.send_future = None;
Poll::Pending
}
Err(_) => {
self.send_future = None;
self.as_mut().resolve_read_waker();
Poll::Ready(Err(io::Error::new(
io::ErrorKind::BrokenPipe,
"Actor shut down",
)))
}
}
}
faern
left a comment
There was a problem hiding this comment.
@faern made 1 comment.
Reviewable status: 13 of 14 files reviewed, 17 unresolved discussions (waiting on @pinkisemils).
mullvad-api/src/bin/domain_fronting_server.rs line 73 at r2 (raw file):
// Load TLS configuration let tls_config = load_tls_config(&args.cert_path, &args.key_path)?;
This does read files in a blocking manner. Can those be made async, or can we call this function in spawn_blocking?
90febad to
59986bf
Compare
faern
left a comment
There was a problem hiding this comment.
@faern made 1 comment.
Reviewable status: 5 of 15 files reviewed, 17 unresolved discussions (waiting on @MarkusPettersson98 and @pinkisemils).
mullvad-api/Cargo.toml line 66 at r1 (raw file):
Previously, MarkusPettersson98 (Markus Pettersson) wrote…
⛏️ Could we convert these to workspace deps? 🙏
But this crate is going to get ripped out to a separate repository right away, right? So probably not worth it.
pinkisemils
left a comment
There was a problem hiding this comment.
@pinkisemils made 3 comments and resolved 13 discussions.
Reviewable status: 5 of 15 files reviewed, 4 unresolved discussions (waiting on @dlon and @MarkusPettersson98).
mullvad-api/src/domain_fronting/mod.rs line 367 at r1 (raw file):
Previously, dlon (David Lönnhager) wrote…
I'm a bit confused. Is it legal to wake the task when and return
Ready? Should we not returnPendinghere?It seems like this should cause
writeto immediately resolve (Poll::Ready) but not actually send anything until a secondwritecall is made.I suspect it might also be better to continue and poll
send_futurerather than return and wake the read waker.
I think you are right. I think this ended up being design by desperation as I was trying to fight a deadlock. Now it should be a
mullvad-api/src/domain_fronting/mod.rs line 97 at r2 (raw file):
Previously, MarkusPettersson98 (Markus Pettersson) wrote…
How much of a pain would it be to modularize
Sessionsover the session header key?Use case: Make the module Mullvad agnostic:-)
Sure, we can do that. Wouldn't hurt much at all.
mullvad-api/src/domain_fronting/mod.rs line 126 at r2 (raw file):
Previously, MarkusPettersson98 (Markus Pettersson) wrote…
A lil' docstring didn't hurt nobody?
/// Configuration used to [`ProxyConfig::connect`] to a proxy. #[derive(serde::Serialize, serde::Deserialize, PartialEq, Debug, Clone)] pub struct ProxyConfig { pub addr: SocketAddr, front: String, proxy_host: String, }
I hope I've done that now.
MarkusPettersson98
left a comment
There was a problem hiding this comment.
Very minor nit left, but I think it looks good!
@MarkusPettersson98 reviewed 10 files and all commit messages, made 4 comments, and resolved 2 discussions.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @dlon and @pinkisemils).
mullvad-api/src/domain_fronting/server.rs line 70 at r4 (raw file):
pub fn new(upstream: SocketAddr, session_header_key: String) -> Arc<Self> { Self::with_connector(upstream, session_header_key, TcpConnector) }
Is there a particular reason for returning an Arc<Self> from the constructor, rather than simply Self?:)
Code quote:
/// Create a new session manager with the default TCP connector.
pub fn new(upstream: SocketAddr, session_header_key: String) -> Arc<Self> {
Self::with_connector(upstream, session_header_key, TcpConnector)
}mullvad-api/src/domain_fronting/client.rs line 47 at r4 (raw file):
&self.domain_fronting } }
This seems to be unused?
Code quote:
impl std::ops::Deref for ProxyConfig {
type Target = DomainFronting;
fn deref(&self) -> &Self::Target {
&self.domain_fronting
}
}mullvad-api/src/domain_fronting/client.rs line 149 at r4 (raw file):
log::error!("Domain fronting connection failed: {:?}", err); } });
Are we responsible for dropping this Connection when the corresponding Sender is dropped? i.e. do we ever care about cancelling this spawned task?
Code quote:
tokio::spawn(async move {
if let Err(err) = conn.await {
log::error!("Domain fronting connection failed: {:?}", err);
}
});
MarkusPettersson98
left a comment
There was a problem hiding this comment.
@MarkusPettersson98 reviewed 2 files and all commit messages, made 1 comment, and resolved 1 discussion.
Reviewable status: all files reviewed, 4 unresolved discussions (waiting on @dlon and @pinkisemils).
MarkusPettersson98
left a comment
There was a problem hiding this comment.
@MarkusPettersson98 reviewed 1 file and all commit messages, and resolved 1 discussion.
Reviewable status: all files reviewed, 3 unresolved discussions (waiting on @dlon and @pinkisemils).
2e9caed to
e1f8def
Compare
There was a problem hiding this comment.
@dlon reviewed 12 files and all commit messages, made 8 comments, and resolved 1 discussion.
Reviewable status: all files reviewed, 8 unresolved discussions (waiting on @pinkisemils).
mullvad-api/src/domain_fronting/client.rs line 232 at r7 (raw file):
if self.as_ref().recv_buffer_empty() { self.as_mut().resolve_write_waker(); self.as_mut().resolve_read_waker();
I suspect it would be more correct to drop the read waker without waking it (since we're returning Ready(...) already). But I assume this is harmless.
mullvad-api/src/domain_fronting/client.rs line 252 at r7 (raw file):
} Err(err) => { return Poll::Ready(Err(err));
Actually unreachable. Reader::read is infallible despite the return type.
mullvad-api/src/domain_fronting/client.rs line 265 at r7 (raw file):
Ok(_) => { self.as_mut().resolve_write_waker(); self.as_mut().resolve_read_waker();
Won't this be spamming a bunch of empty POSTs whenever there's nothing to read? If so, is there any reason why it needs to be this noisy?
It's very possible I'm missing something here, though.
mullvad-api/src/domain_fronting/client.rs line 297 at r7 (raw file):
Err(_) => { self.send_future = None; return Poll::Ready(Err(io::Error::new(
Could AsyncRead get stuck in the error case here if it's waiting on a pending send_future? Add resolve_read_waker, I think.
mullvad-api/src/domain_fronting/client.rs line 310 at r7 (raw file):
self.send_future = Some(Self::create_send_future(request_tx, payload)); self.as_mut().resolve_read_waker(); Poll::Ready(Ok(buf.len()))
Should we be swallowing all errors from the HTTP client? I guess it's kind of fine since the actor will be killed.
mullvad-api/src/domain_fronting/server.rs line 232 at r7 (raw file):
loop { let deadline_ref = deadline.as_mut();
How about just moving pin!(sleep(CONNECTION_TIMEOUT)); in here instead of manually resetting it? Would that be too wasteful?
mullvad-api/src/domain_fronting/server.rs line 257 at r7 (raw file):
return; }, Err(_timeout) => Bytes::new(),
Should deadline be reset here?
mullvad-api/src/domain_fronting/client.rs line 308 at r7 (raw file):
let request_tx = self.request_tx.clone(); let payload = Bytes::copy_from_slice(buf); self.send_future = Some(Self::create_send_future(request_tx, payload));
This is not being polled here. Does that mean that nothing will be sent until the ProxyConnection is read from or a second write occurs? That feels counter-intuitive. I'd expect an Ok(n_bytes) result to mean that stuff was actually sent (or at least queued somewhere)
These changes rework the client and add a proxy server for domain fronting. Instead of attempting to use the
CONNECTmethod to upgrade the underlying connection from HTTP to a different protocol, I've reverted everything back to usePOSTrequests to post TCP payloads back and forth between the server and the client. There's an end to end test that verifies that the proxy logic works.To test these changes, you can run the client against a devmole server, which unfortunately is not behind a CDN. But it has been extensively tested with a CDN as well.
This PR does not add domain fronting as an access method just yet to settings, as we don't have this service deployed. Yet.
This change is