Skip to content

Commit 84d09a6

Browse files
authored
Added Telemetry support for GLIDE client (core only) (valkey-io#2545)
1 parent 65bc84c commit 84d09a6

File tree

13 files changed

+349
-41
lines changed

13 files changed

+349
-41
lines changed

glide-core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ redis = { path = "./redis-rs/redis", features = [
1818
"cluster",
1919
"cluster-async",
2020
] }
21+
telemetrylib = { path = "./telemetry" }
2122
tokio = { version = "1", features = ["macros", "time"] }
2223
logger_core = { path = "../logger_core" }
2324
dispose = "0.5.0"
@@ -37,6 +38,8 @@ once_cell = "1.18.0"
3738
sha1_smol = "1.0.0"
3839
nanoid = "0.4.0"
3940
async-trait = { version = "0.1.24" }
41+
serde_json = "1"
42+
serde = { version = "1", features = ["derive"] }
4043

4144
[features]
4245
socket-layer = [

glide-core/redis-rs/redis/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ tracing = "0.1"
102102
# Optional uuid support
103103
uuid = { version = "1.6.1", optional = true }
104104

105+
telemetrylib = { path = "../../telemetry" }
106+
105107
[features]
106108
default = [
107109
"acl",

glide-core/redis-rs/redis/src/cluster_async/connections_container.rs

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@ use dashmap::DashMap;
66
use futures::FutureExt;
77
use rand::seq::IteratorRandom;
88
use std::net::IpAddr;
9+
use telemetrylib::Telemetry;
10+
11+
/// Count the number of connections in a connections_map object
12+
macro_rules! count_connections {
13+
($conn_map:expr) => {{
14+
let mut count = 0usize;
15+
for a in $conn_map {
16+
count = count.saturating_add(if a.management_connection.is_some() {
17+
2
18+
} else {
19+
1
20+
});
21+
}
22+
count
23+
}};
24+
}
925

1026
/// A struct that encapsulates a network connection along with its associated IP address.
1127
#[derive(Clone, Eq, PartialEq, Debug)]
@@ -66,6 +82,15 @@ where
6682
}
6783
}
6884

85+
/// Return the number of underlying connections managed by this instance of ClusterNode
86+
pub fn connections_count(&self) -> usize {
87+
if self.management_connection.is_some() {
88+
2
89+
} else {
90+
1
91+
}
92+
}
93+
6994
pub(crate) fn get_connection(&self, conn_type: &ConnectionType) -> Connection {
7095
match conn_type {
7196
ConnectionType::User => self.user_connection.conn.clone(),
@@ -106,6 +131,13 @@ pub(crate) struct ConnectionsContainer<Connection> {
106131
topology_hash: TopologyHash,
107132
}
108133

134+
impl<Connection> Drop for ConnectionsContainer<Connection> {
135+
fn drop(&mut self) {
136+
let count = count_connections!(&self.connection_map);
137+
Telemetry::decr_total_connections(count);
138+
}
139+
}
140+
109141
impl<Connection> Default for ConnectionsContainer<Connection> {
110142
fn default() -> Self {
111143
Self {
@@ -129,8 +161,14 @@ where
129161
read_from_replica_strategy: ReadFromReplicaStrategy,
130162
topology_hash: TopologyHash,
131163
) -> Self {
164+
let connection_map = connection_map.0;
165+
166+
// Update the telemetry with the number of connections
167+
let count = count_connections!(&connection_map);
168+
Telemetry::incr_total_connections(count);
169+
132170
Self {
133-
connection_map: connection_map.0,
171+
connection_map,
134172
slot_map,
135173
read_from_replica_strategy,
136174
topology_hash,
@@ -142,7 +180,11 @@ where
142180
&mut self,
143181
other_connection_map: ConnectionsMap<Connection>,
144182
) {
183+
let conn_count_before = count_connections!(&self.connection_map);
145184
self.connection_map.extend(other_connection_map.0);
185+
let conn_count_after = count_connections!(&self.connection_map);
186+
// Update the number of connections by the difference
187+
Telemetry::incr_total_connections(conn_count_after.saturating_sub(conn_count_before));
146188
}
147189

148190
/// Returns true if the address represents a known primary node.
@@ -275,20 +317,35 @@ where
275317
node: ClusterNode<Connection>,
276318
) -> String {
277319
let address = address.into();
278-
self.connection_map.insert(address.clone(), node);
320+
321+
// Increase the total number of connections by the number of connections managed by `node`
322+
Telemetry::incr_total_connections(node.connections_count());
323+
324+
if let Some(old_conn) = self.connection_map.insert(address.clone(), node) {
325+
// We are replacing a node. Reduce the counter by the number of connections managed by
326+
// the old connection
327+
Telemetry::decr_total_connections(old_conn.connections_count());
328+
};
279329
address
280330
}
281331

282332
pub(crate) fn remove_node(&self, address: &String) -> Option<ClusterNode<Connection>> {
283-
self.connection_map
284-
.remove(address)
285-
.map(|(_key, value)| value)
333+
if let Some((_key, old_conn)) = self.connection_map.remove(address) {
334+
Telemetry::decr_total_connections(old_conn.connections_count());
335+
Some(old_conn)
336+
} else {
337+
None
338+
}
286339
}
287340

288341
pub(crate) fn len(&self) -> usize {
289342
self.connection_map.len()
290343
}
291344

345+
pub(crate) fn connection_map(&self) -> &DashMap<String, ClusterNode<Connection>> {
346+
&self.connection_map
347+
}
348+
292349
pub(crate) fn get_current_topology_hash(&self) -> TopologyHash {
293350
self.topology_hash
294351
}

glide-core/redis-rs/redis/src/cluster_async/connections_logic.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
use std::net::SocketAddr;
2-
31
use super::{
42
connections_container::{ClusterNode, ConnectionWithIp},
53
Connect,
@@ -11,6 +9,7 @@ use crate::{
119
cluster_client::ClusterParams,
1210
ErrorKind, RedisError, RedisResult,
1311
};
12+
use std::net::SocketAddr;
1413

1514
use futures::prelude::*;
1615
use futures_util::{future::BoxFuture, join};
@@ -113,13 +112,15 @@ where
113112
C: ConnectionLike + Connect + Send + Sync + 'static + Clone,
114113
{
115114
match future::join(
115+
// User connection
116116
create_connection(
117117
addr,
118118
params.clone(),
119119
socket_addr,
120120
false,
121121
glide_connection_options.clone(),
122122
),
123+
// Management connection
123124
create_connection(
124125
addr,
125126
params.clone(),

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use tokio::task::JoinHandle;
5656

5757
#[cfg(feature = "tokio-comp")]
5858
use crate::aio::DisconnectNotifier;
59+
use telemetrylib::Telemetry;
5960

6061
use crate::{
6162
aio::{get_socket_addrs, ConnectionLike, MultiplexedConnection, Runtime},
@@ -144,7 +145,7 @@ where
144145
/// # Arguments
145146
///
146147
/// * `scan_state_rc` - A reference to the scan state, For initiating new scan send [`ScanStateRC::new()`],
147-
/// for each subsequent iteration use the returned [`ScanStateRC`].
148+
/// for each subsequent iteration use the returned [`ScanStateRC`].
148149
/// * `count` - An optional count of keys requested,
149150
/// the amount returned can vary and not obligated to return exactly count.
150151
/// * `object_type` - An optional [`ObjectType`] enum of requested key redis type.
@@ -181,7 +182,7 @@ where
181182
/// break;
182183
/// }
183184
/// }
184-
/// keys
185+
/// keys
185186
/// }
186187
/// ```
187188
pub async fn cluster_scan(
@@ -241,7 +242,7 @@ where
241242
/// break;
242243
/// }
243244
/// }
244-
/// keys
245+
/// keys
245246
/// }
246247
/// ```
247248
pub async fn cluster_scan_with_pattern<K: ToRedisArgs>(
@@ -495,14 +496,27 @@ pub(crate) struct ClusterConnInner<C> {
495496

496497
impl<C> Dispose for ClusterConnInner<C> {
497498
fn dispose(self) {
499+
if let Ok(conn_lock) = self.inner.conn_lock.try_read() {
500+
// Each node may contain user and *maybe* a management connection
501+
let mut count = 0usize;
502+
for node in conn_lock.connection_map() {
503+
count = node.connections_count();
504+
}
505+
Telemetry::decr_total_connections(count);
506+
}
507+
498508
if let Some(handle) = self.periodic_checks_handler {
499509
#[cfg(feature = "tokio-comp")]
500510
handle.abort()
501511
}
512+
502513
if let Some(handle) = self.connections_validation_handler {
503514
#[cfg(feature = "tokio-comp")]
504515
handle.abort()
505516
}
517+
518+
// Reduce the number of clients
519+
Telemetry::decr_total_clients(1);
506520
}
507521
}
508522

@@ -1080,6 +1094,8 @@ where
10801094
}
10811095
}
10821096

1097+
// New client added
1098+
Telemetry::incr_total_clients(1);
10831099
Ok(Disposable::new(connection))
10841100
}
10851101

@@ -1185,7 +1201,7 @@ where
11851201
Ok(connections.0)
11861202
}
11871203

1188-
// Reconnet to the initial nodes provided by the user in the creation of the client,
1204+
// Reconnect to the initial nodes provided by the user in the creation of the client,
11891205
// and try to refresh the slots based on the initial connections.
11901206
// Being used when all cluster connections are unavailable.
11911207
fn reconnect_to_initial_nodes(inner: Arc<InnerCore<C>>) -> impl Future<Output = ()> {
@@ -1272,7 +1288,7 @@ where
12721288
);
12731289

12741290
if !addrs_to_refresh.is_empty() {
1275-
// dont try existing nodes since we know a. it does not exist. b. exist but its connection is closed
1291+
// don't try existing nodes since we know a. it does not exist. b. exist but its connection is closed
12761292
Self::refresh_connections(
12771293
inner.clone(),
12781294
addrs_to_refresh,
@@ -1481,7 +1497,7 @@ where
14811497
});
14821498
let wait_duration = rate_limiter.wait_duration();
14831499
if passed_time <= wait_duration {
1484-
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
1500+
debug!("Skipping slot refresh as the wait duration hasn't yet passed. Passed time = {:?},
14851501
Wait duration = {:?}", passed_time, wait_duration);
14861502
skip_slots_refresh = true;
14871503
}

glide-core/src/client/mod.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,22 @@ pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(2
3131
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
3232
pub const INTERNAL_CONNECTION_TIMEOUT: Duration = Duration::from_millis(250);
3333
pub const FINISHED_SCAN_CURSOR: &str = "finished";
34-
// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
35-
//
36-
// Expected maximum request rate: 50,000 requests/second
37-
// Expected response time: 1 millisecond
38-
//
39-
// According to Little's Law, the maximum number of inflight requests required to fully utilize the maximum request rate is:
40-
// (50,000 requests/second) × (1 millisecond / 1000 milliseconds) = 50 requests
41-
//
42-
// The value of 1000 provides a buffer for bursts while still allowing full utilization of the maximum request rate.
34+
35+
/// The value of 1000 for the maximum number of inflight requests is determined based on Little's Law in queuing theory:
36+
///
37+
/// Expected maximum request rate: 50,000 requests/second
38+
/// Expected response time: 1 millisecond
39+
///
40+
/// According to Little's Law, the maximum number of inflight requests required to fully utilize the maximum request rate is:
41+
/// (50,000 requests/second) × (1 millisecond / 1000 milliseconds) = 50 requests
42+
///
43+
/// The value of 1000 provides a buffer for bursts while still allowing full utilization of the maximum request rate.
4344
pub const DEFAULT_MAX_INFLIGHT_REQUESTS: u32 = 1000;
4445

45-
// The connection check interval is currently not exposed to the user via ConnectionRequest,
46-
// as improper configuration could negatively impact performance or pub/sub resiliency.
47-
// A 3-second interval provides a reasonable balance between connection validation
48-
// and performance overhead.
46+
/// The connection check interval is currently not exposed to the user via ConnectionRequest,
47+
/// as improper configuration could negatively impact performance or pub/sub resiliency.
48+
/// A 3-second interval provides a reasonable balance between connection validation
49+
/// and performance overhead.
4950
pub const CONNECTION_CHECKS_INTERVAL: Duration = Duration::from_secs(3);
5051

5152
pub(super) fn get_port(address: &NodeAddress) -> u16 {
@@ -712,8 +713,7 @@ impl Client {
712713
})
713714
})
714715
.await
715-
.map_err(|_| ConnectionError::Timeout)
716-
.and_then(|res| res)
716+
.map_err(|_| ConnectionError::Timeout)?
717717
}
718718
}
719719

0 commit comments

Comments
 (0)