Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bins/nittei/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tracing-subscriber = { version = "0.3", features = [
"fmt",
"json",
"registry",
"time",
] }
opentelemetry = { version = "0.30.0", default-features = false, features = [
"trace",
Expand All @@ -61,6 +62,8 @@ reqwest = { version = "0.12", default-features = false, features = [
chrono = "0.4.39"
chrono-tz = "0.10.1"

serde_json = "1"

# Use the `jemallocator` crate to use jemalloc as the global allocator.
tikv-jemallocator = "0.6"

Expand Down
158 changes: 158 additions & 0 deletions bins/nittei/src/telemetry/logs_json_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use nittei_api::telemetry::correlation_layer::CorrelationId;
use opentelemetry::trace::TraceContextExt;
use tracing::{Event, Subscriber};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::{
fmt::format::{FormatEvent, FormatFields},
registry::LookupSpan,
};

/// Formatter for logs as JSON with correlation id & Datadog trace & span ids
pub struct LogsJsonFmt;

impl<S, N> FormatEvent<S, N> for LogsJsonFmt
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'writer> FormatFields<'writer> + 'static,
{
/// Format the event as JSON with correlation id & Datadog trace & span ids
fn format_event(
&self,
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
mut writer: tracing_subscriber::fmt::format::Writer<'_>,
event: &Event<'_>,
) -> std::fmt::Result {
use std::borrow::Cow;

// Collect the event fields into a JSON map
let mut fields_map = serde_json::Map::new();

// Custom visitor to collect fields
struct FieldVisitor<'a>(&'a mut serde_json::Map<String, serde_json::Value>);

// Collect the event fields into a JSON map
impl<'a> tracing::field::Visit for FieldVisitor<'a> {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
self.0.insert(
field.name().to_string(),
serde_json::Value::String(format!("{:?}", value)),
);
}

fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
self.0.insert(
field.name().to_string(),
serde_json::Value::String(value.to_string()),
);
}

fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
self.0.insert(
field.name().to_string(),
serde_json::Value::Number(serde_json::Number::from(value)),
);
}

fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
self.0.insert(
field.name().to_string(),
serde_json::Value::Number(serde_json::Number::from(value)),
);
}

fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
self.0
.insert(field.name().to_string(), serde_json::Value::Bool(value));
}
}

// Collect the event fields into a JSON map
let mut visitor = FieldVisitor(&mut fields_map);
event.record(&mut visitor);
let fields_value = serde_json::Value::Object(fields_map);

// Get current span's otel context -> Datadog ids
let (dd_trace_id, dd_span_id) = {
let span = tracing::Span::current();
let ctx_otel = span.context();
let otel_span = ctx_otel.span();
let sc = otel_span.span_context();
if sc.is_valid() {
// Datadog expects 64-bit decimal ids:
let trace_id_bytes = sc.trace_id().to_bytes();
let tid64 = u64::from_be_bytes([
trace_id_bytes[8],
trace_id_bytes[9],
trace_id_bytes[10],
trace_id_bytes[11],
trace_id_bytes[12],
trace_id_bytes[13],
trace_id_bytes[14],
trace_id_bytes[15],
]);
let sid64 = u64::from_be_bytes(sc.span_id().to_bytes());
(Some(tid64), Some(sid64))
} else {
(None, None)
}
};

// Include correlation_id field from the current span if present
let correlation_id = {
if let Some(scope) = ctx.lookup_current() {
scope
.extensions()
.get::<CorrelationId>()
.map(|c| Cow::from(c.0.clone()))
} else {
None
}
};

// Build final JSON
let mut obj = serde_json::Map::new();

// timestamp
obj.insert(
"@timestamp".into(),
serde_json::Value::String(chrono::Utc::now().to_rfc3339()),
);

// level & target
let meta = event.metadata();
obj.insert("level".into(), meta.level().to_string().into());
obj.insert("target".into(), meta.target().into());

// event fields
if let serde_json::Value::Object(map) = fields_value {
for (k, v) in map {
obj.insert(k, v);
}
}

// correlation id (if any)
if let Some(cid) = correlation_id {
obj.insert(
"correlation_id".into(),
serde_json::Value::String(cid.into_owned()),
);
}

// datadog trace & span ids
if let (Some(t), Some(s)) = (dd_trace_id, dd_span_id) {
obj.insert(
"dd.trace_id".into(),
serde_json::Value::String(t.to_string()),
);
obj.insert(
"dd.span_id".into(),
serde_json::Value::String(s.to_string()),
);
}

// write out one line
let json_str =
serde_json::to_string(&serde_json::Value::Object(obj)).map_err(|_| std::fmt::Error)?;
writeln!(writer, "{}", json_str)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ use opentelemetry_sdk::{
use tracing::warn;
use tracing_subscriber::{EnvFilter, Registry, layer::SubscriberExt};

pub mod logs_json_format;

use logs_json_format::LogsJsonFmt;

/// Register a subscriber as global default to process span data.
///
/// It should only be called once!
Expand Down Expand Up @@ -60,7 +64,11 @@ pub fn init_subscriber() -> anyhow::Result<()> {
.with(
tracing_subscriber::fmt::layer()
.json()
.with_current_span(false),
.with_target(false)
.with_file(false)
.with_line_number(false)
.with_current_span(false)
.event_format(LogsJsonFmt),
)
.with(telemetry_layer);

Expand Down
8 changes: 6 additions & 2 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tower = "0.5"

serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
futures = "0.3"

validator = { version = "0.20", features = ["derive"] }
reqwest = { version = "0.12", default-features = false, features = [
"http2",
Expand All @@ -38,14 +38,18 @@ utoipa = { version = "5.3", features = ["axum_extras", "uuid", "chrono"] }
utoipa-axum = { version = "0.2" }
utoipa-swagger-ui = { version = "9.0", features = ["axum", "vendored"] }

async-trait = "0.1"
rrule = "0.14"
chrono = { version = "0.4.39", features = ["serde"] }
chrono-tz = "0.10.1"
uuid = "1.17"

anyhow = "1.0"
jsonwebtoken = "9"
thiserror = "2.0"

async-trait = "0.1"
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"

opentelemetry = "0.30.0"
opentelemetry-http = "0.30.0"
Expand Down
10 changes: 7 additions & 3 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ mod account;
mod calendar;
mod error;
mod event;
mod http_logger;
mod job_schedulers;
mod schedule;
mod service;
mod shared;
mod status;
pub mod telemetry;
mod user;

use std::sync::Arc;

use axum::{Extension, Router, http::header};
use futures::lock::Mutex;
use http_logger::metadata_middleware;
use job_schedulers::{start_reminder_generation_job, start_send_reminders_job};
use nittei_domain::{
Account,
Expand All @@ -25,6 +24,7 @@ use nittei_domain::{
PEMKey,
};
use nittei_infra::NitteiContext;
use telemetry::http_logger::metadata_middleware;
use tokio::{
net::TcpListener,
sync::oneshot::{self, Sender},
Expand All @@ -48,8 +48,11 @@ use utoipa_axum::router::OpenApiRouter;
use utoipa_swagger_ui::SwaggerUi;

use crate::{
http_logger::{NitteiTracingOnFailure, NitteiTracingOnResponse, NitteiTracingSpanBuilder},
shared::auth::NITTEI_X_API_KEY_HEADER,
telemetry::{
correlation_layer::CorrelationIdLayer,
http_logger::{NitteiTracingOnFailure, NitteiTracingOnResponse, NitteiTracingSpanBuilder},
},
};

/// Configure the Actix server API
Expand Down Expand Up @@ -158,6 +161,7 @@ impl Application {
.on_failure(NitteiTracingOnFailure {}),
),
)
.layer(CorrelationIdLayer)
.layer(Extension(context.clone()))
.layer(Extension(shared_state.clone()))
.split_for_parts();
Expand Down
75 changes: 75 additions & 0 deletions crates/api/src/telemetry/correlation_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use axum::{
body::Body,
http::{HeaderName, HeaderValue, Request},
response::Response,
};
use tower::{Layer, Service};
use uuid::Uuid;

static CORRELATION_ID_HEADER: HeaderName = HeaderName::from_static("x-correlation-id");

#[derive(Clone, Default)]
pub struct CorrelationIdLayer;

impl<S> Layer<S> for CorrelationIdLayer {
type Service = CorrelationIdService<S>;
fn layer(&self, inner: S) -> Self::Service {
CorrelationIdService { inner }
}
}

#[derive(Clone)]
pub struct CorrelationIdService<S> {
inner: S,
}

impl<S> Service<Request<Body>> for CorrelationIdService<S>
where
S: Service<Request<Body>, Response = Response> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Response = Response;
type Error = S::Error;
type Future = Pin<Box<dyn std::future::Future<Output = Result<Response, S::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, mut req: Request<Body>) -> Self::Future {
// 1) get or generate
let cid = req
.headers()
.get(&CORRELATION_ID_HEADER)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned())
.unwrap_or_else(|| Uuid::new_v4().to_string());

// 2) put in extensions so handlers & TraceLayer can read it
req.extensions_mut().insert(CorrelationId(cid.clone()));

// proceed
let fut = self.inner.clone().call(req);

Box::pin(async move {
let mut res = fut.await?;

// 3) echo back on response
res.headers_mut().insert(
CORRELATION_ID_HEADER.clone(),
HeaderValue::from_str(&cid).unwrap_or(HeaderValue::from_static("invalid")),
);

Ok(res)
})
}
}

#[derive(Clone, Debug)]
pub struct CorrelationId(pub String);
Loading