Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions nts-pool-ke/src/pool_ke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::{
};

use notify::{RecursiveMode, Watcher};
use opentelemetry::{KeyValue, metrics::Counter};
use opentelemetry::{
KeyValue,
metrics::{Counter, Histogram},
};
use pool_nts::{
AlgorithmDescription, BufferBorrowingReader, ClientRequest, ErrorCode, ErrorResponse,
FixedKeyRequest, KeyExchangeResponse, MAX_MESSAGE_SIZE, NoAgreementResponse, NtsError,
Expand All @@ -27,6 +30,7 @@ use crate::{
error::PoolError,
haproxy::parse_haproxy_header,
servers::{ConnectionType, Server, ServerConnection, ServerManager},
telemetry::TIMING_HISTOGRAM_BUCKET_BOUNDARIES,
util::{ActiveCounter, load_certificates},
};

Expand All @@ -44,7 +48,9 @@ struct NtsPoolKe<S> {
server_tls: RwLock<TlsAcceptor>,
monitoring_keys: RwLock<Arc<HashSet<String>>>,
session_counter: Counter<u64>,
session_duration: Histogram<f64>,
upstream_session_counter: Counter<u64>,
upstream_session_duration: Histogram<f64>,
server_manager: S,
}

Expand All @@ -63,17 +69,33 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
.with_description("number of ke sessions with clients")
.build();

let session_duration = meter
.f64_histogram("session_duration")
.with_description("Duration of the session")
.with_unit("s")
.with_boundaries(TIMING_HISTOGRAM_BUCKET_BOUNDARIES.to_vec())
.build();

let upstream_session_counter = meter
.u64_counter("upstream_cookie_sessions")
.with_description("number of ke sessions with upstream for getting cookies")
.build();

let upstream_session_duration = meter
.f64_histogram("upstream_get_cookie_duration")
.with_description("Duration to get cookies from upstream time source")
.with_unit("s")
.with_boundaries(TIMING_HISTOGRAM_BUCKET_BOUNDARIES.to_vec())
.build();

Ok(NtsPoolKe {
config,
server_tls,
monitoring_keys,
session_counter,
session_duration,
upstream_session_counter,
upstream_session_duration,
server_manager,
})
}
Expand Down Expand Up @@ -150,6 +172,7 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
};
prereserved_permits.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
let active_connection_token = active_connection_counter.get_active_token();
let start_time = std::time::Instant::now();
let self_clone = self.clone();
let active_monitor_connection_counter_clone = active_monitor_connection_counter.clone();

Expand All @@ -165,6 +188,13 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
.await
{
Err(_) => {
self_clone.session_duration.record(
start_time.elapsed().as_secs_f64(),
&[
KeyValue::new("outcome", "timeout"),
KeyValue::new("is_monitor", monitor_count_token.is_some()),
],
);
::tracing::debug!(?source_address, "NTS Pool KE timed out");
self_clone.session_counter.add(
1,
Expand All @@ -175,6 +205,13 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
);
}
Ok(Err(err)) => {
self_clone.session_duration.record(
start_time.elapsed().as_secs_f64(),
&[
KeyValue::new("outcome", "error"),
KeyValue::new("is_monitor", monitor_count_token.is_some()),
],
);
::tracing::debug!(?err, ?source_address, "NTS Pool KE failed");
self_clone.session_counter.add(
1,
Expand All @@ -185,6 +222,13 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
);
}
Ok(Ok(())) => {
self_clone.session_duration.record(
start_time.elapsed().as_secs_f64(),
&[
KeyValue::new("outcome", "success"),
KeyValue::new("is_monitor", monitor_count_token.is_some()),
],
);
::tracing::debug!(?source_address, "NTS Pool KE completed");
self_clone.session_counter.add(
1,
Expand Down Expand Up @@ -593,14 +637,29 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
}
}

// TODO: Implement connection reuse
let start_time = std::time::Instant::now();
match tokio::time::timeout(self.config.timesource_timeout, async {
let server_stream = server.connect(connection_type).await?;
workaround_lifetime_bug(buffer, request, server_stream).await
})
.await
{
Ok(v) => {
self.upstream_session_duration.record(
start_time.elapsed().as_secs_f64(),
&[
KeyValue::new(
"outcome",
match v {
Ok(_) => "success",
Err(_) => "error",
},
),
KeyValue::new("server", server.name().clone()),
KeyValue::new("uuid", server.uuid().clone()),
KeyValue::new("is_monitor", for_monitor),
],
);
self.upstream_session_counter.add(
1,
&[
Expand All @@ -619,6 +678,15 @@ impl<S: ServerManager + 'static> NtsPoolKe<S> {
v
}
Err(_) => {
self.upstream_session_duration.record(
start_time.elapsed().as_secs_f64(),
&[
KeyValue::new("outcome", "timeout"),
KeyValue::new("server", server.name().clone()),
KeyValue::new("uuid", server.uuid().clone()),
KeyValue::new("is_monitor", for_monitor),
],
);
self.upstream_session_counter.add(
1,
&[
Expand Down
42 changes: 40 additions & 2 deletions nts-pool-ke/src/servers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::{
};

use notify::{RecursiveMode, Watcher};
use opentelemetry::{KeyValue, metrics::Counter};
use opentelemetry::{
KeyValue,
metrics::{Counter, Histogram},
};
use pool_nts::{
AlgorithmDescription, AlgorithmId, BufferBorrowingReader, MAX_MESSAGE_SIZE, ProtocolId,
ServerInformationRequest, ServerInformationResponse,
Expand All @@ -21,7 +24,10 @@ use tokio::{
};
use tokio_rustls::{TlsConnector, client::TlsStream};

use crate::{config::BackendConfig, error::PoolError, util::load_certificates};
use crate::{
config::BackendConfig, error::PoolError, telemetry::TIMING_HISTOGRAM_BUCKET_BOUNDARIES,
util::load_certificates,
};

mod geo;
pub use geo::GeographicServerManager;
Expand Down Expand Up @@ -188,6 +194,18 @@ async fn fetch_support_data(
.build()
});

static SUPPORT_REQUEST_DURATION: OnceLock<Histogram<f64>> = OnceLock::new();

let support_request_duration = SUPPORT_REQUEST_DURATION.get_or_init(|| {
opentelemetry::global::meter("PoolKe")
.f64_histogram("support_request_duration")
.with_description("Duration of support information requests to a server")
.with_unit("s")
.with_boundaries(TIMING_HISTOGRAM_BUCKET_BOUNDARIES.to_vec())
.build()
});

let start = std::time::Instant::now();
match tokio::time::timeout(timeout, async {
ServerInformationRequest {
key: key.into(),
Expand Down Expand Up @@ -222,6 +240,19 @@ async fn fetch_support_data(
.await
{
Ok(v) => {
support_request_duration.record(
start.elapsed().as_secs_f64(),
&[
KeyValue::new("uuid", uuid.clone()),
KeyValue::new(
"outcome",
match v {
Ok(_) => "success",
Err(_) => "error",
},
),
],
);
support_request_counter.add(
1,
&[
Expand All @@ -238,6 +269,13 @@ async fn fetch_support_data(
v
}
Err(_) => {
support_request_duration.record(
start.elapsed().as_secs_f64(),
&[
KeyValue::new("uuid", uuid.clone()),
KeyValue::new("outcome", "timeout"),
],
);
support_request_counter.add(
1,
&[
Expand Down
5 changes: 5 additions & 0 deletions nts-pool-ke/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::metrics::SdkMeterProvider;

// Histogram bucket edges for timing metrics, in seconds: the 1-2-5 series
// (three buckets per decade). Everything at or below 0 s lands in the first
// bucket, sub-millisecond samples in the second, and anything over 5 s falls
// into the implicit overflow bucket.
pub(crate) const TIMING_HISTOGRAM_BUCKET_BOUNDARIES: &[f64] = &[
    0.0, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0,
];

fn build_otlp_exporter() -> Option<opentelemetry_otlp::MetricExporter> {
let otlp_url = match std::env::var("OTEL_METRICS_EXPORT_DESTINATION") {
Ok(otlp_url) => otlp_url,
Expand Down
Loading