Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions dc/s2n-quic-dc/events/acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,30 @@ struct AcceptorTcpIoError<'a> {
error: &'a std::io::Error,
}

/// Emitted when the TCP stream has been sent over a Unix domain socket
#[event("acceptor:tcp:socket_sent")]
#[subject(endpoint)]
struct AcceptorTcpSocketSent<'a> {
/// The credential ID of the stream
#[snapshot("[HIDDEN]")]
credential_id: &'a[u8],

/// The ID of the stream
stream_id: u64,

/// The amount of time the TCP stream spent in the queue before being enqueued
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which queue is this? I suspect this is time from TCP accept() to UDS sendmsg()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the "fresh" queue. Can change the comment to "before being sent over UDS"

#[timer("sojourn_time")]
sojourn_time: core::time::Duration,

/// The number of times the Unix domain socket was blocked on send
#[measure("blocked_count")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this for sending this particular stream? It might make sense to switch this (or add) a #[counter(...)] as well.

Measure will emit histograms.

blocked_count: usize,

/// The len of the payload sent over the Unix domain socket
#[measure("len", Bytes)]
payload_len: usize,
}

/// Emitted when a UDP acceptor is started
#[event("acceptor:udp:started")]
#[subject(endpoint)]
Expand Down
153 changes: 153 additions & 0 deletions dc/s2n-quic-dc/src/event/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,36 @@ pub mod api {
}
#[derive(Clone, Debug)]
#[non_exhaustive]
#[doc = " Emitted when the TCP stream has been sent over a Unix domain socket"]
pub struct AcceptorTcpSocketSent<'a> {
#[doc = " The credential ID of the stream"]
pub credential_id: &'a [u8],
#[doc = " The ID of the stream"]
pub stream_id: u64,
#[doc = " The amount of time the TCP stream spent in the queue before being enqueued"]
pub sojourn_time: core::time::Duration,
#[doc = " The number of times the Unix domain socket was blocked on send"]
pub blocked_count: usize,
#[doc = " The len of the payload sent over the Unix domain socket"]
pub payload_len: usize,
}
#[cfg(any(test, feature = "testing"))]
impl<'a> crate::event::snapshot::Fmt for AcceptorTcpSocketSent<'a> {
fn fmt(&self, fmt: &mut core::fmt::Formatter) -> core::fmt::Result {
let mut fmt = fmt.debug_struct("AcceptorTcpSocketSent");
fmt.field("credential_id", &"[HIDDEN]");
fmt.field("stream_id", &self.stream_id);
fmt.field("sojourn_time", &self.sojourn_time);
fmt.field("blocked_count", &self.blocked_count);
fmt.field("payload_len", &self.payload_len);
fmt.finish()
}
}
impl<'a> Event for AcceptorTcpSocketSent<'a> {
const NAME: &'static str = "acceptor:tcp:socket_sent";
}
#[derive(Clone, Debug)]
#[non_exhaustive]
#[doc = " Emitted when a UDP acceptor is started"]
pub struct AcceptorUdpStarted<'a> {
#[doc = " The id of the acceptor worker"]
Expand Down Expand Up @@ -2377,6 +2407,22 @@ pub mod tracing {
tracing :: event ! (target : "acceptor_tcp_io_error" , parent : parent , tracing :: Level :: DEBUG , { error = tracing :: field :: debug (error) });
}
#[inline]
fn on_acceptor_tcp_socket_sent(
&self,
meta: &api::EndpointMeta,
event: &api::AcceptorTcpSocketSent,
) {
let parent = self.parent(meta);
let api::AcceptorTcpSocketSent {
credential_id,
stream_id,
sojourn_time,
blocked_count,
payload_len,
} = event;
tracing :: event ! (target : "acceptor_tcp_socket_sent" , parent : parent , tracing :: Level :: DEBUG , { credential_id = tracing :: field :: debug (credential_id) , stream_id = tracing :: field :: debug (stream_id) , sojourn_time = tracing :: field :: debug (sojourn_time) , blocked_count = tracing :: field :: debug (blocked_count) , payload_len = tracing :: field :: debug (payload_len) });
}
#[inline]
fn on_acceptor_udp_started(
&self,
meta: &api::EndpointMeta,
Expand Down Expand Up @@ -3650,6 +3696,39 @@ pub mod builder {
}
}
#[derive(Clone, Debug)]
#[doc = " Emitted when the TCP stream has been sent over a Unix domain socket"]
pub struct AcceptorTcpSocketSent<'a> {
#[doc = " The credential ID of the stream"]
pub credential_id: &'a [u8],
#[doc = " The ID of the stream"]
pub stream_id: u64,
#[doc = " The amount of time the TCP stream spent in the queue before being enqueued"]
pub sojourn_time: core::time::Duration,
#[doc = " The number of times the Unix domain socket was blocked on send"]
pub blocked_count: usize,
#[doc = " The len of the payload sent over the Unix domain socket"]
pub payload_len: usize,
}
impl<'a> IntoEvent<api::AcceptorTcpSocketSent<'a>> for AcceptorTcpSocketSent<'a> {
#[inline]
fn into_event(self) -> api::AcceptorTcpSocketSent<'a> {
let AcceptorTcpSocketSent {
credential_id,
stream_id,
sojourn_time,
blocked_count,
payload_len,
} = self;
api::AcceptorTcpSocketSent {
credential_id: credential_id.into_event(),
stream_id: stream_id.into_event(),
sojourn_time: sojourn_time.into_event(),
blocked_count: blocked_count.into_event(),
payload_len: payload_len.into_event(),
}
}
}
#[derive(Clone, Debug)]
#[doc = " Emitted when a UDP acceptor is started"]
pub struct AcceptorUdpStarted<'a> {
#[doc = " The id of the acceptor worker"]
Expand Down Expand Up @@ -5645,6 +5724,16 @@ mod traits {
let _ = meta;
let _ = event;
}
#[doc = "Called when the `AcceptorTcpSocketSent` event is triggered"]
#[inline]
fn on_acceptor_tcp_socket_sent(
&self,
meta: &api::EndpointMeta,
event: &api::AcceptorTcpSocketSent,
) {
let _ = meta;
let _ = event;
}
#[doc = "Called when the `AcceptorUdpStarted` event is triggered"]
#[inline]
fn on_acceptor_udp_started(
Expand Down Expand Up @@ -6567,6 +6656,14 @@ mod traits {
self.as_ref().on_acceptor_tcp_io_error(meta, event);
}
#[inline]
fn on_acceptor_tcp_socket_sent(
&self,
meta: &api::EndpointMeta,
event: &api::AcceptorTcpSocketSent,
) {
self.as_ref().on_acceptor_tcp_socket_sent(meta, event);
}
#[inline]
fn on_acceptor_udp_started(
&self,
meta: &api::EndpointMeta,
Expand Down Expand Up @@ -7344,6 +7441,15 @@ mod traits {
(self.1).on_acceptor_tcp_io_error(meta, event);
}
#[inline]
fn on_acceptor_tcp_socket_sent(
&self,
meta: &api::EndpointMeta,
event: &api::AcceptorTcpSocketSent,
) {
(self.0).on_acceptor_tcp_socket_sent(meta, event);
(self.1).on_acceptor_tcp_socket_sent(meta, event);
}
#[inline]
fn on_acceptor_udp_started(
&self,
meta: &api::EndpointMeta,
Expand Down Expand Up @@ -8091,6 +8197,8 @@ mod traits {
fn on_acceptor_tcp_stream_enqueued(&self, event: builder::AcceptorTcpStreamEnqueued);
#[doc = "Publishes a `AcceptorTcpIoError` event to the publisher's subscriber"]
fn on_acceptor_tcp_io_error(&self, event: builder::AcceptorTcpIoError);
#[doc = "Publishes a `AcceptorTcpSocketSent` event to the publisher's subscriber"]
fn on_acceptor_tcp_socket_sent(&self, event: builder::AcceptorTcpSocketSent);
#[doc = "Publishes a `AcceptorUdpStarted` event to the publisher's subscriber"]
fn on_acceptor_udp_started(&self, event: builder::AcceptorUdpStarted);
#[doc = "Publishes a `AcceptorUdpDatagramReceived` event to the publisher's subscriber"]
Expand Down Expand Up @@ -8313,6 +8421,13 @@ mod traits {
self.subscriber.on_event(&self.meta, &event);
}
#[inline]
fn on_acceptor_tcp_socket_sent(&self, event: builder::AcceptorTcpSocketSent) {
let event = event.into_event();
self.subscriber
.on_acceptor_tcp_socket_sent(&self.meta, &event);
self.subscriber.on_event(&self.meta, &event);
}
#[inline]
fn on_acceptor_udp_started(&self, event: builder::AcceptorUdpStarted) {
let event = event.into_event();
self.subscriber.on_acceptor_udp_started(&self.meta, &event);
Expand Down Expand Up @@ -9075,6 +9190,7 @@ pub mod testing {
pub acceptor_tcp_packet_dropped: AtomicU64,
pub acceptor_tcp_stream_enqueued: AtomicU64,
pub acceptor_tcp_io_error: AtomicU64,
pub acceptor_tcp_socket_sent: AtomicU64,
pub acceptor_udp_started: AtomicU64,
pub acceptor_udp_datagram_received: AtomicU64,
pub acceptor_udp_packet_received: AtomicU64,
Expand Down Expand Up @@ -9159,6 +9275,7 @@ pub mod testing {
acceptor_tcp_packet_dropped: AtomicU64::new(0),
acceptor_tcp_stream_enqueued: AtomicU64::new(0),
acceptor_tcp_io_error: AtomicU64::new(0),
acceptor_tcp_socket_sent: AtomicU64::new(0),
acceptor_udp_started: AtomicU64::new(0),
acceptor_udp_datagram_received: AtomicU64::new(0),
acceptor_udp_packet_received: AtomicU64::new(0),
Expand Down Expand Up @@ -9331,6 +9448,18 @@ pub mod testing {
let out = format!("{meta:?} {event:?}");
self.output.lock().unwrap().push(out);
}
fn on_acceptor_tcp_socket_sent(
&self,
meta: &api::EndpointMeta,
event: &api::AcceptorTcpSocketSent,
) {
self.acceptor_tcp_socket_sent
.fetch_add(1, Ordering::Relaxed);
let meta = crate::event::snapshot::Fmt::to_snapshot(meta);
let event = crate::event::snapshot::Fmt::to_snapshot(event);
let out = format!("{meta:?} {event:?}");
self.output.lock().unwrap().push(out);
}
fn on_acceptor_udp_started(
&self,
meta: &api::EndpointMeta,
Expand Down Expand Up @@ -9846,6 +9975,7 @@ pub mod testing {
pub acceptor_tcp_packet_dropped: AtomicU64,
pub acceptor_tcp_stream_enqueued: AtomicU64,
pub acceptor_tcp_io_error: AtomicU64,
pub acceptor_tcp_socket_sent: AtomicU64,
pub acceptor_udp_started: AtomicU64,
pub acceptor_udp_datagram_received: AtomicU64,
pub acceptor_udp_packet_received: AtomicU64,
Expand Down Expand Up @@ -9962,6 +10092,7 @@ pub mod testing {
acceptor_tcp_packet_dropped: AtomicU64::new(0),
acceptor_tcp_stream_enqueued: AtomicU64::new(0),
acceptor_tcp_io_error: AtomicU64::new(0),
acceptor_tcp_socket_sent: AtomicU64::new(0),
acceptor_udp_started: AtomicU64::new(0),
acceptor_udp_datagram_received: AtomicU64::new(0),
acceptor_udp_packet_received: AtomicU64::new(0),
Expand Down Expand Up @@ -10166,6 +10297,18 @@ pub mod testing {
let out = format!("{meta:?} {event:?}");
self.output.lock().unwrap().push(out);
}
fn on_acceptor_tcp_socket_sent(
&self,
meta: &api::EndpointMeta,
event: &api::AcceptorTcpSocketSent,
) {
self.acceptor_tcp_socket_sent
.fetch_add(1, Ordering::Relaxed);
let meta = crate::event::snapshot::Fmt::to_snapshot(meta);
let event = crate::event::snapshot::Fmt::to_snapshot(event);
let out = format!("{meta:?} {event:?}");
self.output.lock().unwrap().push(out);
}
fn on_acceptor_udp_started(
&self,
meta: &api::EndpointMeta,
Expand Down Expand Up @@ -11138,6 +11281,7 @@ pub mod testing {
pub acceptor_tcp_packet_dropped: AtomicU64,
pub acceptor_tcp_stream_enqueued: AtomicU64,
pub acceptor_tcp_io_error: AtomicU64,
pub acceptor_tcp_socket_sent: AtomicU64,
pub acceptor_udp_started: AtomicU64,
pub acceptor_udp_datagram_received: AtomicU64,
pub acceptor_udp_packet_received: AtomicU64,
Expand Down Expand Up @@ -11244,6 +11388,7 @@ pub mod testing {
acceptor_tcp_packet_dropped: AtomicU64::new(0),
acceptor_tcp_stream_enqueued: AtomicU64::new(0),
acceptor_tcp_io_error: AtomicU64::new(0),
acceptor_tcp_socket_sent: AtomicU64::new(0),
acceptor_udp_started: AtomicU64::new(0),
acceptor_udp_datagram_received: AtomicU64::new(0),
acceptor_udp_packet_received: AtomicU64::new(0),
Expand Down Expand Up @@ -11407,6 +11552,14 @@ pub mod testing {
let out = format!("{event:?}");
self.output.lock().unwrap().push(out);
}
fn on_acceptor_tcp_socket_sent(&self, event: builder::AcceptorTcpSocketSent) {
self.acceptor_tcp_socket_sent
.fetch_add(1, Ordering::Relaxed);
let event = event.into_event();
let event = crate::event::snapshot::Fmt::to_snapshot(&event);
let out = format!("{event:?}");
self.output.lock().unwrap().push(out);
}
fn on_acceptor_udp_started(&self, event: builder::AcceptorUdpStarted) {
self.acceptor_udp_started.fetch_add(1, Ordering::Relaxed);
let event = event.into_event();
Expand Down
Loading
Loading