Skip to content

Commit 135597b

Browse files
committed
Refactor pool key sampling logic: improve readability of environment variable parsing and formatting
1 parent c50ba72 commit 135597b

File tree

5 files changed

+236
-60
lines changed

5 files changed

+236
-60
lines changed

crates/pavis/src/proxy/service/io.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ static SNI_FRAGMENT_WARN_COUNTER: std::sync::atomic::AtomicU64 =
3333
static LOGGED_UPSTREAM_CHECK_COUNTER: std::sync::atomic::AtomicU64 =
3434
std::sync::atomic::AtomicU64::new(0);
3535
static LOGGED_UPSTREAM_KEYS: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
36+
static POOL_KEY_SAMPLE_COUNTER: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
37+
static POOL_KEY_SAMPLE_RATE: OnceLock<u64> = OnceLock::new();
38+
const DEFAULT_POOL_KEY_SAMPLE_RATE: u64 = 16;
3639

3740
fn should_log_upstream_config(hash: u64) -> bool {
3841
if !LOGGED_UPSTREAM_CHECK_COUNTER
@@ -48,6 +51,23 @@ fn should_log_upstream_config(hash: u64) -> bool {
4851
false
4952
}
5053

54+
fn should_sample_pool_key() -> bool {
55+
let rate =
56+
*POOL_KEY_SAMPLE_RATE.get_or_init(|| match std::env::var("PAVIS_POOL_KEY_SAMPLE_RATE") {
57+
Ok(value) => value
58+
.parse::<u64>()
59+
.ok()
60+
.filter(|v| *v > 0)
61+
.unwrap_or(DEFAULT_POOL_KEY_SAMPLE_RATE),
62+
Err(_) => DEFAULT_POOL_KEY_SAMPLE_RATE,
63+
});
64+
if rate == 1 {
65+
return true;
66+
}
67+
let count = POOL_KEY_SAMPLE_COUNTER.fetch_add(1, Ordering::Relaxed);
68+
count % rate == 0
69+
}
70+
5171
#[async_trait]
5272
impl ProxyHttp for Proxy {
5373
type CTX = RouterContext;
@@ -875,7 +895,9 @@ impl<'a> UpstreamPeerBuilder<'a> {
875895
}
876896

877897
let sni_label = sni_value.as_ref().map(|name| name.0.as_str()).unwrap_or("");
878-
if let Some(tracker) = self.telemetry.pool_key_tracker.as_ref() {
898+
if let Some(tracker) = self.telemetry.pool_key_tracker.as_ref()
899+
&& should_sample_pool_key()
900+
{
879901
let snapshot = tracker.record(
880902
upstream_name.0.as_str(),
881903
reuse_key_hash(&addr, sni_label, verify_mode, cert),

crates/pavis/src/router/matcher.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,17 +118,8 @@ pub(crate) fn match_request<'a>(
118118
}
119119

120120
if !matches_method(&route.matcher.method, method) {
121-
println!(
122-
"DEBUG: Method mismatch for route {}: req={} predicate={:?}",
123-
compiled.index, method, route.matcher.method
124-
);
125121
stats.record_method_miss();
126122
continue;
127-
} else {
128-
println!(
129-
"DEBUG: Method MATCH for route {}: req={} predicate={:?}",
130-
compiled.index, method, route.matcher.method
131-
);
132123
}
133124

134125
if !matches_headers(

crates/pavis/src/telemetry/access_log.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio::sync::mpsc;
1313
use tokio::time::Duration;
1414

1515
pub struct AccessLog {
16-
tx: mpsc::Sender<LogEntry>,
16+
tx: Option<mpsc::Sender<LogEntry>>,
1717
enabled: bool,
1818
metrics: Mutex<Option<Arc<MetricsRegistry>>>,
1919
}
@@ -32,6 +32,9 @@ impl Service for AccessLogWorker {
3232
mut shutdown: tokio::sync::watch::Receiver<bool>,
3333
_threads: usize,
3434
) {
35+
if matches!(self.config, AccessLogPolicy::Disabled) || self.rx.is_none() {
36+
return;
37+
}
3538
let mut rx = self.rx.take().expect("Worker started twice");
3639

3740
let mut file_writer = if let AccessLogPolicy::File(path) = &self.config {
@@ -117,12 +120,17 @@ impl Service for AccessLogWorker {
117120

118121
impl AccessLog {
119122
pub fn new(config: &AccessLogPolicy) -> (Self, AccessLogWorker) {
120-
let (tx, rx) = mpsc::channel::<LogEntry>(access_log_channel_capacity());
121123
let enabled = *config != AccessLogPolicy::Disabled;
124+
let (tx, rx) = if enabled {
125+
let (tx, rx) = mpsc::channel::<LogEntry>(access_log_channel_capacity());
126+
(Some(tx), Some(rx))
127+
} else {
128+
(None, None)
129+
};
122130
let throttle_ms = access_log_throttle_ms();
123131

124132
let worker = AccessLogWorker {
125-
rx: Some(rx),
133+
rx,
126134
config: config.clone(),
127135
throttle_ms,
128136
};
@@ -159,6 +167,9 @@ impl AccessLog {
159167
if !enabled {
160168
return;
161169
}
170+
let Some(tx) = &self.tx else {
171+
return;
172+
};
162173

163174
let req = session.req_header();
164175
let method = req.method.clone();
@@ -212,7 +223,7 @@ impl AccessLog {
212223
};
213224

214225
// Non-blocking send (lossy if full)
215-
if let Err(mpsc::error::TrySendError::Full(_)) = self.tx.try_send(entry)
226+
if let Err(mpsc::error::TrySendError::Full(_)) = tx.try_send(entry)
216227
&& let Some(handle) = self
217228
.metrics
218229
.lock()
@@ -367,7 +378,9 @@ mod tests {
367378
upstream_duration_ms: Some(50),
368379
};
369380
let expected = format_log_line(&entry);
370-
let _ = access_log.tx.try_send(entry);
381+
if let Some(tx) = &access_log.tx {
382+
let _ = tx.try_send(entry);
383+
}
371384

372385
// Run worker briefly
373386
let (shutdown_tx, shutdown_rx) = watch::channel(false);

crates/pavis/src/telemetry/metrics.rs

Lines changed: 122 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::router::MatchVerdict;
22
use async_trait::async_trait;
3-
use metrics::{counter, gauge, histogram};
3+
use metrics::{SharedString, counter, gauge, histogram};
44
use metrics_exporter_prometheus::PrometheusBuilder;
55
use pingora::services::Service;
66
use std::collections::{HashMap, VecDeque};
@@ -48,6 +48,7 @@ impl<T: MetricsTransport> PrometheusEndpoint<T> {
4848
Ok(handle) => {
4949
let registry = MetricsRegistry {
5050
_handle: Arc::new(handle),
51+
labels: Arc::new(MetricLabels::new()),
5152
};
5253
(
5354
Self {
@@ -196,9 +197,12 @@ async fn serve_metrics(
196197
#[derive(Clone)]
197198
pub struct MetricsRegistry {
198199
_handle: Arc<metrics_exporter_prometheus::PrometheusHandle>,
200+
labels: Arc<MetricLabels>,
199201
}
200202

201203
pub const POOL_KEY_CARDINALITY_CAP: usize = 1024;
204+
const METRIC_LABEL_CACHE_CAP: usize = 1024;
205+
const METRIC_ROUTE_LABEL_CACHE_CAP: usize = 4096;
202206
const METRICS_REQUEST_LINE_LIMIT_BYTES: usize = 4096;
203207
const METRICS_HEADER_LIMIT_BYTES: usize = 16 * 1024;
204208
const METRICS_READ_TIMEOUT: Duration = Duration::from_secs(5);
@@ -215,6 +219,87 @@ struct BoundedKeySet {
215219
order: VecDeque<(u64, Instant)>,
216220
}
217221

222+
struct MetricLabels {
223+
common: LabelCache,
224+
route: LabelCache,
225+
status: Mutex<HashMap<u16, SharedString>>,
226+
}
227+
228+
impl MetricLabels {
229+
fn new() -> Self {
230+
Self {
231+
common: LabelCache::new(METRIC_LABEL_CACHE_CAP),
232+
route: LabelCache::new(METRIC_ROUTE_LABEL_CACHE_CAP),
233+
status: Mutex::new(HashMap::new()),
234+
}
235+
}
236+
237+
fn common(&self, value: &str) -> SharedString {
238+
self.common.get(value)
239+
}
240+
241+
fn route(&self, value: &str) -> SharedString {
242+
self.route.get(value)
243+
}
244+
245+
fn status(&self, value: u16) -> SharedString {
246+
let mut guard = self
247+
.status
248+
.lock()
249+
.expect("metrics status cache lock poisoned");
250+
if let Some(existing) = guard.get(&value) {
251+
return existing.clone();
252+
}
253+
let shared = SharedString::from(value.to_string());
254+
guard.insert(value, shared.clone());
255+
shared
256+
}
257+
}
258+
259+
struct LabelCache {
260+
cap: usize,
261+
inner: Mutex<LabelCacheInner>,
262+
}
263+
264+
struct LabelCacheInner {
265+
map: HashMap<String, SharedString>,
266+
order: VecDeque<String>,
267+
}
268+
269+
impl LabelCache {
270+
fn new(cap: usize) -> Self {
271+
Self {
272+
cap,
273+
inner: Mutex::new(LabelCacheInner {
274+
map: HashMap::new(),
275+
order: VecDeque::new(),
276+
}),
277+
}
278+
}
279+
280+
fn get(&self, value: &str) -> SharedString {
281+
let mut guard = self
282+
.inner
283+
.lock()
284+
.expect("metrics label cache lock poisoned");
285+
if let Some(existing) = guard.map.get(value) {
286+
return existing.clone();
287+
}
288+
let shared = SharedString::from(value.to_string());
289+
let key = value.to_string();
290+
guard.map.insert(key.clone(), shared.clone());
291+
guard.order.push_back(key);
292+
while guard.map.len() > self.cap {
293+
if let Some(evicted) = guard.order.pop_front() {
294+
guard.map.remove(&evicted);
295+
} else {
296+
break;
297+
}
298+
}
299+
shared
300+
}
301+
}
302+
218303
impl BoundedKeySet {
219304
fn new(cap: usize) -> Self {
220305
Self {
@@ -369,36 +454,43 @@ impl MetricsRegistry {
369454
upstream: &str,
370455
duration_secs: f64,
371456
) {
457+
let method = self.labels.common(method);
458+
let route = self.labels.route(route_pattern);
459+
let status = self.labels.status(status);
460+
let upstream = self.labels.common(upstream);
461+
372462
counter!(
373463
"pavis_http_requests_total",
374-
"method" => method.to_string(),
375-
"route" => route_pattern.to_string(),
376-
"status" => status.to_string(),
377-
"upstream" => upstream.to_string(),
464+
"method" => method,
465+
"route" => route,
466+
"status" => status.clone(),
467+
"upstream" => upstream.clone(),
378468
)
379469
.increment(1);
380470

381471
histogram!(
382472
"pavis_http_request_duration_seconds",
383-
"method" => method.to_string(),
384-
"route" => route_pattern.to_string(),
385-
"status" => status.to_string(),
386-
"upstream" => upstream.to_string(),
473+
"method" => method,
474+
"route" => route,
475+
"status" => status,
476+
"upstream" => upstream,
387477
)
388478
.record(duration_secs);
389479
}
390480

391481
pub fn record_upstream_request(&self, upstream: &str, status: u16, duration_secs: f64) {
482+
let upstream = self.labels.common(upstream);
483+
let status = self.labels.status(status);
392484
counter!(
393485
"pavis_upstream_requests_total",
394-
"upstream" => upstream.to_string(),
395-
"status" => status.to_string(),
486+
"upstream" => upstream.clone(),
487+
"status" => status.clone(),
396488
)
397489
.increment(1);
398490

399491
histogram!(
400492
"pavis_upstream_request_duration_seconds",
401-
"upstream" => upstream.to_string(),
493+
"upstream" => upstream,
402494
)
403495
.record(duration_secs);
404496
}
@@ -413,41 +505,47 @@ impl MetricsRegistry {
413505
}
414506

415507
pub fn record_pool_size(&self, upstream: &str, size: f64) {
416-
gauge!("pavis_upstream_pool_size", "upstream" => upstream.to_string()).set(size);
508+
let upstream = self.labels.common(upstream);
509+
gauge!("pavis_upstream_pool_size", "upstream" => upstream).set(size);
417510
}
418511

419512
pub fn record_pool_key_cardinality(&self, upstream: &str, cardinality: usize, saturated: bool) {
513+
let upstream = self.labels.common(upstream);
420514
let reported = if saturated {
421515
(POOL_KEY_CARDINALITY_CAP + 1) as f64
422516
} else {
423517
cardinality as f64
424518
};
425519
gauge!(
426520
"pavis_upstream_pool_key_cardinality_approx",
427-
"upstream" => upstream.to_string()
521+
"upstream" => upstream
428522
)
429523
.set(reported);
430524
}
431525

432526
pub fn record_connection_reused(&self, upstream: &str) {
527+
let upstream = self.labels.common(upstream);
433528
counter!(
434529
"pavis_upstream_connection_reused_total",
435-
"upstream" => upstream.to_string()
530+
"upstream" => upstream
436531
)
437532
.increment(1);
438533
}
439534

440535
pub fn record_connection_new(&self, upstream: &str, reason: &str) {
536+
let upstream = self.labels.common(upstream);
537+
let reason = self.labels.common(reason);
441538
counter!(
442539
"pavis_upstream_connection_new_total",
443-
"upstream" => upstream.to_string(),
444-
"reason" => reason.to_string()
540+
"upstream" => upstream,
541+
"reason" => reason
445542
)
446543
.increment(1);
447544
}
448545

449546
pub fn update_config_stats(&self, version: &str, size_bytes: u64) {
450-
gauge!("pavis_runtime_config_version", "version" => version.to_string()).set(1.0);
547+
let version = self.labels.common(version);
548+
gauge!("pavis_runtime_config_version", "version" => version).set(1.0);
451549
gauge!("pavis_runtime_config_size_bytes").set(size_bytes as f64);
452550
gauge!("pavis_runtime_reload_last_timestamp").set(
453551
std::time::SystemTime::now()
@@ -462,18 +560,21 @@ impl MetricsRegistry {
462560
}
463561

464562
pub fn record_config_validation(&self, result: &str, reason: &str) {
563+
let result = self.labels.common(result);
564+
let reason = self.labels.common(reason);
465565
counter!(
466566
"pavis_config_validation_total",
467-
"result" => result.to_string(),
468-
"reason" => reason.to_string(),
567+
"result" => result,
568+
"reason" => reason,
469569
)
470570
.increment(1);
471571
}
472572

473573
pub fn record_config_apply(&self, result: &str) {
574+
let result = self.labels.common(result);
474575
counter!(
475576
"pavis_config_apply_total",
476-
"result" => result.to_string(),
577+
"result" => result,
477578
)
478579
.increment(1);
479580
}

0 commit comments

Comments
 (0)