Skip to content

Commit cb51fe3

Browse files
astuyveduncanista
andauthored
chore: lifecycle listener to axum (#789)
# What? Migrates Lifecycle Listener to Axum # Motivation Better connection handling and [SVLS-7467](https://datadoghq.atlassian.net/browse/SVLS-7467) --------- Co-authored-by: jordan gonzález <[email protected]>
1 parent 29a3c84 commit cb51fe3

File tree

1 file changed

+145
-180
lines changed

1 file changed

+145
-180
lines changed

bottlecap/src/lifecycle/listener.rs

Lines changed: 145 additions & 180 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
11
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use ddcommon::hyper_migration;
5-
use ddcommon::hyper_migration::Body;
6-
use http_body_util::BodyExt;
7-
use hyper::service::service_fn;
8-
use hyper::{HeaderMap, Method, Response, StatusCode, http};
4+
use axum::{
5+
Router,
6+
extract::{Request, State},
7+
http::{HeaderMap, StatusCode},
8+
response::{IntoResponse, Response},
9+
routing::{get, post},
10+
};
11+
use bytes::Bytes;
912
use serde_json::json;
1013
use std::collections::HashMap;
11-
use std::io;
1214
use std::net::SocketAddr;
1315
use std::sync::Arc;
16+
use tokio::net::TcpListener;
1417
use tokio::sync::Mutex;
1518
use tracing::{debug, error, warn};
1619

17-
use crate::lifecycle::invocation::processor::Processor as InvocationProcessor;
18-
use crate::traces::propagation::text_map_propagator::{
19-
DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, DATADOG_SAMPLING_PRIORITY_KEY, DATADOG_TAGS_KEY,
20-
DATADOG_TRACE_ID_KEY,
20+
use crate::{
21+
http::extract_request_body,
22+
lifecycle::invocation::processor::Processor as InvocationProcessor,
23+
traces::propagation::text_map_propagator::{
24+
DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY, DATADOG_SAMPLING_PRIORITY_KEY, DATADOG_TAGS_KEY,
25+
DATADOG_TRACE_ID_KEY,
26+
},
2127
};
2228

2329
const HELLO_PATH: &str = "/lambda/hello";
@@ -31,203 +37,162 @@ pub struct Listener {
3137

3238
impl Listener {
3339
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
34-
let invocation_processor = self.invocation_processor.clone();
40+
let port = u16::try_from(AGENT_PORT).expect("AGENT_PORT is too large");
41+
let addr = SocketAddr::from(([127, 0, 0, 1], port));
42+
let listener = TcpListener::bind(&addr).await?;
43+
44+
let router = self.make_router();
3545

36-
let service = service_fn(move |req| {
37-
let invocation_processor = invocation_processor.clone();
46+
debug!("Lifecycle API | Starting listener on {}", addr);
47+
axum::serve(listener, router).await?;
48+
Ok(())
49+
}
3850

39-
Self::handler(
40-
req.map(hyper_migration::Body::incoming),
41-
invocation_processor.clone(),
42-
)
43-
});
51+
fn make_router(&self) -> Router {
52+
let invocation_processor = self.invocation_processor.clone();
4453

45-
let port = u16::try_from(AGENT_PORT).expect("AGENT_PORT is too large");
46-
let addr = SocketAddr::from(([127, 0, 0, 1], port));
47-
let listener = tokio::net::TcpListener::bind(&addr).await?;
48-
49-
let server = hyper::server::conn::http1::Builder::new();
50-
let mut joinset = tokio::task::JoinSet::new();
51-
loop {
52-
let conn = tokio::select! {
53-
con_res = listener.accept() => match con_res {
54-
Err(e)
55-
if matches!(
56-
e.kind(),
57-
io::ErrorKind::ConnectionAborted
58-
| io::ErrorKind::ConnectionReset
59-
| io::ErrorKind::ConnectionRefused
60-
) =>
61-
{
62-
continue;
63-
}
64-
Err(e) => {
65-
error!("Server error: {e}");
66-
return Err(e.into());
67-
}
68-
Ok((conn, _)) => conn,
69-
},
70-
finished = async {
71-
match joinset.join_next().await {
72-
Some(finished) => finished,
73-
None => std::future::pending().await,
74-
}
75-
} => match finished {
76-
Err(e) if e.is_panic() => {
77-
std::panic::resume_unwind(e.into_panic());
78-
},
79-
Ok(()) | Err(_) => continue,
80-
},
81-
};
82-
let conn = hyper_util::rt::TokioIo::new(conn);
83-
let server = server.clone();
84-
let service = service.clone();
85-
joinset.spawn(async move {
86-
if let Err(e) = server.serve_connection(conn, service).await {
87-
debug!("Lifecycle connection error: {e}");
88-
}
89-
});
90-
}
54+
Router::new()
55+
.route(START_INVOCATION_PATH, post(Self::handle_start_invocation))
56+
.route(END_INVOCATION_PATH, post(Self::handle_end_invocation))
57+
.route(HELLO_PATH, get(Self::handle_hello))
58+
.with_state(invocation_processor)
9159
}
9260

93-
async fn handler(
94-
req: hyper_migration::HttpRequest,
95-
invocation_processor: Arc<Mutex<InvocationProcessor>>,
96-
) -> http::Result<hyper_migration::HttpResponse> {
97-
match (req.method(), req.uri().path()) {
98-
(&Method::POST, START_INVOCATION_PATH) => {
99-
let (parts, body) = req.into_parts();
100-
Self::universal_instrumentation_start(&parts.headers, body, invocation_processor)
101-
.await
102-
.1
61+
async fn handle_start_invocation(
62+
State(invocation_processor): State<Arc<Mutex<InvocationProcessor>>>,
63+
request: Request,
64+
) -> Response {
65+
let (parts, body) = match extract_request_body(request).await {
66+
Ok(r) => r,
67+
Err(e) => {
68+
error!("Failed to extract request body: {e}");
69+
return (
70+
StatusCode::BAD_REQUEST,
71+
"Could not read start invocation request body",
72+
)
73+
.into_response();
10374
}
104-
(&Method::POST, END_INVOCATION_PATH) => {
105-
let (parts, body) = req.into_parts();
106-
match Self::universal_instrumentation_end(
107-
&parts.headers,
108-
body,
109-
invocation_processor,
75+
};
76+
77+
let (_, response) =
78+
Self::universal_instrumentation_start(&parts.headers, body, invocation_processor).await;
79+
80+
response
81+
}
82+
83+
async fn handle_end_invocation(
84+
State(invocation_processor): State<Arc<Mutex<InvocationProcessor>>>,
85+
request: Request,
86+
) -> Response {
87+
let (parts, body) = match extract_request_body(request).await {
88+
Ok(r) => r,
89+
Err(e) => {
90+
error!("Failed to extract request body: {e}");
91+
return (
92+
StatusCode::BAD_REQUEST,
93+
"Could not read end invocation request body",
11094
)
111-
.await
112-
{
113-
Ok(response) => Ok(response),
114-
Err(e) => {
115-
error!("Failed to end invocation {e}");
116-
Ok(Response::builder()
117-
.status(500)
118-
.body(hyper_migration::Body::empty())
119-
.expect("no body"))
120-
}
121-
}
95+
.into_response();
12296
}
123-
(&Method::GET, HELLO_PATH) => Self::hello_handler(),
124-
_ => {
125-
let mut not_found = Response::default();
126-
*not_found.status_mut() = StatusCode::NOT_FOUND;
127-
Ok(not_found)
97+
};
98+
99+
match Self::universal_instrumentation_end(&parts.headers, body, invocation_processor).await
100+
{
101+
Ok(response) => response,
102+
Err(e) => {
103+
error!("Failed to end invocation {e}");
104+
(
105+
StatusCode::INTERNAL_SERVER_ERROR,
106+
"Failed to end invocation",
107+
)
108+
.into_response()
128109
}
129110
}
130111
}
131112

113+
#[allow(clippy::unused_async)]
114+
async fn handle_hello() -> Response {
115+
warn!("[DEPRECATED] Please upgrade your tracing library, the /hello route is deprecated");
116+
(StatusCode::OK, json!({}).to_string()).into_response()
117+
}
118+
132119
pub async fn universal_instrumentation_start(
133120
headers: &HeaderMap,
134-
body: Body,
121+
body: Bytes,
135122
invocation_processor: Arc<Mutex<InvocationProcessor>>,
136-
) -> (u64, http::Result<hyper_migration::HttpResponse>) {
123+
) -> (u64, Response) {
137124
debug!("Received start invocation request");
138-
match body.collect().await {
139-
Ok(b) => {
140-
let body = b.to_bytes().to_vec();
141-
142-
let headers = Self::headers_to_map(headers);
143-
144-
let extracted_span_context = {
145-
let mut processor = invocation_processor.lock().await;
146-
processor.on_universal_instrumentation_start(headers, body)
147-
};
148-
let mut response = Response::builder().status(200);
149-
150-
let found_parent_span_id;
151-
// If a `SpanContext` exists, then tell the tracer to use it.
152-
// todo: update this whole code with DatadogHeaderPropagator::inject
153-
// since this logic looks messy
154-
if let Some(sp) = extracted_span_context {
155-
response = response.header(DATADOG_TRACE_ID_KEY, sp.trace_id.to_string());
156-
if let Some(priority) = sp.sampling.and_then(|s| s.priority) {
157-
response =
158-
response.header(DATADOG_SAMPLING_PRIORITY_KEY, priority.to_string());
159-
}
160-
161-
// Handle 128 bit trace ids
162-
if let Some(trace_id_higher_order_bits) =
163-
sp.tags.get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY)
164-
{
165-
response = response.header(
166-
DATADOG_TAGS_KEY,
167-
format!("{DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY}={trace_id_higher_order_bits}"),
168-
);
169-
}
170-
found_parent_span_id = sp.span_id;
171-
} else {
172-
found_parent_span_id = 0;
173-
}
174-
175-
(
176-
found_parent_span_id,
177-
response.body(hyper_migration::Body::from(json!({}).to_string())),
178-
)
125+
let body = body.to_vec();
126+
127+
let headers = Self::headers_to_map(headers);
128+
129+
let extracted_span_context = {
130+
let mut processor = invocation_processor.lock().await;
131+
processor.on_universal_instrumentation_start(headers, body)
132+
};
133+
134+
let found_parent_span_id;
135+
136+
// If a `SpanContext` exists, then tell the tracer to use it.
137+
// todo: update this whole code with DatadogHeaderPropagator::inject
138+
// since this logic looks messy
139+
let response = if let Some(sp) = extracted_span_context {
140+
let mut headers = HeaderMap::new();
141+
headers.insert(
142+
DATADOG_TRACE_ID_KEY,
143+
sp.trace_id
144+
.to_string()
145+
.parse()
146+
.expect("Failed to parse trace id"),
147+
);
148+
if let Some(priority) = sp.sampling.and_then(|s| s.priority) {
149+
headers.insert(
150+
DATADOG_SAMPLING_PRIORITY_KEY,
151+
priority
152+
.to_string()
153+
.parse()
154+
.expect("Failed to parse sampling priority"),
155+
);
179156
}
180-
Err(e) => {
181-
error!("Could not read start invocation request body {e}");
182157

183-
(
184-
0,
185-
Response::builder()
186-
.status(400)
187-
.body(hyper_migration::Body::from(
188-
"Could not read start invocation request body",
189-
)),
190-
)
158+
// Handle 128 bit trace ids
159+
if let Some(trace_id_higher_order_bits) =
160+
sp.tags.get(DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY)
161+
{
162+
headers.insert(
163+
DATADOG_TAGS_KEY,
164+
format!(
165+
"{DATADOG_HIGHER_ORDER_TRACE_ID_BITS_KEY}={trace_id_higher_order_bits}"
166+
)
167+
.parse()
168+
.expect("Failed to parse tags"),
169+
);
191170
}
192-
}
171+
found_parent_span_id = sp.span_id;
172+
173+
(StatusCode::OK, headers, json!({}).to_string()).into_response()
174+
} else {
175+
found_parent_span_id = 0;
176+
(StatusCode::OK, json!({}).to_string()).into_response()
177+
};
178+
179+
(found_parent_span_id, response)
193180
}
194181

195182
pub async fn universal_instrumentation_end(
196183
headers: &HeaderMap,
197-
body: Body,
184+
body: Bytes,
198185
invocation_processor: Arc<Mutex<InvocationProcessor>>,
199-
) -> http::Result<hyper_migration::HttpResponse> {
186+
) -> Result<Response, Box<dyn std::error::Error>> {
200187
debug!("Received end invocation request");
201-
match body.collect().await {
202-
Ok(b) => {
203-
let body = b.to_bytes().to_vec();
204-
let mut processor = invocation_processor.lock().await;
205-
206-
let headers = Self::headers_to_map(headers);
207-
processor.on_universal_instrumentation_end(headers, body);
208-
drop(processor);
209-
210-
Response::builder()
211-
.status(200)
212-
.body(hyper_migration::Body::from(json!({}).to_string()))
213-
}
214-
Err(e) => {
215-
error!("Could not read end invocation request body {e}");
188+
let body = body.to_vec();
189+
let mut processor = invocation_processor.lock().await;
216190

217-
Response::builder()
218-
.status(400)
219-
.body(hyper_migration::Body::from(
220-
"Could not read end invocation request body",
221-
))
222-
}
223-
}
224-
}
191+
let headers = Self::headers_to_map(headers);
192+
processor.on_universal_instrumentation_end(headers, body);
193+
drop(processor);
225194

226-
fn hello_handler() -> http::Result<hyper_migration::HttpResponse> {
227-
warn!("[DEPRECATED] Please upgrade your tracing library, the /hello route is deprecated");
228-
Response::builder()
229-
.status(200)
230-
.body(hyper_migration::Body::from(json!({}).to_string()))
195+
Ok((StatusCode::OK, json!({}).to_string()).into_response())
231196
}
232197

233198
fn headers_to_map(headers: &HeaderMap) -> HashMap<String, String> {

0 commit comments

Comments
 (0)