Skip to content

Commit 37c1a29

Browse files
authored
chore: Improve tracing, especially around running queries (#71)
* chore: Improve tracing for running queries
* Add TraceLayer
* changelog
1 parent 0d329c8 commit 37c1a29

File tree

22 files changed

+56
-22
lines changed

22 files changed

+56
-22
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,11 @@ All notable changes to this project will be documented in this file.
77
### Added
88

99
- Support configuring compression for OTEL ([#70]).
10+
- Improve tracing details by adding a `tower_http::trace::TraceLayer` that creates spans for every HTTP request ([#71]).
11+
- Improve tracing for running queries on Trino, adding spans for the request to Trino and parsing ([#71]).
1012

1113
[#70]: https://github.com/stackabletech/trino-lb/pull/70
14+
[#71]: https://github.com/stackabletech/trino-lb/pull/71
1215

1316
## [0.4.1] - 2025-03-03
1417

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ sqlx = { version = "0.8.2", features = [
8282
strum = { version = "0.27", features = ["derive"] }
8383
tokio = "1.39"
8484
tower = "0.5"
85+
tower-http = { version = "0.6", features = ["tracing"] }
8586
tracing = "0.1"
8687
tracing-opentelemetry = "0.25"
8788
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

trino-lb-core/src/trino_api.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub struct Stat {
8282

8383
impl TrinoQueryApiResponse {
8484
#[instrument(
85+
skip(query),
8586
fields(trino_lb_addr = %trino_lb_addr),
8687
)]
8788
pub fn new_from_queued_query(
@@ -151,6 +152,7 @@ impl TrinoQueryApiResponse {
151152
}
152153

153154
#[instrument(
155+
skip(self),
154156
fields(trino_lb_addr = %trino_lb_addr),
155157
)]
156158
pub fn change_next_uri_to_trino_lb(&mut self, trino_lb_addr: &Url) -> Result<(), Error> {

trino-lb-persistence/src/in_memory.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl Default for InMemoryPersistence {
5454
}
5555

5656
impl Persistence for InMemoryPersistence {
57-
#[instrument(skip(self))]
57+
#[instrument(skip(self, queued_query))]
5858
async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> {
5959
let mut queued_queries = self.queued_queries.write().await;
6060
queued_queries.insert(queued_query.id.clone(), queued_query);
@@ -74,15 +74,15 @@ impl Persistence for InMemoryPersistence {
7474
.clone())
7575
}
7676

77-
#[instrument(skip(self))]
77+
#[instrument(skip(self, queued_query))]
7878
async fn remove_queued_query(&self, queued_query: &QueuedQuery) -> Result<(), super::Error> {
7979
let mut queued_queries = self.queued_queries.write().await;
8080
queued_queries.remove(&queued_query.id);
8181

8282
Ok(())
8383
}
8484

85-
#[instrument(skip(self))]
85+
#[instrument(skip(self, query))]
8686
async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> {
8787
let mut queries = self.queries.write().await;
8888
queries.insert(query.id.clone(), query);

trino-lb-persistence/src/postgres/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ struct HeaderMapWrapper {
135135
}
136136

137137
impl Persistence for PostgresPersistence {
138-
#[instrument(skip(self))]
138+
#[instrument(skip(self, queued_query))]
139139
async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> {
140140
query!(
141141
r#"INSERT INTO queued_queries (id, query, headers, creation_time, last_accessed, cluster_group)
@@ -185,7 +185,7 @@ impl Persistence for PostgresPersistence {
185185
Ok(queued_query)
186186
}
187187

188-
#[instrument(skip(self))]
188+
#[instrument(skip(self, queued_query))]
189189
async fn remove_queued_query(&self, queued_query: &QueuedQuery) -> Result<(), super::Error> {
190190
query!(
191191
r#"DELETE FROM queued_queries
@@ -199,7 +199,7 @@ impl Persistence for PostgresPersistence {
199199
Ok(())
200200
}
201201

202-
#[instrument(skip(self))]
202+
#[instrument(skip(self, query))]
203203
async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> {
204204
query!(
205205
r#"INSERT INTO queries (id, trino_cluster, trino_endpoint, creation_time, delivered_time)

trino-lb-persistence/src/redis/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ impl<R> Persistence for RedisPersistence<R>
174174
where
175175
R: AsyncCommands + Clone,
176176
{
177-
#[instrument(skip(self))]
177+
#[instrument(skip(self, queued_query))]
178178
async fn store_queued_query(&self, queued_query: QueuedQuery) -> Result<(), super::Error> {
179179
let key = queued_query_key(&queued_query.id);
180180
let value = bincode::serialize(&queued_query).context(SerializeToBinarySnafu)?;
@@ -210,7 +210,7 @@ where
210210
Ok(bincode::deserialize(&value).context(DeserializeFromBinarySnafu)?)
211211
}
212212

213-
#[instrument(skip(self))]
213+
#[instrument(skip(self, queued_query))]
214214
async fn remove_queued_query(&self, queued_query: &QueuedQuery) -> Result<(), super::Error> {
215215
let key = queued_query_key(&queued_query.id);
216216
let mut connection = self.connection();
@@ -225,7 +225,7 @@ where
225225
Ok(())
226226
}
227227

228-
#[instrument(skip(self))]
228+
#[instrument(skip(self, query))]
229229
async fn store_query(&self, query: TrinoQuery) -> Result<(), super::Error> {
230230
let key = query_key(&query.id);
231231
let value = bincode::serialize(&query).context(SerializeToBinarySnafu)?;

trino-lb/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ snafu.workspace = true
4848
strum.workspace = true
4949
tokio.workspace = true
5050
tower.workspace = true
51+
tower-http.workspace = true
5152
tracing-opentelemetry.workspace = true
5253
tracing-subscriber.workspace = true
5354
tracing.workspace = true

trino-lb/src/cluster_group_manager.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use futures::future::try_join_all;
99
use http::{HeaderMap, StatusCode};
1010
use reqwest::Client;
1111
use snafu::{OptionExt, ResultExt, Snafu};
12-
use tracing::{debug, instrument};
12+
use tracing::{debug, info_span, instrument, Instrument};
1313
use tracing_opentelemetry::OpenTelemetrySpanExt;
1414
use trino_lb_core::{
1515
config::Config, sanitization::Sanitize, trino_api::TrinoQueryApiResponse,
@@ -37,6 +37,12 @@ pub enum Error {
3737
#[snafu(display("Failed to decode Trino API response"))]
3838
DecodeTrinoResponse { source: reqwest::Error },
3939

40+
#[snafu(display("Failed to get the bytes of the Trino API response"))]
41+
GetTrinoResponseBytes { source: reqwest::Error },
42+
43+
#[snafu(display("Failed to parse Trino API response as JSON"))]
44+
ParseTrinoResponse { source: serde_json::Error },
45+
4046
#[snafu(display("Configuration error: A specific Trino cluster can only be part of a single clusterGroup. Please make sure the Trino cluster {cluster_name:?} only is part of a single clusterGroup."))]
4147
ConfigErrorTrinoClusterInMultipleClusterGroups { cluster_name: String },
4248

@@ -111,6 +117,7 @@ impl IntoResponse for SendToTrinoResponse {
111117
}
112118

113119
impl ClusterGroupManager {
120+
// Intentionally including the config here; this is only logged on startup.
114121
#[instrument(skip(persistence))]
115122
pub fn new(
116123
persistence: Arc<PersistenceImplementation>,
@@ -152,7 +159,10 @@ impl ClusterGroupManager {
152159
})
153160
}
154161

155-
#[instrument(skip(self))]
162+
#[instrument(
163+
skip(self, cluster),
164+
fields(cluster.name, headers = ?headers.sanitize())
165+
)]
156166
pub async fn send_query_to_cluster(
157167
&self,
158168
query: String,
@@ -188,7 +198,7 @@ impl ClusterGroupManager {
188198
let body = response
189199
.bytes()
190200
.await
191-
.context(DecodeTrinoResponseSnafu)?
201+
.context(GetTrinoResponseBytesSnafu)?
192202
.into();
193203
return Ok(SendToTrinoResponse::Unauthorized { headers, body });
194204
}
@@ -217,12 +227,19 @@ impl ClusterGroupManager {
217227
.get(next_uri)
218228
.headers(headers)
219229
.send()
230+
.instrument(info_span!("Send HTTP get to Trino"))
220231
.await
221232
.context(ContactTrinoPostQuerySnafu)?;
222233
let headers = response.headers();
223-
224234
let headers = filter_to_trino_headers(headers);
225-
let trino_query_api_response = response.json().await.context(DecodeTrinoResponseSnafu)?;
235+
236+
let bytes = response
237+
.bytes()
238+
.instrument(info_span!("Get response bytes"))
239+
.await
240+
.context(GetTrinoResponseBytesSnafu)?;
241+
let trino_query_api_response = info_span!("Parse JSON response", bytes = bytes.len())
242+
.in_scope(|| serde_json::from_slice(&bytes).context(ParseTrinoResponseSnafu))?;
226243

227244
Ok((trino_query_api_response, headers))
228245
}

trino-lb/src/http_server/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use axum_server::{tls_rustls::RustlsConfig, Handle};
1515
use futures::FutureExt;
1616
use snafu::{OptionExt, ResultExt, Snafu};
1717
use tokio::time::sleep;
18+
use tower_http::trace::TraceLayer;
1819
use tracing::info;
1920
use trino_lb_persistence::PersistenceImplementation;
2021

@@ -122,6 +123,7 @@ pub async fn start_http_server(
122123
)
123124
.route("/ui/index.html", get(ui::index::get_ui_index))
124125
.route("/ui/query.html", get(ui::query::get_ui_query))
126+
.layer(TraceLayer::new_for_http())
125127
.with_state(app_state);
126128

127129
if tls_config.enabled {

0 commit comments

Comments
 (0)