Skip to content

Commit cccaa20

Browse files
apollo_infra: cross component tracing lite
1 parent 9549636 commit cccaa20

File tree

8 files changed

+90
-13
lines changed

8 files changed

+90
-13
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/apollo_infra/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ apollo_config.workspace = true
1818
apollo_infra_utils.workspace = true
1919
apollo_metrics.workspace = true
2020
async-trait.workspace = true
21+
derive_more.workspace = true
2122
hyper = { workspace = true, features = ["client", "http2", "server", "tcp"] }
2223
metrics-exporter-prometheus.workspace = true
24+
rand.workspace = true
2325
rstest.workspace = true
2426
serde = { workspace = true, features = ["derive"] }
2527
serde_json.workspace = true

crates/apollo_infra/src/component_client/local_component_client.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use serde::de::DeserializeOwned;
33
use serde::Serialize;
44
use tokio::sync::mpsc::{channel, Sender};
55
use tokio::time::Instant;
6+
use tracing::trace;
67

78
use crate::component_client::ClientResult;
8-
use crate::component_definitions::{ComponentClient, RequestWrapper};
9+
use crate::component_definitions::{ComponentClient, RequestId, RequestWrapper};
910
use crate::metrics::LocalClientMetrics;
1011
use crate::requests::LabeledRequest;
1112

@@ -43,7 +44,13 @@ where
4344
async fn send(&self, request: Request) -> ClientResult<Response> {
4445
let request_label = request.request_label();
4546
let (res_tx, mut res_rx) = channel::<Response>(1);
46-
let request_wrapper = RequestWrapper::new(request, res_tx);
47+
let request_id = RequestId::generate();
48+
trace!(
49+
request_id = %request_id,
50+
request = request_label,
51+
"Sending local request"
52+
);
53+
let request_wrapper = RequestWrapper::new(request, res_tx, request_id);
4754
let start = Instant::now();
4855
self.tx.send(request_wrapper).await.expect("Outbound connection should be open.");
4956
let response = res_rx.recv().await.expect("Inbound connection should be open.");

crates/apollo_infra/src/component_client/remote_component_client.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ use tracing::{debug, trace, warn};
1818
use validator::Validate;
1919

2020
use super::definitions::{ClientError, ClientResult};
21-
use crate::component_definitions::{ComponentClient, ServerError, APPLICATION_OCTET_STREAM};
21+
use crate::component_definitions::{
22+
ComponentClient,
23+
RequestId,
24+
ServerError,
25+
APPLICATION_OCTET_STREAM,
26+
REQUEST_ID_HEADER,
27+
};
2228
use crate::metrics::RemoteClientMetrics;
2329
use crate::requests::LabeledRequest;
2430
use crate::serde_utils::SerdeWrapper;
@@ -163,10 +169,15 @@ where
163169
Self { uri, client, config, metrics, _req: PhantomData, _res: PhantomData }
164170
}
165171

166-
fn construct_http_request(&self, serialized_request: Bytes) -> HyperRequest<Body> {
172+
fn construct_http_request(
173+
&self,
174+
serialized_request: Bytes,
175+
request_id: RequestId,
176+
) -> HyperRequest<Body> {
167177
trace!("Constructing remote request");
168178
HyperRequest::post(self.uri.clone())
169179
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
180+
.header(REQUEST_ID_HEADER, request_id.to_string())
170181
.body(Body::from(serialized_request))
171182
.expect("Request building should succeed")
172183
}
@@ -237,9 +248,17 @@ where
237248
let max_attempts = self.config.retries + 1;
238249
trace!("Starting retry loop: max_attempts = {max_attempts}");
239250
let mut retry_interval_ms = self.config.initial_retry_delay_ms;
251+
let request_id = RequestId::generate();
240252
for attempt in 1..max_attempts + 1 {
241-
trace!("Request {log_message} attempt {attempt} of {max_attempts}");
242-
let http_request = self.construct_http_request(serialized_request_bytes.clone());
253+
trace!(
254+
request_id = %request_id,
255+
request = request_label,
256+
attempt = attempt,
257+
max_attempts = max_attempts,
258+
"Sending remote request"
259+
);
260+
let http_request =
261+
self.construct_http_request(serialized_request_bytes.clone(), request_id.clone());
243262
let start = Instant::now();
244263
let res = self.try_send(http_request).await;
245264
let elapsed = start.elapsed();

crates/apollo_infra/src/component_definitions.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use std::fmt::{Debug, Formatter, Result};
22

33
use apollo_infra_utils::type_name::short_type_name;
44
use async_trait::async_trait;
5+
use derive_more::{Display, FromStr};
6+
use rand::random;
57
use serde::de::DeserializeOwned;
68
use serde::{Deserialize, Serialize};
79
use thiserror::Error;
@@ -88,15 +90,28 @@ where
8890
pub request: Request,
8991
pub tx: Sender<Response>,
9092
pub creation_time: Instant,
93+
pub request_id: RequestId,
9194
}
9295

96+
#[derive(Clone, Display, FromStr)]
97+
pub struct RequestId(pub u64);
98+
99+
impl RequestId {
100+
pub fn generate() -> Self {
101+
Self(random::<u64>())
102+
}
103+
}
104+
105+
/// Name of the HTTP header that carries the request ID.
106+
pub const REQUEST_ID_HEADER: &str = "request-id";
107+
93108
impl<Request, Response> RequestWrapper<Request, Response>
94109
where
95110
Request: Send,
96111
Response: Send,
97112
{
98-
pub fn new(request: Request, tx: Sender<Response>) -> Self {
99-
Self { request, tx, creation_time: Instant::now() }
113+
pub fn new(request: Request, tx: Sender<Response>, request_id: RequestId) -> Self {
114+
Self { request, tx, creation_time: Instant::now(), request_id }
100115
}
101116
}
102117

crates/apollo_infra/src/component_server/local_component_server.rs

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use crate::component_definitions::{
1717
ComponentRequestHandler,
1818
ComponentStarter,
1919
PrioritizedRequest,
20+
RequestId,
2021
RequestPriority,
2122
RequestWrapper,
2223
};
@@ -210,7 +211,7 @@ where
210211

211212
tokio::spawn(async move {
212213
loop {
213-
let (request, tx) = get_next_request_for_processing(
214+
let (request, tx, request_id) = get_next_request_for_processing(
214215
&mut high_rx,
215216
&mut normal_rx,
216217
&component_name,
@@ -221,6 +222,7 @@ where
221222
process_request(
222223
&mut component,
223224
request,
225+
request_id,
224226
tx,
225227
metrics,
226228
processing_time_warning_threshold_ms,
@@ -311,7 +313,7 @@ where
311313
tokio::spawn(async move {
312314
loop {
313315
// TODO(Tsabary): add a test for the queueing time metric.
314-
let (request, tx) = get_next_request_for_processing(
316+
let (request, tx, request_id) = get_next_request_for_processing(
315317
&mut high_rx,
316318
&mut normal_rx,
317319
&component_name,
@@ -328,6 +330,7 @@ where
328330
process_request(
329331
&mut cloned_component,
330332
request,
333+
request_id,
331334
tx,
332335
metrics,
333336
processing_time_warning_threshold_ms,
@@ -374,6 +377,7 @@ where
374377
async fn process_request<Request, Response, Component>(
375378
component: &mut Component,
376379
request: Request,
380+
request_id: RequestId,
377381
tx: Sender<Response>,
378382
metrics: &'static LocalServerMetrics,
379383
processing_time_warning_threshold_ms: u128,
@@ -386,6 +390,12 @@ async fn process_request<Request, Response, Component>(
386390
let request_info = format!("{:?}", request);
387391
let request_label = request.request_label();
388392

393+
trace!(
394+
request_id = %request_id,
395+
request = request_label,
396+
component = component_name,
397+
"Receiving local request"
398+
);
389399
trace!("Component {component_name} is starting to process request {request_info:?}",);
390400
// Please note that we're measuring the time of an asynchronous request processing, which
391401
// might also include the awaited time of this task to execute.
@@ -430,7 +440,7 @@ async fn get_next_request_for_processing<Request, Response>(
430440
normal_rx: &mut Receiver<RequestWrapper<Request, Response>>,
431441
component_name: &str,
432442
metrics: &'static LocalServerMetrics,
433-
) -> (Request, Sender<Response>)
443+
) -> (Request, Sender<Response>, RequestId)
434444
where
435445
Request: Send + Debug + LabeledRequest,
436446
Response: Send,
@@ -451,12 +461,13 @@ where
451461
let request = request_wrapper.request;
452462
let tx = request_wrapper.tx;
453463
let creation_time = request_wrapper.creation_time;
464+
let request_id = request_wrapper.request_id;
454465

455466
trace!(
456467
"Component {component_name} received request {request:?} that was created at \
457468
{creation_time:?}",
458469
);
459470
metrics.record_queueing_time(creation_time.elapsed().as_secs_f64(), request.request_label());
460471

461-
(request, tx)
472+
(request, tx, request_id)
462473
}

crates/apollo_infra/src/component_server/remote_component_server.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ use validator::Validate;
2323
use crate::component_client::{ClientError, LocalComponentClient};
2424
use crate::component_definitions::{
2525
ComponentClient,
26+
RequestId,
2627
ServerError,
2728
APPLICATION_OCTET_STREAM,
2829
BUSY_PREVIOUS_REQUESTS_MSG,
30+
REQUEST_ID_HEADER,
2931
};
3032
use crate::component_server::ComponentServerStarter;
3133
use crate::metrics::RemoteServerMetrics;
@@ -110,6 +112,7 @@ where
110112

111113
async fn remote_component_server_handler(
112114
http_request: HyperRequest<Body>,
115+
request_id: RequestId,
113116
local_client: LocalComponentClient<Request, Response>,
114117
metrics: &'static RemoteServerMetrics,
115118
) -> Result<HyperResponse<Body>, hyper::Error> {
@@ -123,11 +126,19 @@ where
123126
.map_err(|err| ClientError::ResponseDeserializationFailure(err.to_string()))
124127
{
125128
Ok(request) => {
129+
let request_label = request.request_label();
130+
trace!(
131+
request_id = %request_id,
132+
request = request_label,
133+
"Receiving remote request"
134+
);
126135
trace!("Successfully deserialized request: {request:?}");
127136
metrics.increment_valid_received();
128137

129138
// Wrap the send operation in a tokio::spawn as it is NOT a cancel-safe operation.
130139
// Even if the current task is cancelled, the inner task will continue to run.
140+
//
141+
// Note: the local client generates a new request ID for this send; the ID received from the remote caller is not forwarded.
131142
let response = tokio::spawn(async move { local_client.send(request).await })
132143
.await
133144
.expect("Should be able to extract value from the task");
@@ -194,10 +205,17 @@ where
194205
Ok(permit) => {
195206
metrics.increment_number_of_connections();
196207
trace!("Acquired semaphore permit for connection");
197-
let handle_request_service = service_fn(move |req| {
208+
let handle_request_service = service_fn(move |req: HyperRequest<Body>| {
198209
trace!("Received request: {:?}", req);
210+
let request_id = req
211+
.headers()
212+
.get(REQUEST_ID_HEADER)
213+
.and_then(|header| header.to_str().ok())
214+
.and_then(|s| s.parse::<RequestId>().ok())
215+
.expect("Request ID should be present in the request headers");
199216
Self::remote_component_server_handler(
200217
req,
218+
request_id,
201219
local_client.clone(),
202220
metrics,
203221
)

crates/apollo_infra/src/tests/remote_component_client_server_test.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ use crate::component_client::{
2828
};
2929
use crate::component_definitions::{
3030
ComponentClient,
31+
RequestId,
3132
RequestWrapper,
3233
ServerError,
3334
APPLICATION_OCTET_STREAM,
3435
BUSY_PREVIOUS_REQUESTS_MSG,
36+
REQUEST_ID_HEADER,
3537
};
3638
use crate::component_server::{
3739
ComponentServerStarter,
@@ -444,6 +446,7 @@ async fn faulty_client_setup() {
444446
format!("http://[{}]:{}/", self.socket.ip(), self.socket.port()).parse().unwrap();
445447
let http_request = Request::post(uri)
446448
.header(CONTENT_TYPE, APPLICATION_OCTET_STREAM)
449+
.header(REQUEST_ID_HEADER, RequestId::generate().to_string())
447450
.body(Body::from(SerdeWrapper::new(component_request).wrapper_serialize().unwrap()))
448451
.unwrap();
449452
let http_response = Client::new().request(http_request).await.unwrap();

0 commit comments

Comments
 (0)