Skip to content

Commit 96a1b71

Browse files
chore(proxy): discard request context span during passthrough (#9882)
## Problem The RequestContext::span shouldn't live for the entire postgres connection, only the handshake. ## Summary of changes * Slight refactor to the RequestContext to discard the span upon handshake completion. * Make sure the temporary future for the handshake is dropped (not bound to a variable) * Runs our nightly fmt script
1 parent a74ab93 commit 96a1b71

File tree

6 files changed

+61
-57
lines changed

6 files changed

+61
-57
lines changed

proxy/src/cancellation.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
use std::net::SocketAddr;
1+
use std::net::{IpAddr, SocketAddr};
22
use std::sync::Arc;
33

44
use dashmap::DashMap;
5+
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
56
use pq_proto::CancelKeyData;
67
use thiserror::Error;
78
use tokio::net::TcpStream;
@@ -17,9 +18,6 @@ use crate::rate_limiter::LeakyBucketRateLimiter;
1718
use crate::redis::cancellation_publisher::{
1819
CancellationPublisher, CancellationPublisherMut, RedisPublisherClient,
1920
};
20-
use std::net::IpAddr;
21-
22-
use ipnet::{IpNet, Ipv4Net, Ipv6Net};
2321

2422
pub type CancelMap = Arc<DashMap<CancelKeyData, Option<CancelClosure>>>;
2523
pub type CancellationHandlerMain = CancellationHandler<Option<Arc<Mutex<RedisPublisherClient>>>>;

proxy/src/console_redirect_proxy.rs

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22

3-
use futures::TryFutureExt;
3+
use futures::{FutureExt, TryFutureExt};
44
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
55
use tokio_util::sync::CancellationToken;
66
use tracing::{debug, error, info, Instrument};
@@ -88,40 +88,37 @@ pub async fn task_main(
8888
crate::metrics::Protocol::Tcp,
8989
&config.region,
9090
);
91-
let span = ctx.span();
92-
93-
let startup = Box::pin(
94-
handle_client(
95-
config,
96-
backend,
97-
&ctx,
98-
cancellation_handler,
99-
socket,
100-
conn_gauge,
101-
)
102-
.instrument(span.clone()),
103-
);
104-
let res = startup.await;
91+
92+
let res = handle_client(
93+
config,
94+
backend,
95+
&ctx,
96+
cancellation_handler,
97+
socket,
98+
conn_gauge,
99+
)
100+
.instrument(ctx.span())
101+
.boxed()
102+
.await;
105103

106104
match res {
107105
Err(e) => {
108-
// todo: log and push to ctx the error kind
109106
ctx.set_error_kind(e.get_error_kind());
110-
error!(parent: &span, "per-client task finished with an error: {e:#}");
107+
error!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
111108
}
112109
Ok(None) => {
113110
ctx.set_success();
114111
}
115112
Ok(Some(p)) => {
116113
ctx.set_success();
117-
ctx.log_connect();
118-
match p.proxy_pass().instrument(span.clone()).await {
114+
let _disconnect = ctx.log_connect();
115+
match p.proxy_pass().await {
119116
Ok(()) => {}
120117
Err(ErrorSource::Client(e)) => {
121-
error!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
118+
error!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
122119
}
123120
Err(ErrorSource::Compute(e)) => {
124-
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
121+
error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
125122
}
126123
}
127124
}
@@ -219,6 +216,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
219216
client: stream,
220217
aux: node.aux.clone(),
221218
compute: node,
219+
session_id: ctx.session_id(),
222220
_req: request_gauge,
223221
_conn: conn_gauge,
224222
_cancel: session,

proxy/src/context/mod.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -272,11 +272,14 @@ impl RequestContext {
272272
this.success = true;
273273
}
274274

275-
pub fn log_connect(&self) {
276-
self.0
277-
.try_lock()
278-
.expect("should not deadlock")
279-
.log_connect();
275+
pub fn log_connect(self) -> DisconnectLogger {
276+
let mut this = self.0.into_inner();
277+
this.log_connect();
278+
279+
// close current span.
280+
this.span = Span::none();
281+
282+
DisconnectLogger(this)
280283
}
281284

282285
pub(crate) fn protocol(&self) -> Protocol {
@@ -434,8 +437,14 @@ impl Drop for RequestContextInner {
434437
fn drop(&mut self) {
435438
if self.sender.is_some() {
436439
self.log_connect();
437-
} else {
438-
self.log_disconnect();
439440
}
440441
}
441442
}
443+
444+
pub struct DisconnectLogger(RequestContextInner);
445+
446+
impl Drop for DisconnectLogger {
447+
fn drop(&mut self) {
448+
self.0.log_disconnect();
449+
}
450+
}

proxy/src/proxy/mod.rs

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub(crate) mod wake_compute;
1010
use std::sync::Arc;
1111

1212
pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource};
13-
use futures::TryFutureExt;
13+
use futures::{FutureExt, TryFutureExt};
1414
use itertools::Itertools;
1515
use once_cell::sync::OnceCell;
1616
use pq_proto::{BeMessage as Be, StartupMessageParams};
@@ -123,42 +123,39 @@ pub async fn task_main(
123123
crate::metrics::Protocol::Tcp,
124124
&config.region,
125125
);
126-
let span = ctx.span();
127-
128-
let startup = Box::pin(
129-
handle_client(
130-
config,
131-
auth_backend,
132-
&ctx,
133-
cancellation_handler,
134-
socket,
135-
ClientMode::Tcp,
136-
endpoint_rate_limiter2,
137-
conn_gauge,
138-
)
139-
.instrument(span.clone()),
140-
);
141-
let res = startup.await;
126+
127+
let res = handle_client(
128+
config,
129+
auth_backend,
130+
&ctx,
131+
cancellation_handler,
132+
socket,
133+
ClientMode::Tcp,
134+
endpoint_rate_limiter2,
135+
conn_gauge,
136+
)
137+
.instrument(ctx.span())
138+
.boxed()
139+
.await;
142140

143141
match res {
144142
Err(e) => {
145-
// todo: log and push to ctx the error kind
146143
ctx.set_error_kind(e.get_error_kind());
147-
warn!(parent: &span, "per-client task finished with an error: {e:#}");
144+
warn!(parent: &ctx.span(), "per-client task finished with an error: {e:#}");
148145
}
149146
Ok(None) => {
150147
ctx.set_success();
151148
}
152149
Ok(Some(p)) => {
153150
ctx.set_success();
154-
ctx.log_connect();
155-
match p.proxy_pass().instrument(span.clone()).await {
151+
let _disconnect = ctx.log_connect();
152+
match p.proxy_pass().await {
156153
Ok(()) => {}
157154
Err(ErrorSource::Client(e)) => {
158-
warn!(parent: &span, "per-client task finished with an IO error from the client: {e:#}");
155+
warn!(?session_id, "per-client task finished with an IO error from the client: {e:#}");
159156
}
160157
Err(ErrorSource::Compute(e)) => {
161-
error!(parent: &span, "per-client task finished with an IO error from the compute: {e:#}");
158+
error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}");
162159
}
163160
}
164161
}
@@ -352,6 +349,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
352349
client: stream,
353350
aux: node.aux.clone(),
354351
compute: node,
352+
session_id: ctx.session_id(),
355353
_req: request_gauge,
356354
_conn: conn_gauge,
357355
_cancel: session,

proxy/src/proxy/passthrough.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub(crate) struct ProxyPassthrough<P, S> {
5959
pub(crate) client: Stream<S>,
6060
pub(crate) compute: PostgresConnection,
6161
pub(crate) aux: MetricsAuxInfo,
62+
pub(crate) session_id: uuid::Uuid,
6263

6364
pub(crate) _req: NumConnectionRequestsGuard<'static>,
6465
pub(crate) _conn: NumClientConnectionsGuard<'static>,
@@ -69,7 +70,7 @@ impl<P, S: AsyncRead + AsyncWrite + Unpin> ProxyPassthrough<P, S> {
6970
pub(crate) async fn proxy_pass(self) -> Result<(), ErrorSource> {
7071
let res = proxy_pass(self.client, self.compute.stream, self.aux).await;
7172
if let Err(err) = self.compute.cancel_closure.try_cancel_query().await {
72-
tracing::warn!(?err, "could not cancel the query in the database");
73+
tracing::warn!(session_id = ?self.session_id, ?err, "could not cancel the query in the database");
7374
}
7475
res
7576
}

proxy/src/redis/cancellation_publisher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
use core::net::IpAddr;
12
use std::sync::Arc;
23

3-
use core::net::IpAddr;
44
use pq_proto::CancelKeyData;
55
use redis::AsyncCommands;
66
use tokio::sync::Mutex;

0 commit comments

Comments
 (0)