Skip to content

Commit 40cab0b

Browse files
authored
feat: add uTP duration metrics (#1646)
1 parent 4c5963f commit 40cab0b

File tree

9 files changed

+88
-28
lines changed

9 files changed

+88
-28
lines changed

crates/metrics/src/overlay.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ use ethportal_api::types::portal_wire::{Request, Response};
22
use prometheus_exporter::{
33
self,
44
prometheus::{
5-
opts, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
5+
histogram_opts, opts, register_histogram_vec_with_registry,
6+
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec,
67
IntCounterVec, IntGaugeVec, Registry,
78
},
89
};
910

10-
use crate::labels::{MessageDirectionLabel, MessageLabel, UtpDirectionLabel, UtpOutcomeLabel};
11+
use crate::{
12+
labels::{MessageDirectionLabel, MessageLabel, UtpDirectionLabel, UtpOutcomeLabel},
13+
timer::DiscardOnDropHistogramTimer,
14+
};
1115

1216
/// Contains metrics reporters for use in the overlay network
1317
/// (eg. `portalnet/src/overlay.rs` & `portalnet/src/overlay_service.rs`).
@@ -18,6 +22,7 @@ pub struct OverlayMetrics {
1822
pub message_total: IntCounterVec,
1923
pub utp_outcome_total: IntCounterVec,
2024
pub utp_active_gauge: IntGaugeVec,
25+
pub utp_connection_duration: HistogramVec,
2126
pub validation_total: IntCounterVec,
2227
}
2328

@@ -47,6 +52,14 @@ impl OverlayMetrics {
4752
&["protocol", "direction"],
4853
registry
4954
)?;
55+
let utp_connection_duration = register_histogram_vec_with_registry!(
56+
histogram_opts!(
57+
"trin_utp_connection_duration",
58+
"the time taken to complete a utp transfer"
59+
),
60+
&["protocol", "direction"],
61+
registry
62+
)?;
5063
let validation_total = register_int_counter_vec_with_registry!(
5164
opts!(
5265
"trin_validation_total",
@@ -59,6 +72,7 @@ impl OverlayMetrics {
5972
message_total,
6073
utp_outcome_total,
6174
utp_active_gauge,
75+
utp_connection_duration,
6276
validation_total,
6377
})
6478
}
@@ -157,6 +171,18 @@ impl OverlayMetricsReporter {
157171
.dec();
158172
}
159173

174+
pub fn start_utp_process_timer(
175+
&self,
176+
direction: UtpDirectionLabel,
177+
) -> DiscardOnDropHistogramTimer {
178+
DiscardOnDropHistogramTimer::new(
179+
self.overlay_metrics
180+
.utp_connection_duration
181+
.with_label_values(&[&self.protocol, direction.into()])
182+
.clone(),
183+
)
184+
}
185+
160186
//
161187
// Validations
162188
//

crates/portalnet/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ pub mod put_content;
1313
pub mod socket;
1414
pub mod types;
1515
pub mod utils;
16-
pub mod utp_controller;
16+
pub mod utp;

crates/portalnet/src/overlay/protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ use crate::{
5959
kbucket::{Entry, SharedKBucketsTable},
6060
node::Node,
6161
},
62-
utp_controller::UtpController,
62+
utp::controller::UtpController,
6363
};
6464

6565
/// Overlay protocol is a layer on top of discv5 that handles all requests from the overlay networks

crates/portalnet/src/overlay/request.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,9 @@ use ethportal_api::types::{
66
portal_wire::{Request, Response},
77
};
88
use futures::channel::oneshot;
9-
use tokio::sync::OwnedSemaphorePermit;
109

1110
use super::errors::OverlayRequestError;
12-
use crate::find::query_pool::QueryId;
11+
use crate::{find::query_pool::QueryId, utp::timed_semaphore::OwnedTimedSemaphorePermit};
1312

1413
/// An incoming or outgoing request.
1514
#[derive(Debug, PartialEq)]
@@ -44,7 +43,7 @@ pub struct OverlayRequest {
4443
/// Will be None for requests that are not associated with a query.
4544
pub query_id: Option<QueryId>,
4645
/// An optional permit to allow for transfer caps
47-
pub request_permit: Option<OwnedSemaphorePermit>,
46+
pub request_permit: Option<OwnedTimedSemaphorePermit>,
4847
}
4948

5049
impl OverlayRequest {
@@ -54,7 +53,7 @@ impl OverlayRequest {
5453
direction: RequestDirection,
5554
responder: Option<OverlayResponder>,
5655
query_id: Option<QueryId>,
57-
request_permit: Option<OwnedSemaphorePermit>,
56+
request_permit: Option<OwnedTimedSemaphorePermit>,
5857
) -> Self {
5958
OverlayRequest {
6059
id: rand::random(),
@@ -77,7 +76,7 @@ pub struct ActiveOutgoingRequest {
7776
/// An optional QueryID for the query that this request is associated with.
7877
pub query_id: Option<QueryId>,
7978
/// An optional permit to allow for transfer caps
80-
pub request_permit: Option<OwnedSemaphorePermit>,
79+
pub request_permit: Option<OwnedTimedSemaphorePermit>,
8180
}
8281

8382
/// A response for a particular overlay request.

crates/portalnet/src/overlay/service.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ use tokio::{
4444
sync::{
4545
broadcast,
4646
mpsc::{self, UnboundedReceiver, UnboundedSender},
47-
OwnedSemaphorePermit,
4847
},
4948
task::JoinHandle,
5049
};
@@ -85,7 +84,7 @@ use crate::{
8584
node::Node,
8685
},
8786
utils::portal_wire,
88-
utp_controller::UtpController,
87+
utp::{controller::UtpController, timed_semaphore::OwnedTimedSemaphorePermit},
8988
};
9089

9190
pub const FIND_NODES_MAX_NODES: usize = 32;
@@ -1031,7 +1030,7 @@ impl<
10311030
let utp = Arc::clone(&self.utp_controller);
10321031
tokio::spawn(async move {
10331032
utp.accept_outbound_stream(cid, &content).await;
1034-
drop(permit);
1033+
permit.drop();
10351034
});
10361035

10371036
// Connection id is sent as BE because uTP header values are stored also as BE
@@ -1218,7 +1217,7 @@ impl<
12181217
})
12191218
.collect();
12201219
let _ = join_all(handles).await;
1221-
drop(permit);
1220+
permit.drop();
12221221
return;
12231222
}
12241223
};
@@ -1242,7 +1241,7 @@ impl<
12421241
})
12431242
.collect();
12441243
let _ = join_all(handles).await;
1245-
drop(permit);
1244+
permit.drop();
12461245
return;
12471246
}
12481247
};
@@ -1302,7 +1301,7 @@ impl<
13021301
Some(utp_processing.utp_controller),
13031302
);
13041303
// explicitly drop semaphore permit in thread so the permit is moved into the thread
1305-
drop(permit);
1304+
permit.drop();
13061305
});
13071306

13081307
let accept = Accept {
@@ -1433,7 +1432,7 @@ impl<
14331432
source: Enr,
14341433
request: Request,
14351434
query_id: Option<QueryId>,
1436-
request_permit: Option<OwnedSemaphorePermit>,
1435+
request_permit: Option<OwnedTimedSemaphorePermit>,
14371436
) {
14381437
// If the node is present in the routing table, but the node is not connected, then
14391438
// use the existing entry's value and direction. Otherwise, build a new entry from
@@ -1493,7 +1492,7 @@ impl<
14931492
response: Accept,
14941493
enr: Enr,
14951494
offer: Request,
1496-
request_permit: Option<OwnedSemaphorePermit>,
1495+
request_permit: Option<OwnedTimedSemaphorePermit>,
14971496
) -> anyhow::Result<Accept> {
14981497
// Check that a valid triggering request was sent
14991498
let mut gossip_result_tx = None;
@@ -1593,7 +1592,7 @@ impl<
15931592
}
15941593
// explicitly drop permit in the thread so the permit is included in the thread
15951594
if let Some(permit) = request_permit {
1596-
drop(permit);
1595+
permit.drop();
15971596
}
15981597
});
15991598

crates/portalnet/src/put_content.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::{
2222
request::{OverlayRequest, RequestDirection},
2323
},
2424
types::kbucket::SharedKBucketsTable,
25-
utp_controller::UtpController,
25+
utp::controller::UtpController,
2626
};
2727

2828
/// Datatype to store the result of a put content request.

crates/portalnet/src/utp_controller.rs renamed to crates/portalnet/src/utp/controller.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ use std::{sync::Arc, time::Duration};
33
use anyhow::anyhow;
44
use bytes::Bytes;
55
use lazy_static::lazy_static;
6-
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
6+
use tokio::sync::Semaphore;
77
use tracing::debug;
88
use trin_metrics::{
99
labels::{UtpDirectionLabel, UtpOutcomeLabel},
1010
overlay::OverlayMetricsReporter,
1111
};
1212
use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket};
1313

14+
use super::timed_semaphore::OwnedTimedSemaphorePermit;
1415
use crate::discovery::UtpEnr;
1516
/// UtpController is meant to be a container which contains all code related to/for managing uTP
1617
/// streams We are implementing this because we want the utils of controlling uTP connection to be
@@ -65,29 +66,45 @@ impl UtpController {
6566
}
6667

6768
/// Non-blocking method to try and acquire a permit for an outbound uTP transfer.
68-
// `try_acquire_owned()` isn't blocking and will instantly return with
69-
// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
70-
pub fn get_outbound_semaphore(&self) -> Option<OwnedSemaphorePermit> {
69+
/// `try_acquire_owned()` isn't blocking and will instantly return with
70+
/// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
71+
pub fn get_outbound_semaphore(&self) -> Option<OwnedTimedSemaphorePermit> {
7172
match self
7273
.outbound_utp_transfer_semaphore
7374
.clone()
7475
.try_acquire_owned()
7576
{
76-
Ok(permit) => Some(permit),
77+
Ok(permit) => {
78+
let histogram_timer = self
79+
.metrics
80+
.start_utp_process_timer(UtpDirectionLabel::Outbound);
81+
Some(OwnedTimedSemaphorePermit {
82+
permit,
83+
histogram_timer,
84+
})
85+
}
7786
Err(_) => None,
7887
}
7988
}
8089

8190
/// Non-blocking method to try and acquire a permit for an inbound uTP transfer.
82-
// `try_acquire_owned()` isn't blocking and will instantly return with
83-
// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
84-
pub fn get_inbound_semaphore(&self) -> Option<OwnedSemaphorePermit> {
91+
/// `try_acquire_owned()` isn't blocking and will instantly return with
92+
/// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
93+
pub fn get_inbound_semaphore(&self) -> Option<OwnedTimedSemaphorePermit> {
8594
match self
8695
.inbound_utp_transfer_semaphore
8796
.clone()
8897
.try_acquire_owned()
8998
{
90-
Ok(permit) => Some(permit),
99+
Ok(permit) => {
100+
let histogram_timer = self
101+
.metrics
102+
.start_utp_process_timer(UtpDirectionLabel::Inbound);
103+
Some(OwnedTimedSemaphorePermit {
104+
permit,
105+
histogram_timer,
106+
})
107+
}
91108
Err(_) => None,
92109
}
93110
}

crates/portalnet/src/utp/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
pub mod controller;
2+
pub mod timed_semaphore;
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use tokio::sync::OwnedSemaphorePermit;
2+
use trin_metrics::timer::DiscardOnDropHistogramTimer;
3+
4+
/// A owned semaphore which records the time it has been alive from initialization to when drop() is
5+
/// called.
6+
#[derive(Debug)]
7+
pub struct OwnedTimedSemaphorePermit {
8+
pub permit: OwnedSemaphorePermit,
9+
pub histogram_timer: DiscardOnDropHistogramTimer,
10+
}
11+
12+
impl OwnedTimedSemaphorePermit {
13+
pub fn drop(self) {
14+
self.histogram_timer.stop_and_record();
15+
drop(self.permit);
16+
}
17+
}

0 commit comments

Comments
 (0)