Skip to content

Commit a1763bf

Browse files
apollo_infra: cross component tracing lite
1 parent 9eb270f commit a1763bf

File tree

8 files changed

+76
-15
lines changed

8 files changed

+76
-15
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_infra/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ apollo_config.workspace = true
1818
apollo_infra_utils.workspace = true
1919
apollo_metrics.workspace = true
2020
async-trait.workspace = true
21+
derive_more.workspace = true
2122
hyper = { workspace = true, features = ["client", "http2", "server", "tcp"] }
2223
metrics-exporter-prometheus.workspace = true
24+
rand.workspace = true
2325
rstest.workspace = true
2426
serde = { workspace = true, features = ["derive"] }
2527
serde_json.workspace = true

crates/apollo_infra/src/component_client/local_component_client.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use serde::de::DeserializeOwned;
33
use serde::Serialize;
44
use tokio::sync::mpsc::{channel, Sender};
55
use tokio::time::Instant;
6+
use tracing::field::{display, Empty};
7+
use tracing::instrument;
68

79
use crate::component_client::ClientResult;
8-
use crate::component_definitions::{ComponentClient, RequestWrapper};
10+
use crate::component_definitions::{ComponentClient, RequestId, RequestWrapper};
911
use crate::metrics::LocalClientMetrics;
1012
use crate::requests::LabeledRequest;
1113

@@ -40,10 +42,13 @@ where
4042
Request: Send + Serialize + DeserializeOwned + LabeledRequest,
4143
Response: Send + Serialize + DeserializeOwned,
4244
{
45+
#[instrument(skip_all, fields(request_id = Empty))]
4346
async fn send(&self, request: Request) -> ClientResult<Response> {
47+
let request_id = RequestId::generate();
48+
tracing::Span::current().record("request_id", display(&request_id));
4449
let request_label = request.request_label();
4550
let (res_tx, mut res_rx) = channel::<Response>(1);
46-
let request_wrapper = RequestWrapper::new(request, res_tx);
51+
let request_wrapper = RequestWrapper::new(request, res_tx, request_id);
4752
let start = Instant::now();
4853
self.tx.send(request_wrapper).await.expect("Outbound connection should be open.");
4954
let response = res_rx.recv().await.expect("Inbound connection should be open.");

crates/apollo_infra/src/component_client/remote_component_client.rs

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,18 @@ use serde::de::DeserializeOwned;
1414
use serde::{Deserialize, Serialize};
1515
use tokio::sync::Mutex;
1616
use tokio::time::Instant;
17-
use tracing::{debug, trace, warn};
17+
use tracing::field::{display, Empty};
18+
use tracing::{debug, instrument, trace, warn};
1819
use validator::Validate;
1920

2021
use super::definitions::{ClientError, ClientResult};
21-
use crate::component_definitions::{ComponentClient, ServerError, APPLICATION_OCTET_STREAM};
22+
use crate::component_definitions::{
23+
ComponentClient,
24+
RequestId,
25+
ServerError,
26+
APPLICATION_OCTET_STREAM,
27+
REQUEST_ID_HEADER,
28+
};
2229
use crate::metrics::RemoteClientMetrics;
2330
use crate::requests::LabeledRequest;
2431
use crate::serde_utils::SerdeWrapper;
@@ -163,10 +170,15 @@ where
163170
Self { uri, client, config, metrics, _req: PhantomData, _res: PhantomData }
164171
}
165172

166-
fn construct_http_request(&self, serialized_request: Bytes) -> HyperRequest<Body> {
173+
fn construct_http_request(
174+
&self,
175+
serialized_request: Bytes,
176+
request_id: &RequestId,
177+
) -> HyperRequest<Body> {
167178
trace!("Constructing remote request");
168179
HyperRequest::post(self.uri.clone())
169180
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
181+
.header(REQUEST_ID_HEADER, request_id.to_string())
170182
.body(Body::from(serialized_request))
171183
.expect("Request building should succeed")
172184
}
@@ -217,7 +229,10 @@ where
217229
Request: Send + Serialize + DeserializeOwned + Debug + AsRef<str> + LabeledRequest,
218230
Response: Send + Serialize + DeserializeOwned + Debug,
219231
{
232+
#[instrument(skip_all, fields(request_id = Empty))]
220233
async fn send(&self, component_request: Request) -> ClientResult<Response> {
234+
let request_id = RequestId::generate();
235+
tracing::Span::current().record("request_id", display(&request_id));
221236
let log_message = format!("{} to {}", component_request.as_ref(), self.uri);
222237
let request_label = component_request.request_label();
223238

@@ -239,7 +254,8 @@ where
239254
let mut retry_interval_ms = self.config.initial_retry_delay_ms;
240255
for attempt in 1..max_attempts + 1 {
241256
trace!("Request {log_message} attempt {attempt} of {max_attempts}");
242-
let http_request = self.construct_http_request(serialized_request_bytes.clone());
257+
let http_request =
258+
self.construct_http_request(serialized_request_bytes.clone(), &request_id);
243259
let start = Instant::now();
244260
let res = self.try_send(http_request).await;
245261
let elapsed = start.elapsed();

crates/apollo_infra/src/component_definitions.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::fmt::{Debug, Formatter, Result};
22

33
use apollo_infra_utils::type_name::short_type_name;
44
use async_trait::async_trait;
5+
use derive_more::{Display, FromStr};
6+
use rand::random;
57
use serde::de::DeserializeOwned;
68
use serde::{Deserialize, Serialize};
79
use thiserror::Error;
@@ -88,15 +90,28 @@ where
8890
pub request: Request,
8991
pub tx: Sender<Response>,
9092
pub creation_time: Instant,
93+
pub request_id: RequestId,
9194
}
9295

96+
#[derive(Clone, Display, FromStr)]
97+
pub struct RequestId(pub u64);
98+
99+
impl RequestId {
100+
pub fn generate() -> Self {
101+
Self(random::<u64>())
102+
}
103+
}
104+
105+
/// Header name for Request ID header.
106+
pub const REQUEST_ID_HEADER: &str = "request-id";
107+
93108
impl<Request, Response> RequestWrapper<Request, Response>
94109
where
95110
Request: Send,
96111
Response: Send,
97112
{
98-
pub fn new(request: Request, tx: Sender<Response>) -> Self {
99-
Self { request, tx, creation_time: Instant::now() }
113+
pub fn new(request: Request, tx: Sender<Response>, request_id: RequestId) -> Self {
114+
Self { request, tx, creation_time: Instant::now(), request_id }
100115
}
101116
}
102117

crates/apollo_infra/src/component_server/local_component_server.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ use serde::{Deserialize, Serialize};
1010
use tokio::sync::mpsc::{channel, Receiver, Sender};
1111
use tokio::sync::Semaphore;
1212
use tokio::time::Instant;
13-
use tracing::{error, info, trace, warn};
13+
use tracing::{error, info, instrument, trace, warn};
1414
use validator::Validate;
1515

1616
use crate::component_definitions::{
1717
ComponentRequestHandler,
1818
ComponentStarter,
1919
PrioritizedRequest,
20+
RequestId,
2021
RequestPriority,
2122
RequestWrapper,
2223
};
@@ -210,7 +211,7 @@ where
210211

211212
tokio::spawn(async move {
212213
loop {
213-
let (request, tx) = get_next_request_for_processing(
214+
let (request, tx, request_id) = get_next_request_for_processing(
214215
&mut high_rx,
215216
&mut normal_rx,
216217
&component_name,
@@ -221,6 +222,7 @@ where
221222
process_request(
222223
&mut component,
223224
request,
225+
request_id,
224226
tx,
225227
metrics,
226228
processing_time_warning_threshold_ms,
@@ -311,7 +313,7 @@ where
311313
tokio::spawn(async move {
312314
loop {
313315
// TODO(Tsabary): add a test for the queueing time metric.
314-
let (request, tx) = get_next_request_for_processing(
316+
let (request, tx, request_id) = get_next_request_for_processing(
315317
&mut high_rx,
316318
&mut normal_rx,
317319
&component_name,
@@ -328,6 +330,7 @@ where
328330
process_request(
329331
&mut cloned_component,
330332
request,
333+
request_id,
331334
tx,
332335
metrics,
333336
processing_time_warning_threshold_ms,
@@ -371,9 +374,11 @@ where
371374
}
372375
}
373376

377+
#[instrument(skip_all,fields(request_id = %request_id))]
374378
async fn process_request<Request, Response, Component>(
375379
component: &mut Component,
376380
request: Request,
381+
request_id: RequestId,
377382
tx: Sender<Response>,
378383
metrics: &'static LocalServerMetrics,
379384
processing_time_warning_threshold_ms: u128,
@@ -430,7 +435,7 @@ async fn get_next_request_for_processing<Request, Response>(
430435
normal_rx: &mut Receiver<RequestWrapper<Request, Response>>,
431436
component_name: &str,
432437
metrics: &'static LocalServerMetrics,
433-
) -> (Request, Sender<Response>)
438+
) -> (Request, Sender<Response>, RequestId)
434439
where
435440
Request: Send + Debug + LabeledRequest,
436441
Response: Send,
@@ -451,12 +456,13 @@ where
451456
let request = request_wrapper.request;
452457
let tx = request_wrapper.tx;
453458
let creation_time = request_wrapper.creation_time;
459+
let request_id = request_wrapper.request_id;
454460

455461
trace!(
456462
"Component {component_name} received request {request:?} that was created at \
457463
{creation_time:?}",
458464
);
459465
metrics.record_queueing_time(creation_time.elapsed().as_secs_f64(), request.request_label());
460466

461-
(request, tx)
467+
(request, tx, request_id)
462468
}

crates/apollo_infra/src/component_server/remote_component_server.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@ use serde::de::DeserializeOwned;
1717
use serde::{Deserialize, Serialize};
1818
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
1919
use tower::{service_fn, Service, ServiceExt};
20-
use tracing::{debug, error, trace, warn};
20+
use tracing::{debug, error, instrument, trace, warn};
2121
use validator::Validate;
2222

2323
use crate::component_client::{ClientError, LocalComponentClient};
2424
use crate::component_definitions::{
2525
ComponentClient,
26+
RequestId,
2627
ServerError,
2728
APPLICATION_OCTET_STREAM,
2829
BUSY_PREVIOUS_REQUESTS_MSG,
30+
REQUEST_ID_HEADER,
2931
};
3032
use crate::component_server::ComponentServerStarter;
3133
use crate::metrics::RemoteServerMetrics;
@@ -108,8 +110,10 @@ where
108110
Self { local_client, config: remote_server_config, port, max_concurrency, metrics }
109111
}
110112

113+
#[instrument(skip_all,fields(request_id = %request_id))]
111114
async fn remote_component_server_handler(
112115
http_request: HyperRequest<Body>,
116+
request_id: RequestId,
113117
local_client: LocalComponentClient<Request, Response>,
114118
metrics: &'static RemoteServerMetrics,
115119
) -> Result<HyperResponse<Body>, hyper::Error> {
@@ -128,6 +132,7 @@ where
128132

129133
// Wrap the send operation in a tokio::spawn as it is NOT a cancel-safe operation.
130134
// Even if the current task is cancelled, the inner task will continue to run.
135+
// Note: this creates a new request ID for the local client.
131136
let response = tokio::spawn(async move { local_client.send(request).await })
132137
.await
133138
.expect("Should be able to extract value from the task");
@@ -194,10 +199,17 @@ where
194199
Ok(permit) => {
195200
metrics.increment_number_of_connections();
196201
trace!("Acquired semaphore permit for connection");
197-
let handle_request_service = service_fn(move |req| {
202+
let handle_request_service = service_fn(move |req: HyperRequest<Body>| {
198203
trace!("Received request: {:?}", req);
204+
let request_id = req
205+
.headers()
206+
.get(REQUEST_ID_HEADER)
207+
.and_then(|header| header.to_str().ok())
208+
.and_then(|s| s.parse::<RequestId>().ok())
209+
.expect("Request ID should be present in the request headers");
199210
Self::remote_component_server_handler(
200211
req,
212+
request_id,
201213
local_client.clone(),
202214
metrics,
203215
)

crates/apollo_infra/src/tests/remote_component_client_server_test.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ use crate::component_client::{
2828
};
2929
use crate::component_definitions::{
3030
ComponentClient,
31+
RequestId,
3132
RequestWrapper,
3233
ServerError,
3334
APPLICATION_OCTET_STREAM,
3435
BUSY_PREVIOUS_REQUESTS_MSG,
36+
REQUEST_ID_HEADER,
3537
};
3638
use crate::component_server::{
3739
ComponentServerStarter,
@@ -447,6 +449,7 @@ async fn faulty_client_setup() {
447449
format!("http://[{}]:{}/", self.socket.ip(), self.socket.port()).parse().unwrap();
448450
let http_request = Request::post(uri)
449451
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
452+
.header(REQUEST_ID_HEADER, RequestId::generate().to_string())
450453
.body(Body::from(SerdeWrapper::new(component_request).wrapper_serialize().unwrap()))
451454
.unwrap();
452455
let http_response = Client::new().request(http_request).await.unwrap();

0 commit comments

Comments
 (0)